iota_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::Duration,
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use chrono::prelude::*;
24use fastcrypto::{
25    encoding::{Base58, Encoding},
26    hash::MultisetHash,
27};
28use iota_archival::reader::ArchiveReaderBalancer;
29use iota_common::debug_fatal;
30use iota_config::{
31    NodeConfig,
32    genesis::Genesis,
33    node::{
34        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
35        StateDebugDumpConfig,
36    },
37};
38use iota_framework::{BuiltInFramework, SystemPackage};
39use iota_json_rpc_types::{
40    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
41    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
42    IotaTransactionBlockEvents, TransactionFilter,
43};
44use iota_macros::{fail_point, fail_point_async, fail_point_if};
45use iota_metrics::{
46    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
47};
48use iota_storage::{
49    key_value_store::{
50        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51    },
52    key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57    IOTA_SYSTEM_ADDRESS, TypeTag,
58    authenticator_state::get_authenticator_state,
59    base_types::*,
60    committee::{Committee, EpochId, ProtocolVersion},
61    crypto::{AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer, default_hash},
62    deny_list_v1::check_coin_deny_list_v1_during_signing,
63    digests::{ChainIdentifier, TransactionEventsDigest},
64    dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
65    effects::{
66        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
67        TransactionEvents, VerifiedCertifiedTransactionEffects, VerifiedSignedTransactionEffects,
68    },
69    error::{ExecutionError, IotaError, IotaResult, UserInputError},
70    event::{Event, EventID, SystemEpochInfoEvent},
71    executable_transaction::VerifiedExecutableTransaction,
72    execution_config_utils::to_binary_config,
73    execution_status::ExecutionStatus,
74    fp_ensure,
75    gas::{GasCostSummary, IotaGasStatus},
76    inner_temporary_store::{
77        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
78        WrittenObjects,
79    },
80    iota_system_state::{
81        IotaSystemState, IotaSystemStateTrait,
82        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
83    },
84    is_system_package,
85    layout_resolver::{LayoutResolver, into_struct_layout},
86    message_envelope::Message,
87    messages_checkpoint::{
88        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
89        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
90        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
91        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
92    },
93    messages_consensus::AuthorityCapabilitiesV1,
94    messages_grpc::{
95        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
96        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
97        TransactionStatus,
98    },
99    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
100    object::{
101        MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
102        bounded_visitor::BoundedVisitor,
103    },
104    storage::{
105        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
106    },
107    supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions},
108    transaction::*,
109    transaction_executor::SimulateTransactionResult,
110};
111use itertools::Itertools;
112use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
113use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
114use once_cell::sync::OnceCell;
115use parking_lot::Mutex;
116use prometheus::{
117    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
118    register_histogram_vec_with_registry, register_histogram_with_registry,
119    register_int_counter_vec_with_registry, register_int_counter_with_registry,
120    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
121};
122use serde::{Deserialize, Serialize, de::DeserializeOwned};
123use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
124use tap::{TapFallible, TapOptional};
125use tokio::{
126    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
127    task::JoinHandle,
128};
129use tracing::{Instrument, debug, error, info, instrument, trace, warn};
130use typed_store::TypedStoreError;
131
132use self::{
133    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
134};
135#[cfg(msim)]
136pub use crate::checkpoints::checkpoint_executor::{
137    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
138};
139use crate::{
140    authority::{
141        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
142        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
143        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
144        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
145        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
146    },
147    authority_client::NetworkAuthorityClient,
148    checkpoints::CheckpointStore,
149    consensus_adapter::ConsensusAdapter,
150    epoch::committee_store::CommitteeStore,
151    execution_cache::{
152        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
153        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
154        TransactionCacheRead,
155    },
156    execution_driver::execution_process,
157    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
158    metrics::{LatencyObserver, RateTracker},
159    module_cache_metrics::ResolverMetrics,
160    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
161    rest_index::RestIndexStore,
162    stake_aggregator::StakeAggregator,
163    state_accumulator::{AccumulatorStore, StateAccumulator},
164    subscription_handler::SubscriptionHandler,
165    transaction_input_loader::TransactionInputLoader,
166    transaction_manager::TransactionManager,
167    transaction_outputs::TransactionOutputs,
168    validator_tx_finalizer::ValidatorTxFinalizer,
169    verify_indexes::verify_indexes,
170};
171
172#[cfg(test)]
173#[path = "unit_tests/authority_tests.rs"]
174pub mod authority_tests;
175
176#[cfg(test)]
177#[path = "unit_tests/transaction_tests.rs"]
178pub mod transaction_tests;
179
180#[cfg(test)]
181#[path = "unit_tests/batch_transaction_tests.rs"]
182mod batch_transaction_tests;
183
184#[cfg(test)]
185#[path = "unit_tests/move_integration_tests.rs"]
186pub mod move_integration_tests;
187
188#[cfg(test)]
189#[path = "unit_tests/gas_tests.rs"]
190mod gas_tests;
191
192#[cfg(test)]
193#[path = "unit_tests/batch_verification_tests.rs"]
194mod batch_verification_tests;
195
196#[cfg(test)]
197#[path = "unit_tests/coin_deny_list_tests.rs"]
198mod coin_deny_list_tests;
199
200#[cfg(any(test, feature = "test-utils"))]
201pub mod authority_test_utils;
202
203pub mod authority_per_epoch_store;
204pub mod authority_per_epoch_store_pruner;
205
206pub mod authority_store_pruner;
207pub mod authority_store_tables;
208pub mod authority_store_types;
209pub mod epoch_start_configuration;
210pub mod shared_object_congestion_tracker;
211pub mod shared_object_version_manager;
212#[cfg(any(test, feature = "test-utils"))]
213pub mod test_authority_builder;
214pub mod transaction_deferral;
215
216pub(crate) mod authority_store;
217
218pub static CHAIN_IDENTIFIER: OnceCell<ChainIdentifier> = OnceCell::new();
219
220/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
221pub struct AuthorityMetrics {
222    tx_orders: IntCounter,
223    total_certs: IntCounter,
224    total_cert_attempts: IntCounter,
225    total_effects: IntCounter,
226    pub shared_obj_tx: IntCounter,
227    sponsored_tx: IntCounter,
228    tx_already_processed: IntCounter,
229    num_input_objs: Histogram,
230    num_shared_objects: Histogram,
231    batch_size: Histogram,
232
233    authority_state_handle_transaction_latency: Histogram,
234
235    execute_certificate_latency_single_writer: Histogram,
236    execute_certificate_latency_shared_object: Histogram,
237
238    execute_certificate_with_effects_latency: Histogram,
239    internal_execution_latency: Histogram,
240    execution_load_input_objects_latency: Histogram,
241    prepare_certificate_latency: Histogram,
242    commit_certificate_latency: Histogram,
243    db_checkpoint_latency: Histogram,
244
245    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
246    pub(crate) transaction_manager_num_missing_objects: IntGauge,
247    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
248    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
249    pub(crate) transaction_manager_num_ready: IntGauge,
250    pub(crate) transaction_manager_object_cache_size: IntGauge,
251    pub(crate) transaction_manager_object_cache_hits: IntCounter,
252    pub(crate) transaction_manager_object_cache_misses: IntCounter,
253    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
254    pub(crate) transaction_manager_package_cache_size: IntGauge,
255    pub(crate) transaction_manager_package_cache_hits: IntCounter,
256    pub(crate) transaction_manager_package_cache_misses: IntCounter,
257    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
258    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
259
260    pub(crate) execution_driver_executed_transactions: IntCounter,
261    pub(crate) execution_driver_dispatch_queue: IntGauge,
262    pub(crate) execution_queueing_delay_s: Histogram,
263    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
264    pub(crate) execution_gas_latency_ratio: Histogram,
265
266    pub(crate) skipped_consensus_txns: IntCounter,
267    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
268
269    pub(crate) authority_overload_status: IntGauge,
270    pub(crate) authority_load_shedding_percentage: IntGauge,
271
272    pub(crate) transaction_overload_sources: IntCounterVec,
273
274    /// Post processing metrics
275    post_processing_total_events_emitted: IntCounter,
276    post_processing_total_tx_indexed: IntCounter,
277    post_processing_total_tx_had_event_processed: IntCounter,
278    post_processing_total_failures: IntCounter,
279
280    /// Consensus handler metrics
281    pub consensus_handler_processed: IntCounterVec,
282    pub consensus_handler_transaction_sizes: HistogramVec,
283    pub consensus_handler_num_low_scoring_authorities: IntGauge,
284    pub consensus_handler_scores: IntGaugeVec,
285    pub consensus_handler_deferred_transactions: IntCounter,
286    pub consensus_handler_congested_transactions: IntCounter,
287    pub consensus_handler_cancelled_transactions: IntCounter,
288    pub consensus_handler_max_object_costs: IntGaugeVec,
289    pub consensus_committed_subdags: IntCounterVec,
290    pub consensus_committed_messages: IntGaugeVec,
291    pub consensus_committed_user_transactions: IntGaugeVec,
292    pub consensus_calculated_throughput: IntGauge,
293    pub consensus_calculated_throughput_profile: IntGauge,
294
295    pub limits_metrics: Arc<LimitsMetrics>,
296
297    /// bytecode verifier metrics for tracking timeouts
298    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
299
300    /// Count of zklogin signatures
301    pub zklogin_sig_count: IntCounter,
302    /// Count of multisig signatures
303    pub multisig_sig_count: IntCounter,
304
305    // Tracks recent average txn queueing delay between when it is ready for execution
306    // until it starts executing.
307    pub execution_queueing_latency: LatencyObserver,
308
309    // Tracks the rate of transactions become ready for execution in transaction manager.
310    // The need for the Mutex is that the tracker is updated in transaction manager and read
311    // in the overload_monitor. There should be low mutex contention because
312    // transaction manager is single threaded and the read rate in overload_monitor is
313    // low. In the case where transaction manager becomes multi-threaded, we can
314    // create one rate tracker per thread.
315    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
316
317    // Tracks the rate of transactions starts execution in execution driver.
318    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
319    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
320}
321
/// Histogram buckets for positive integer-valued samples.
///
/// Overrides the default Prometheus buckets with a 1-2-5-7 progression per
/// decade, covering values from 1 up to 10M.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
    7000000., 10000000.,
];
328
/// Histogram buckets, in seconds, for general latency metrics.
///
/// Ranges from 0.5 ms to 90 s, with one-second resolution between 1 s and
/// 10 s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];
333
/// Histogram buckets, in seconds, for low latency samples.
///
/// Starts from 10us and follows a 1-2-5 progression per decade up to 100 s.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];
339
/// Histogram buckets for gas-to-latency ratio metrics.
///
/// Ranges from 10 to 1,000,000.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];
344
/// Gas coin value constant for dev-inspect (10^12).
// NOTE(review): presumably the balance given to the gas coin used by
// dev-inspect; the usage is not visible in this part of the file — confirm.
pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000;
346
347impl AuthorityMetrics {
348    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
349        let execute_certificate_latency = register_histogram_vec_with_registry!(
350            "authority_state_execute_certificate_latency",
351            "Latency of executing certificates, including waiting for inputs",
352            &["tx_type"],
353            LATENCY_SEC_BUCKETS.to_vec(),
354            registry,
355        )
356        .unwrap();
357
358        let execute_certificate_latency_single_writer =
359            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
360        let execute_certificate_latency_shared_object =
361            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
362
363        Self {
364            tx_orders: register_int_counter_with_registry!(
365                "total_transaction_orders",
366                "Total number of transaction orders",
367                registry,
368            )
369            .unwrap(),
370            total_certs: register_int_counter_with_registry!(
371                "total_transaction_certificates",
372                "Total number of transaction certificates handled",
373                registry,
374            )
375            .unwrap(),
376            total_cert_attempts: register_int_counter_with_registry!(
377                "total_handle_certificate_attempts",
378                "Number of calls to handle_certificate",
379                registry,
380            )
381            .unwrap(),
382            // total_effects == total transactions finished
383            total_effects: register_int_counter_with_registry!(
384                "total_transaction_effects",
385                "Total number of transaction effects produced",
386                registry,
387            )
388            .unwrap(),
389
390            shared_obj_tx: register_int_counter_with_registry!(
391                "num_shared_obj_tx",
392                "Number of transactions involving shared objects",
393                registry,
394            )
395            .unwrap(),
396
397            sponsored_tx: register_int_counter_with_registry!(
398                "num_sponsored_tx",
399                "Number of sponsored transactions",
400                registry,
401            )
402            .unwrap(),
403
404            tx_already_processed: register_int_counter_with_registry!(
405                "num_tx_already_processed",
406                "Number of transaction orders already processed previously",
407                registry,
408            )
409            .unwrap(),
410            num_input_objs: register_histogram_with_registry!(
411                "num_input_objects",
412                "Distribution of number of input TX objects per TX",
413                POSITIVE_INT_BUCKETS.to_vec(),
414                registry,
415            )
416            .unwrap(),
417            num_shared_objects: register_histogram_with_registry!(
418                "num_shared_objects",
419                "Number of shared input objects per TX",
420                POSITIVE_INT_BUCKETS.to_vec(),
421                registry,
422            )
423            .unwrap(),
424            batch_size: register_histogram_with_registry!(
425                "batch_size",
426                "Distribution of size of transaction batch",
427                POSITIVE_INT_BUCKETS.to_vec(),
428                registry,
429            )
430            .unwrap(),
431            authority_state_handle_transaction_latency: register_histogram_with_registry!(
432                "authority_state_handle_transaction_latency",
433                "Latency of handling transactions",
434                LATENCY_SEC_BUCKETS.to_vec(),
435                registry,
436            )
437            .unwrap(),
438            execute_certificate_latency_single_writer,
439            execute_certificate_latency_shared_object,
440            execute_certificate_with_effects_latency: register_histogram_with_registry!(
441                "authority_state_execute_certificate_with_effects_latency",
442                "Latency of executing certificates with effects, including waiting for inputs",
443                LATENCY_SEC_BUCKETS.to_vec(),
444                registry,
445            )
446            .unwrap(),
447            internal_execution_latency: register_histogram_with_registry!(
448                "authority_state_internal_execution_latency",
449                "Latency of actual certificate executions",
450                LATENCY_SEC_BUCKETS.to_vec(),
451                registry,
452            )
453            .unwrap(),
454            execution_load_input_objects_latency: register_histogram_with_registry!(
455                "authority_state_execution_load_input_objects_latency",
456                "Latency of loading input objects for execution",
457                LOW_LATENCY_SEC_BUCKETS.to_vec(),
458                registry,
459            )
460            .unwrap(),
461            prepare_certificate_latency: register_histogram_with_registry!(
462                "authority_state_prepare_certificate_latency",
463                "Latency of executing certificates, before committing the results",
464                LATENCY_SEC_BUCKETS.to_vec(),
465                registry,
466            )
467            .unwrap(),
468            commit_certificate_latency: register_histogram_with_registry!(
469                "authority_state_commit_certificate_latency",
470                "Latency of committing certificate execution results",
471                LATENCY_SEC_BUCKETS.to_vec(),
472                registry,
473            )
474            .unwrap(),
475            db_checkpoint_latency: register_histogram_with_registry!(
476                "db_checkpoint_latency",
477                "Latency of checkpointing dbs",
478                LATENCY_SEC_BUCKETS.to_vec(),
479                registry,
480            ).unwrap(),
481            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
482                "transaction_manager_num_enqueued_certificates",
483                "Current number of certificates enqueued to TransactionManager",
484                &["result"],
485                registry,
486            )
487            .unwrap(),
488            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
489                "transaction_manager_num_missing_objects",
490                "Current number of missing objects in TransactionManager",
491                registry,
492            )
493            .unwrap(),
494            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
495                "transaction_manager_num_pending_certificates",
496                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
497                registry,
498            )
499            .unwrap(),
500            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
501                "transaction_manager_num_executing_certificates",
502                "Number of executing certificates, including queued and actually running certificates",
503                registry,
504            )
505            .unwrap(),
506            transaction_manager_num_ready: register_int_gauge_with_registry!(
507                "transaction_manager_num_ready",
508                "Number of ready transactions in TransactionManager",
509                registry,
510            )
511            .unwrap(),
512            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
513                "transaction_manager_object_cache_size",
514                "Current size of object-availability cache in TransactionManager",
515                registry,
516            )
517            .unwrap(),
518            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
519                "transaction_manager_object_cache_hits",
520                "Number of object-availability cache hits in TransactionManager",
521                registry,
522            )
523            .unwrap(),
524            authority_overload_status: register_int_gauge_with_registry!(
525                "authority_overload_status",
526                "Whether authority is current experiencing overload and enters load shedding mode.",
527                registry)
528            .unwrap(),
529            authority_load_shedding_percentage: register_int_gauge_with_registry!(
530                "authority_load_shedding_percentage",
531                "The percentage of transactions is shed when the authority is in load shedding mode.",
532                registry)
533            .unwrap(),
534            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
535                "transaction_manager_object_cache_misses",
536                "Number of object-availability cache misses in TransactionManager",
537                registry,
538            )
539            .unwrap(),
540            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
541                "transaction_manager_object_cache_evictions",
542                "Number of object-availability cache evictions in TransactionManager",
543                registry,
544            )
545            .unwrap(),
546            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
547                "transaction_manager_package_cache_size",
548                "Current size of package-availability cache in TransactionManager",
549                registry,
550            )
551            .unwrap(),
552            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
553                "transaction_manager_package_cache_hits",
554                "Number of package-availability cache hits in TransactionManager",
555                registry,
556            )
557            .unwrap(),
558            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
559                "transaction_manager_package_cache_misses",
560                "Number of package-availability cache misses in TransactionManager",
561                registry,
562            )
563            .unwrap(),
564            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
565                "transaction_manager_package_cache_evictions",
566                "Number of package-availability cache evictions in TransactionManager",
567                registry,
568            )
569            .unwrap(),
570            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
571                "transaction_manager_transaction_queue_age_s",
572                "Time spent in waiting for transaction in the queue",
573                LATENCY_SEC_BUCKETS.to_vec(),
574                registry,
575            )
576            .unwrap(),
577            transaction_overload_sources: register_int_counter_vec_with_registry!(
578                "transaction_overload_sources",
579                "Number of times each source indicates transaction overload.",
580                &["source"],
581                registry)
582            .unwrap(),
583            execution_driver_executed_transactions: register_int_counter_with_registry!(
584                "execution_driver_executed_transactions",
585                "Cumulative number of transaction executed by execution driver",
586                registry,
587            )
588            .unwrap(),
589            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
590                "execution_driver_dispatch_queue",
591                "Number of transaction pending in execution driver dispatch queue",
592                registry,
593            )
594            .unwrap(),
595            execution_queueing_delay_s: register_histogram_with_registry!(
596                "execution_queueing_delay_s",
597                "Queueing delay between a transaction is ready for execution until it starts executing.",
598                LATENCY_SEC_BUCKETS.to_vec(),
599                registry
600            )
601            .unwrap(),
602            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
603                "prepare_cert_gas_latency_ratio",
604                "The ratio of computation gas divided by VM execution latency.",
605                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
606                registry
607            )
608            .unwrap(),
609            execution_gas_latency_ratio: register_histogram_with_registry!(
610                "execution_gas_latency_ratio",
611                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
612                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
613                registry
614            )
615            .unwrap(),
616            skipped_consensus_txns: register_int_counter_with_registry!(
617                "skipped_consensus_txns",
618                "Total number of consensus transactions skipped",
619                registry,
620            )
621            .unwrap(),
622            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
623                "skipped_consensus_txns_cache_hit",
624                "Total number of consensus transactions skipped because of local cache hit",
625                registry,
626            )
627            .unwrap(),
628            post_processing_total_events_emitted: register_int_counter_with_registry!(
629                "post_processing_total_events_emitted",
630                "Total number of events emitted in post processing",
631                registry,
632            )
633            .unwrap(),
634            post_processing_total_tx_indexed: register_int_counter_with_registry!(
635                "post_processing_total_tx_indexed",
636                "Total number of txes indexed in post processing",
637                registry,
638            )
639            .unwrap(),
640            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
641                "post_processing_total_tx_had_event_processed",
642                "Total number of txes finished event processing in post processing",
643                registry,
644            )
645            .unwrap(),
646            post_processing_total_failures: register_int_counter_with_registry!(
647                "post_processing_total_failures",
648                "Total number of failure in post processing",
649                registry,
650            )
651            .unwrap(),
652            consensus_handler_processed: register_int_counter_vec_with_registry!(
653                "consensus_handler_processed",
654                "Number of transactions processed by consensus handler",
655                &["class"],
656                registry
657            ).unwrap(),
658            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
659                "consensus_handler_transaction_sizes",
660                "Sizes of each type of transactions processed by consensus handler",
661                &["class"],
662                POSITIVE_INT_BUCKETS.to_vec(),
663                registry
664            ).unwrap(),
665            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
666                "consensus_handler_num_low_scoring_authorities",
667                "Number of low scoring authorities based on reputation scores from consensus",
668                registry
669            ).unwrap(),
670            consensus_handler_scores: register_int_gauge_vec_with_registry!(
671                "consensus_handler_scores",
672                "scores from consensus for each authority",
673                &["authority"],
674                registry,
675            ).unwrap(),
676            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
677                "consensus_handler_deferred_transactions",
678                "Number of transactions deferred by consensus handler",
679                registry,
680            ).unwrap(),
681            consensus_handler_congested_transactions: register_int_counter_with_registry!(
682                "consensus_handler_congested_transactions",
683                "Number of transactions deferred by consensus handler due to congestion",
684                registry,
685            ).unwrap(),
686            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
687                "consensus_handler_cancelled_transactions",
688                "Number of transactions cancelled by consensus handler",
689                registry,
690            ).unwrap(),
691            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
692                "consensus_handler_max_congestion_control_object_costs",
693                "Max object costs for congestion control in the current consensus commit",
694                &["commit_type"],
695                registry,
696            ).unwrap(),
697            consensus_committed_subdags: register_int_counter_vec_with_registry!(
698                "consensus_committed_subdags",
699                "Number of committed subdags, sliced by author",
700                &["authority"],
701                registry,
702            ).unwrap(),
703            consensus_committed_messages: register_int_gauge_vec_with_registry!(
704                "consensus_committed_messages",
705                "Total number of committed consensus messages, sliced by author",
706                &["authority"],
707                registry,
708            ).unwrap(),
709            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
710                "consensus_committed_user_transactions",
711                "Number of committed user transactions, sliced by submitter",
712                &["authority"],
713                registry,
714            ).unwrap(),
715            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
716            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
717            zklogin_sig_count: register_int_counter_with_registry!(
718                "zklogin_sig_count",
719                "Count of zkLogin signatures",
720                registry,
721            )
722            .unwrap(),
723            multisig_sig_count: register_int_counter_with_registry!(
724                "multisig_sig_count",
725                "Count of zkLogin signatures",
726                registry,
727            )
728            .unwrap(),
729            consensus_calculated_throughput: register_int_gauge_with_registry!(
730                "consensus_calculated_throughput",
731                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
732                registry,
733            ).unwrap(),
734            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
735                "consensus_calculated_throughput_profile",
736                "The current active calculated throughput profile",
737                registry
738            ).unwrap(),
739            execution_queueing_latency: LatencyObserver::new(),
740            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
741            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
742        }
743    }
744
745    /// Reset metrics that contain `hostname` as one of the labels. This is
746    /// needed to avoid retaining metrics for long-gone committee members and
747    /// only exposing metrics for the committee in the current epoch.
748    pub fn reset_on_reconfigure(&self) {
749        self.consensus_committed_messages.reset();
750        self.consensus_handler_scores.reset();
751        self.consensus_committed_user_transactions.reset();
752    }
753}
754
/// A trait object for `Signer` that is:
/// - Pin, i.e. confined to one place in memory (we don't want to copy private
///   keys).
/// - Send + Sync, i.e. can be safely moved to and shared between threads.
///
/// Because the pointer is a `Pin<Arc<..>>`, it is typically instantiated with
/// `Arc::pin(keypair)` where `keypair` is a `KeyPair`.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
762
/// Node-local state of a single authority: its identity and signing key, the
/// handles used to load and cache execution data, the store of the current
/// epoch, and the auxiliary stores, metrics and configuration listed below.
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads the input and receiving objects of a transaction, both for
    /// signing and for execution.
    input_loader: TransactionInputLoader,
    /// Bundle of execution-cache trait objects.
    /// NOTE(review): presumably backs the `get_*_cache_reader()` /
    /// `get_cache_writer()` accessors used by this type — confirm.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// The per-epoch store of the current epoch, read through
    /// `ArcSwap::load()`.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes the current 'execution epoch'.
    /// Execution acquires the read lock, checks the certificate epoch and
    /// holds it until all writes are complete. Reconfiguration acquires the
    /// write lock, changes the epoch and reverts all transactions from the
    /// previous epoch that were executed but did not make it into a
    /// checkpoint.
    execution_lock: RwLock<EpochId>,

    /// Optional index store.
    /// NOTE(review): presumably only populated on nodes that serve indexed
    /// queries — confirm against the constructor.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional REST index store (same caveat as `indexes`).
    pub rest_index: Option<Arc<RestIndexStore>>,

    /// Handler for subscriptions.
    /// NOTE(review): presumably event/transaction streaming — confirm.
    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint store; used, among other things, to look up the state
    /// commitments of an epoch.
    checkpoint_store: Arc<CheckpointStore>,

    /// Committee store, exposed through `committee_store()` and
    /// `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    /// Manages pending certificates and their missing input objects.
    transaction_manager: Arc<TransactionManager>,

    /// Shuts down the execution task. Used only in testing, hence the
    /// `expect(unused)` lint expectation on non-test builds.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Metrics recorded by this authority.
    pub metrics: Arc<AuthorityMetrics>,
    /// Pruner handle that is only held, never read, in the visible code.
    /// NOTE(review): presumably kept so background pruning lives as long as
    /// the state — confirm.
    _pruner: AuthorityStorePruner,
    /// Per-epoch-store pruner handle (same caveat as `_pruner`).
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Take db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    /// Node configuration; the overload, transaction deny, verifier signing
    /// and state debug dump settings are read from here.
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// When set, and when this node is a validator, freshly signed
    /// transactions are handed to it for tracking (see `handle_transaction`).
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
}
812
813/// The authority state encapsulates all state, drives execution, and ensures
814/// safety.
815///
816/// Note the authority operations can be accessed through a read ref (&) and do
817/// not require &mut. Internally a database is synchronized through a mutex
818/// lock.
819///
820/// Repeating valid commands should produce no changes and return no error.
821impl AuthorityState {
822    pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
823        epoch_store.committee().authority_exists(&self.name)
824    }
825
826    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
827        !self.is_validator(epoch_store)
828    }
829
830    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
831        &self.committee_store
832    }
833
834    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
835        self.committee_store.clone()
836    }
837
838    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
839        &self.config.authority_overload_config
840    }
841
842    pub fn get_epoch_state_commitments(
843        &self,
844        epoch: EpochId,
845    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
846        self.checkpoint_store.get_epoch_state_commitments(epoch)
847    }
848
849    /// This is a private method and should be kept that way. It doesn't check
850    /// whether the provided transaction is a system transaction, and hence
851    /// can only be called internally.
852    #[instrument(level = "trace", skip_all)]
853    async fn handle_transaction_impl(
854        &self,
855        transaction: VerifiedTransaction,
856        epoch_store: &Arc<AuthorityPerEpochStore>,
857    ) -> IotaResult<VerifiedSignedTransaction> {
858        // Ensure that validator cannot reconfigure while we are signing the tx
859        let _execution_lock = self.execution_lock_for_signing().await;
860
861        let tx_digest = transaction.digest();
862        let tx_data = transaction.data().transaction_data();
863
864        let input_object_kinds = tx_data.input_objects()?;
865        let receiving_objects_refs = tx_data.receiving_objects();
866
867        // Note: the deny checks may do redundant package loads but:
868        // - they only load packages when there is an active package deny map
869        // - the loads are cached anyway
870        iota_transaction_checks::deny::check_transaction_for_signing(
871            tx_data,
872            transaction.tx_signatures(),
873            &input_object_kinds,
874            &receiving_objects_refs,
875            &self.config.transaction_deny_config,
876            self.get_backing_package_store().as_ref(),
877        )?;
878
879        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
880            Some(tx_digest),
881            &input_object_kinds,
882            &receiving_objects_refs,
883            epoch_store.epoch(),
884        )?;
885
886        let (_gas_status, checked_input_objects) =
887            iota_transaction_checks::check_transaction_input(
888                epoch_store.protocol_config(),
889                epoch_store.reference_gas_price(),
890                tx_data,
891                input_objects,
892                &receiving_objects,
893                &self.metrics.bytecode_verifier_metrics,
894                &self.config.verifier_signing_config,
895            )?;
896
897        check_coin_deny_list_v1_during_signing(
898            tx_data.sender(),
899            &checked_input_objects,
900            &receiving_objects,
901            &self.get_object_store(),
902        )?;
903
904        let owned_objects = checked_input_objects.inner().filter_owned_objects();
905
906        let signed_transaction = VerifiedSignedTransaction::new(
907            epoch_store.epoch(),
908            transaction,
909            self.name,
910            &*self.secret,
911        );
912
913        // Check and write locks, to signed transaction, into the database
914        // The call to self.set_transaction_lock checks the lock is not conflicting,
915        // and returns ConflictingTransaction error in case there is a lock on a
916        // different existing transaction.
917        self.get_cache_writer()
918            .acquire_transaction_locks(epoch_store, &owned_objects, signed_transaction.clone())
919            .await?;
920
921        Ok(signed_transaction)
922    }
923
924    /// Initiate a new transaction.
925    #[instrument(level = "trace", skip_all)]
926    pub async fn handle_transaction(
927        &self,
928        epoch_store: &Arc<AuthorityPerEpochStore>,
929        transaction: VerifiedTransaction,
930    ) -> IotaResult<HandleTransactionResponse> {
931        let tx_digest = *transaction.digest();
932        debug!("handle_transaction");
933
934        // Ensure an idempotent answer.
935        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
936            return Ok(HandleTransactionResponse { status });
937        }
938
939        let _metrics_guard = self
940            .metrics
941            .authority_state_handle_transaction_latency
942            .start_timer();
943        self.metrics.tx_orders.inc();
944
945        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
946        match signed {
947            Ok(s) => {
948                if self.is_validator(epoch_store) {
949                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
950                        let tx = s.clone();
951                        let validator_tx_finalizer = validator_tx_finalizer.clone();
952                        let cache_reader = self.get_transaction_cache_reader().clone();
953                        let epoch_store = epoch_store.clone();
954                        spawn_monitored_task!(epoch_store.within_alive_epoch(
955                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
956                        ));
957                    }
958                }
959                Ok(HandleTransactionResponse {
960                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
961                })
962            }
963            // It happens frequently that while we are checking the validity of the transaction, it
964            // has just been executed.
965            // In that case, we could still return Ok to avoid showing confusing errors.
966            Err(err) => Ok(HandleTransactionResponse {
967                status: self
968                    .get_transaction_status(&tx_digest, epoch_store)?
969                    .ok_or(err)?
970                    .1,
971            }),
972        }
973    }
974
975    pub fn check_system_overload_at_signing(&self) -> bool {
976        self.config
977            .authority_overload_config
978            .check_system_overload_at_signing
979    }
980
981    pub fn check_system_overload_at_execution(&self) -> bool {
982        self.config
983            .authority_overload_config
984            .check_system_overload_at_execution
985    }
986
987    pub(crate) fn check_system_overload(
988        &self,
989        consensus_adapter: &Arc<ConsensusAdapter>,
990        tx_data: &SenderSignedData,
991        do_authority_overload_check: bool,
992    ) -> IotaResult {
993        if do_authority_overload_check {
994            self.check_authority_overload(tx_data).tap_err(|_| {
995                self.update_overload_metrics("execution_queue");
996            })?;
997        }
998        self.transaction_manager
999            .check_execution_overload(self.overload_config(), tx_data)
1000            .tap_err(|_| {
1001                self.update_overload_metrics("execution_pending");
1002            })?;
1003        consensus_adapter.check_consensus_overload().tap_err(|_| {
1004            self.update_overload_metrics("consensus");
1005        })?;
1006        Ok(())
1007    }
1008
1009    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1010        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1011            return Ok(());
1012        }
1013
1014        let load_shedding_percentage = self
1015            .overload_info
1016            .load_shedding_percentage
1017            .load(Ordering::Relaxed);
1018        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1019    }
1020
1021    fn update_overload_metrics(&self, source: &str) {
1022        self.metrics
1023            .transaction_overload_sources
1024            .with_label_values(&[source])
1025            .inc();
1026    }
1027
1028    /// Executes a transaction that's known to have correct effects.
1029    /// For such transaction, we don't have to wait for consensus to set shared
1030    /// object locks because we already know the shared object versions
1031    /// based on the effects. This function can be called by a fullnode
1032    /// only.
1033    #[instrument(level = "trace", skip_all)]
1034    pub async fn fullnode_execute_certificate_with_effects(
1035        &self,
1036        transaction: &VerifiedExecutableTransaction,
1037        // NOTE: the caller of this must promise to wait until it
1038        // knows for sure this tx is finalized, namely, it has seen a
1039        // CertifiedTransactionEffects or at least f+1 identifical effects
1040        // digests matching this TransactionEffectsEnvelope, before calling
1041        // this function, in order to prevent a byzantine validator from
1042        // giving us incorrect effects.
1043        effects: &VerifiedCertifiedTransactionEffects,
1044        epoch_store: &Arc<AuthorityPerEpochStore>,
1045    ) -> IotaResult {
1046        assert!(self.is_fullnode(epoch_store));
1047        // NOTE: the fullnode can change epoch during local execution. It should not
1048        // cause data inconsistency, but can be problematic for certain tests.
1049        // The check below mitigates the issue, but it is not a fundamental solution to
1050        // avoid race between local execution and reconfiguration.
1051        if self.epoch_store.load().epoch() != epoch_store.epoch() {
1052            return Err(IotaError::EpochEnded(epoch_store.epoch()));
1053        }
1054        let _metrics_guard = self
1055            .metrics
1056            .execute_certificate_with_effects_latency
1057            .start_timer();
1058        let digest = *transaction.digest();
1059        debug!("execute_certificate_with_effects");
1060        fp_ensure!(
1061            *effects.data().transaction_digest() == digest,
1062            IotaError::ErrorWhileProcessingCertificate {
1063                err: "effects/tx digest mismatch".to_string()
1064            }
1065        );
1066
1067        if transaction.contains_shared_object() {
1068            epoch_store
1069                .acquire_shared_version_assignments_from_effects(
1070                    transaction,
1071                    effects.data(),
1072                    self.get_object_cache_reader().as_ref(),
1073                )
1074                .await?;
1075        }
1076
1077        let expected_effects_digest = effects.digest();
1078
1079        self.transaction_manager
1080            .enqueue(vec![transaction.clone()], epoch_store);
1081
1082        let observed_effects = self
1083            .get_transaction_cache_reader()
1084            .notify_read_executed_effects(&[digest])
1085            .instrument(tracing::debug_span!(
1086                "notify_read_effects_in_execute_certificate_with_effects"
1087            ))
1088            .await?
1089            .pop()
1090            .expect("notify_read_effects should return exactly 1 element");
1091
1092        let observed_effects_digest = observed_effects.digest();
1093        if &observed_effects_digest != expected_effects_digest {
1094            panic!(
1095                "Locally executed effects do not match canonical effects! expected_effects_digest={:?} observed_effects_digest={:?} expected_effects={:?} observed_effects={:?} input_objects={:?}",
1096                expected_effects_digest,
1097                observed_effects_digest,
1098                effects.data(),
1099                observed_effects,
1100                transaction.data().transaction_data().input_objects()
1101            );
1102        }
1103        Ok(())
1104    }
1105
1106    /// Executes a certificate for its effects.
1107    #[instrument(level = "trace", skip_all)]
1108    pub async fn execute_certificate(
1109        &self,
1110        certificate: &VerifiedCertificate,
1111        epoch_store: &Arc<AuthorityPerEpochStore>,
1112    ) -> IotaResult<TransactionEffects> {
1113        let _metrics_guard = if certificate.contains_shared_object() {
1114            self.metrics
1115                .execute_certificate_latency_shared_object
1116                .start_timer()
1117        } else {
1118            self.metrics
1119                .execute_certificate_latency_single_writer
1120                .start_timer()
1121        };
1122        trace!("execute_certificate");
1123
1124        self.metrics.total_cert_attempts.inc();
1125
1126        if !certificate.contains_shared_object() {
1127            // Shared object transactions need to be sequenced by the consensus before
1128            // enqueueing for execution, done in
1129            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
1130            // object transactions, they can be enqueued for execution immediately.
1131            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1132        }
1133
1134        // tx could be reverted when epoch ends, so we must be careful not to return a
1135        // result here after the epoch ends.
1136        epoch_store
1137            .within_alive_epoch(self.notify_read_effects(certificate))
1138            .await
1139            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1140            .and_then(|r| r)
1141    }
1142
1143    /// Internal logic to execute a certificate.
1144    ///
1145    /// Guarantees that
1146    /// - If input objects are available, return no permanent failure.
1147    /// - Execution and output commit are atomic. i.e. outputs are only written
1148    ///   to storage,
1149    /// on successful execution; crashed execution has no observable effect and
1150    /// can be retried.
1151    ///
1152    /// It is caller's responsibility to ensure input objects are available and
1153    /// locks are set. If this cannot be satisfied by the caller,
1154    /// execute_certificate() should be called instead.
1155    ///
1156    /// Should only be called within iota-core.
1157    #[instrument(level = "trace", skip_all)]
1158    pub async fn try_execute_immediately(
1159        &self,
1160        certificate: &VerifiedExecutableTransaction,
1161        mut expected_effects_digest: Option<TransactionEffectsDigest>,
1162        epoch_store: &Arc<AuthorityPerEpochStore>,
1163    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1164        let _scope = monitored_scope("Execution::try_execute_immediately");
1165        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1166
1167        let tx_digest = certificate.digest();
1168
1169        // Acquire a lock to prevent concurrent executions of the same transaction.
1170        let tx_guard = epoch_store.acquire_tx_guard(certificate).await?;
1171
1172        // The cert could have been processed by a concurrent attempt of the same cert,
1173        // so check if the effects have already been written.
1174        if let Some(effects) = self
1175            .get_transaction_cache_reader()
1176            .get_executed_effects(tx_digest)?
1177        {
1178            tx_guard.release();
1179            return Ok((effects, None));
1180        }
1181        let input_objects =
1182            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1183
1184        if expected_effects_digest.is_none() {
1185            // We could be re-executing a previously executed but uncommitted transaction,
1186            // perhaps after restarting with a new binary. In this situation, if
1187            // we have published an effects signature, we must be sure not to
1188            // equivocate. TODO: read from cache instead of DB
1189            expected_effects_digest = epoch_store.get_signed_effects_digest(tx_digest)?;
1190        }
1191
1192        self.process_certificate(
1193            tx_guard,
1194            certificate,
1195            input_objects,
1196            expected_effects_digest,
1197            epoch_store,
1198        )
1199        .await
1200        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1201        .tap_ok(
1202            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1203        )
1204    }
1205
1206    pub fn read_objects_for_execution(
1207        &self,
1208        tx_lock: &CertLockGuard,
1209        certificate: &VerifiedExecutableTransaction,
1210        epoch_store: &Arc<AuthorityPerEpochStore>,
1211    ) -> IotaResult<InputObjects> {
1212        let _scope = monitored_scope("Execution::load_input_objects");
1213        let _metrics_guard = self
1214            .metrics
1215            .execution_load_input_objects_latency
1216            .start_timer();
1217        let input_objects = &certificate.data().transaction_data().input_objects()?;
1218        self.input_loader.read_objects_for_execution(
1219            epoch_store,
1220            &certificate.key(),
1221            tx_lock,
1222            input_objects,
1223            epoch_store.epoch(),
1224        )
1225    }
1226
1227    /// Test only wrapper for `try_execute_immediately()` above, useful for
1228    /// checking errors if the pre-conditions are not satisfied, and
1229    /// executing change epoch transactions.
1230    pub async fn try_execute_for_test(
1231        &self,
1232        certificate: &VerifiedCertificate,
1233    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1234        let epoch_store = self.epoch_store_for_testing();
1235        let (effects, execution_error_opt) = self
1236            .try_execute_immediately(
1237                &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1238                None,
1239                &epoch_store,
1240            )
1241            .await?;
1242        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1243        Ok((signed_effects, execution_error_opt))
1244    }
1245
1246    pub async fn notify_read_effects(
1247        &self,
1248        certificate: &VerifiedCertificate,
1249    ) -> IotaResult<TransactionEffects> {
1250        self.get_transaction_cache_reader()
1251            .notify_read_executed_effects(&[*certificate.digest()])
1252            .await
1253            .map(|mut r| r.pop().expect("must return correct number of effects"))
1254    }
1255
1256    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1257        self.get_object_cache_reader()
1258            .check_owned_objects_are_live(owned_object_refs)
1259    }
1260
1261    /// This function captures the required state to debug a forked transaction.
1262    /// The dump is written to a file in dir `path`, with name prefixed by the
1263    /// transaction digest. NOTE: Since this info escapes the validator
1264    /// context, make sure not to leak any private info here
1265    pub(crate) fn debug_dump_transaction_state(
1266        &self,
1267        tx_digest: &TransactionDigest,
1268        effects: &TransactionEffects,
1269        expected_effects_digest: TransactionEffectsDigest,
1270        inner_temporary_store: &InnerTemporaryStore,
1271        certificate: &VerifiedExecutableTransaction,
1272        debug_dump_config: &StateDebugDumpConfig,
1273    ) -> IotaResult<PathBuf> {
1274        let dump_dir = debug_dump_config
1275            .dump_file_directory
1276            .as_ref()
1277            .cloned()
1278            .unwrap_or(std::env::temp_dir());
1279        let epoch_store = self.load_epoch_store_one_call_per_task();
1280
1281        NodeStateDump::new(
1282            tx_digest,
1283            effects,
1284            expected_effects_digest,
1285            self.get_object_store().as_ref(),
1286            &epoch_store,
1287            inner_temporary_store,
1288            certificate,
1289        )?
1290        .write_to_file(&dump_dir)
1291        .map_err(|e| IotaError::FileIO(e.to_string()))
1292    }
1293
    /// Executes `certificate` against the already loaded `input_objects` and
    /// commits the outcome.
    ///
    /// The steps are:
    /// 1. Take the execution lock for the certificate; on failure the tx
    ///    guard is released and the error returned.
    /// 2. Check that the epoch guarded by the lock matches `epoch_store`,
    ///    returning `WrongEpoch` otherwise.
    /// 3. Run `prepare_certificate`; on failure the tx guard is released and
    ///    the error returned.
    /// 4. If `expected_effects_digest` is given and differs from the digest
    ///    of the computed effects, try to dump the node state for debugging
    ///    and then panic.
    /// 5. Run `commit_certificate`.
    /// 6. For `AuthenticatorStateUpdateV1` transactions, push the new
    ///    authenticator state into the epoch store.
    /// 7. Record the ratio of computation gas cost to elapsed microseconds.
    ///
    /// Returns the computed effects together with the optional execution
    /// error reported by `prepare_certificate`.
    ///
    /// # Panics
    ///
    /// Panics when the computed effects digest does not match
    /// `expected_effects_digest` (fork detected).
    #[instrument(level = "trace", skip_all)]
    pub(crate) async fn process_certificate(
        &self,
        tx_guard: CertTxGuard,
        certificate: &VerifiedExecutableTransaction,
        input_objects: InputObjects,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
        let process_certificate_start_time = tokio::time::Instant::now();
        let digest = *certificate.digest();

        // Failure-injection hook: when active, kills the current node with a
        // probability of 0.01, decided deterministically from the digest.
        fail_point_if!("correlated-crash-process-certificate", || {
            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
                iota_simulator::task::kill_current_node(None);
            }
        });

        let execution_guard = self
            .execution_lock_for_executable_transaction(certificate)
            .await;
        // Any caller that verifies the signatures on the certificate will have already
        // checked the epoch. But paths that don't verify sigs (e.g. execution
        // from checkpoint, reading from db) present the possibility of an epoch
        // mismatch. If this cert is not finalized in previous epoch, then it's
        // invalid.
        let execution_guard = match execution_guard {
            Ok(execution_guard) => execution_guard,
            Err(err) => {
                tx_guard.release();
                return Err(err);
            }
        };
        // Since we obtain a reference to the epoch store before taking the execution
        // lock, it's possible that reconfiguration has happened and they no
        // longer match.
        if *execution_guard != epoch_store.epoch() {
            tx_guard.release();
            info!("The epoch of the execution_guard doesn't match the epoch store");
            return Err(IotaError::WrongEpoch {
                expected_epoch: epoch_store.epoch(),
                actual_epoch: *execution_guard,
            });
        }

        // Errors originating from prepare_certificate may be transient (failure to read
        // locks) or non-transient (transaction input is invalid, move vm
        // errors). However, all errors from this function occur before we have
        // written anything to the db, so we release the tx guard and rely on the
        // client to retry the tx (if it was transient).
        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
            &execution_guard,
            certificate,
            input_objects,
            epoch_store,
        ) {
            Err(e) => {
                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
                tx_guard.release();
                return Err(e);
            }
            Ok(res) => res,
        };

        // Fork detection: the locally computed effects must match the digest the
        // caller (or a previously signed effects record) expects.
        if let Some(expected_effects_digest) = expected_effects_digest {
            if effects.digest() != expected_effects_digest {
                // We don't want a failed dump to mask the original error, so we only
                // log the dump result and continue to the panic below.
                match self.debug_dump_transaction_state(
                    &digest,
                    &effects,
                    expected_effects_digest,
                    &inner_temporary_store,
                    certificate,
                    &self.config.state_debug_dump_config,
                ) {
                    Ok(out_path) => {
                        info!(
                            "Dumped node state for transaction {} to {}",
                            digest,
                            out_path.as_path().display().to_string()
                        );
                    }
                    Err(e) => {
                        error!("Error dumping state for transaction {}: {e}", digest);
                    }
                }
                error!(
                    tx_digest = ?digest,
                    ?expected_effects_digest,
                    actual_effects = ?effects,
                    "fork detected!"
                );
                panic!(
                    "Transaction {} is expected to have effects digest {}, but got {}!",
                    digest,
                    expected_effects_digest,
                    effects.digest(),
                );
            }
        }

        // Failure-injection hook placed right before the commit.
        fail_point_async!("crash");

        // Both guards are handed over to the commit step.
        self.commit_certificate(
            certificate,
            inner_temporary_store,
            &effects,
            tx_guard,
            execution_guard,
            epoch_store,
        )
        .await?;

        if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
            certificate.data().transaction_data().kind()
        {
            if let Some(err) = &execution_error_opt {
                debug_fatal!("Authenticator state update failed: {:?}", err);
            }
            epoch_store.update_authenticator_state(auth_state);

            // double check that the signature verifier always matches the authenticator
            // state
            if cfg!(debug_assertions) {
                let authenticator_state = get_authenticator_state(self.get_object_store())
                    .expect("Read cannot fail")
                    .expect("Authenticator state must exist");

                let mut sys_jwks: Vec<_> = authenticator_state
                    .active_jwks
                    .into_iter()
                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
                    .collect();
                let mut active_jwks: Vec<_> = epoch_store
                    .signature_verifier
                    .get_jwks()
                    .into_iter()
                    .collect();
                // Sort both sides so the comparison does not depend on iteration order.
                sys_jwks.sort();
                active_jwks.sort();

                assert_eq!(sys_jwks, active_jwks);
            }
        }

        // Computation gas cost per elapsed microsecond; skipped when no time has
        // elapsed so that we never divide by zero.
        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics
                .execution_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
        };
        Ok((effects, execution_error_opt))
    }
1447
1448    #[instrument(level = "trace", skip_all)]
1449    async fn commit_certificate(
1450        &self,
1451        certificate: &VerifiedExecutableTransaction,
1452        inner_temporary_store: InnerTemporaryStore,
1453        effects: &TransactionEffects,
1454        tx_guard: CertTxGuard,
1455        _execution_guard: ExecutionLockReadGuard<'_>,
1456        epoch_store: &Arc<AuthorityPerEpochStore>,
1457    ) -> IotaResult {
1458        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1459            monitored_scope("Execution::commit_certificate");
1460        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1461
1462        let tx_key = certificate.key();
1463        let tx_digest = certificate.digest();
1464        let input_object_count = inner_temporary_store.input_objects.len();
1465        let shared_object_count = effects.input_shared_objects().len();
1466
1467        let output_keys = inner_temporary_store.get_output_keys(effects);
1468
1469        // index certificate
1470        let _ = self
1471            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1472            .await
1473            .tap_err(|e| {
1474                self.metrics.post_processing_total_failures.inc();
1475                error!(?tx_digest, "tx post processing failed: {e}");
1476            });
1477
1478        // The insertion to epoch_store is not atomic with the insertion to the
1479        // perpetual store. This is OK because we insert to the epoch store
1480        // first. And during lookups we always look up in the perpetual store first.
1481        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1482
1483        // Allow testing what happens if we crash here.
1484        fail_point_async!("crash");
1485
1486        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1487            certificate.clone().into_unsigned(),
1488            effects.clone(),
1489            inner_temporary_store,
1490        );
1491        self.get_cache_writer()
1492            .write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())
1493            .await?;
1494
1495        if certificate.transaction_data().is_end_of_epoch_tx() {
1496            // At the end of epoch, since system packages may have been upgraded, force
1497            // reload them in the cache.
1498            self.get_object_cache_reader()
1499                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1500        }
1501
1502        // commit_certificate finished, the tx is fully committed to the store.
1503        tx_guard.commit_tx();
1504
1505        // Notifies transaction manager about transaction and output objects committed.
1506        // This provides necessary information to transaction manager to start executing
1507        // additional ready transactions.
1508        self.transaction_manager
1509            .notify_commit(tx_digest, output_keys, epoch_store);
1510
1511        self.update_metrics(certificate, input_object_count, shared_object_count);
1512
1513        Ok(())
1514    }
1515
1516    fn update_metrics(
1517        &self,
1518        certificate: &VerifiedExecutableTransaction,
1519        input_object_count: usize,
1520        shared_object_count: usize,
1521    ) {
1522        // count signature by scheme, for zklogin and multisig
1523        if certificate.has_zklogin_sig() {
1524            self.metrics.zklogin_sig_count.inc();
1525        } else if certificate.has_upgraded_multisig() {
1526            self.metrics.multisig_sig_count.inc();
1527        }
1528
1529        self.metrics.total_effects.inc();
1530        self.metrics.total_certs.inc();
1531
1532        if shared_object_count > 0 {
1533            self.metrics.shared_obj_tx.inc();
1534        }
1535
1536        if certificate.is_sponsored_tx() {
1537            self.metrics.sponsored_tx.inc();
1538        }
1539
1540        self.metrics
1541            .num_input_objs
1542            .observe(input_object_count as f64);
1543        self.metrics
1544            .num_shared_objects
1545            .observe(shared_object_count as f64);
1546        self.metrics.batch_size.observe(
1547            certificate
1548                .data()
1549                .intent_message()
1550                .value
1551                .kind()
1552                .num_commands() as f64,
1553        );
1554    }
1555
    /// prepare_certificate validates the transaction input, and executes the
    /// certificate, returning effects, output objects, events, etc.
    ///
    /// It reads state from the db (both owned and shared locks), but it has no
    /// side effects (apart from the latency metrics recorded in the body).
    ///
    /// It can be generally understood that a failure of prepare_certificate
    /// indicates a non-transient error, e.g. the transaction input is
    /// somehow invalid, the correct locks are not held, etc. However, this
    /// is not entirely true, as a transient db read error may also cause
    /// this function to fail.
    ///
    /// # Arguments
    ///
    /// * `_execution_guard` - not read by the body. NOTE(review): presumably
    ///   required so that callers demonstrate they hold the execution lock;
    ///   confirm against the callers.
    /// * `certificate` - the certificate to check and execute.
    /// * `input_objects` - the loaded inputs, passed through
    ///   `check_certificate_input` before execution.
    /// * `epoch_store` - source of the protocol config, reference gas price,
    ///   executor and epoch start data used for execution.
    ///
    /// # Returns
    ///
    /// The inner temporary store and effects produced by the executor, plus
    /// the `Err` side of the executor's result as an `Option` (`None` when the
    /// execution result was `Ok`).
    ///
    /// # Errors
    ///
    /// Propagates failures of `validity_check`, `check_certificate_input` and
    /// `check_owned_locks`. Execution itself does not return early here; its
    /// error is reported through the third tuple element.
    #[instrument(level = "trace", skip_all)]
    fn prepare_certificate(
        &self,
        _execution_guard: &ExecutionLockReadGuard<'_>,
        certificate: &VerifiedExecutableTransaction,
        input_objects: InputObjects,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(
        InnerTemporaryStore,
        TransactionEffects,
        Option<ExecutionError>,
    )> {
        let _scope = monitored_scope("Execution::prepare_certificate");
        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
        // Start of the wall-clock measurement used for the gas/latency ratio
        // recorded at the end of this function.
        let prepare_certificate_start_time = tokio::time::Instant::now();

        // TODO: We need to move this to a more appropriate place to avoid redundant
        // checks.
        let tx_data = certificate.data().transaction_data();
        tx_data.validity_check(epoch_store.protocol_config())?;

        // The cost of partially re-auditing a transaction before execution is
        // tolerated.
        let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
            certificate,
            input_objects,
            epoch_store.protocol_config(),
            epoch_store.reference_gas_price(),
        )?;

        // Check the locks of the owned inputs before executing.
        let owned_object_refs = input_objects.inner().filter_owned_objects();
        self.check_owned_locks(&owned_object_refs)?;
        let tx_digest = *certificate.digest();
        let protocol_config = epoch_store.protocol_config();
        let transaction_data = &certificate.data().intent_message().value;
        let (kind, signer, gas) = transaction_data.execution_parts();

        // `effects` is only mutated by the fail point further down. When neither
        // `msim` nor `fail_points` is enabled, the `mut` is expected to be unused,
        // which is what the `expect(unused_mut)` attribute states.
        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
        let (inner_temp_store, _, mut effects, execution_error_opt) =
            epoch_store.executor().execute_transaction_to_effects(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
                // cyclic dependency w/ iota-adapter
                self.config
                    .expensive_safety_check_config
                    .enable_deep_per_tx_iota_conservation_check(),
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                input_objects,
                gas,
                gas_status,
                kind,
                signer,
                tx_digest,
                &mut None,
            );

        // Test-only hook: under `msim` this hands the effects to
        // `create_fail_state`, which receives them mutably.
        fail_point_if!("cp_execution_nondeterminism", || {
            #[cfg(msim)]
            self.create_fail_state(certificate, epoch_store, &mut effects);
        });

        // Record computation cost per elapsed microsecond. The guard skips the
        // observation when no time has elapsed, avoiding a division by zero.
        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics
                .prepare_cert_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
        }

        Ok((inner_temp_store, effects, execution_error_opt.err()))
    }
1644
1645    pub fn prepare_certificate_for_benchmark(
1646        &self,
1647        certificate: &VerifiedExecutableTransaction,
1648        input_objects: InputObjects,
1649        epoch_store: &Arc<AuthorityPerEpochStore>,
1650    ) -> IotaResult<(
1651        InnerTemporaryStore,
1652        TransactionEffects,
1653        Option<ExecutionError>,
1654    )> {
1655        let lock: RwLock<EpochId> = RwLock::new(epoch_store.epoch());
1656        let execution_guard = lock.try_read().unwrap();
1657
1658        self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
1659    }
1660
1661    pub async fn dry_exec_transaction(
1662        &self,
1663        transaction: TransactionData,
1664        transaction_digest: TransactionDigest,
1665    ) -> IotaResult<(
1666        DryRunTransactionBlockResponse,
1667        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1668        TransactionEffects,
1669        Option<ObjectID>,
1670    )> {
1671        let epoch_store = self.load_epoch_store_one_call_per_task();
1672        if !self.is_fullnode(&epoch_store) {
1673            return Err(IotaError::UnsupportedFeature {
1674                error: "dry-exec is only supported on fullnodes".to_string(),
1675            });
1676        }
1677
1678        if transaction.kind().is_system_tx() {
1679            return Err(IotaError::UnsupportedFeature {
1680                error: "dry-exec does not support system transactions".to_string(),
1681            });
1682        }
1683
1684        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1685            .await
1686    }
1687
1688    pub async fn dry_exec_transaction_for_benchmark(
1689        &self,
1690        transaction: TransactionData,
1691        transaction_digest: TransactionDigest,
1692    ) -> IotaResult<(
1693        DryRunTransactionBlockResponse,
1694        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1695        TransactionEffects,
1696        Option<ObjectID>,
1697    )> {
1698        let epoch_store = self.load_epoch_store_one_call_per_task();
1699        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1700            .await
1701    }
1702
1703    async fn dry_exec_transaction_impl(
1704        &self,
1705        epoch_store: &AuthorityPerEpochStore,
1706        transaction: TransactionData,
1707        transaction_digest: TransactionDigest,
1708    ) -> IotaResult<(
1709        DryRunTransactionBlockResponse,
1710        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1711        TransactionEffects,
1712        Option<ObjectID>,
1713    )> {
1714        // Cheap validity checks for a transaction, including input size limits.
1715        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1716
1717        let input_object_kinds = transaction.input_objects()?;
1718        let receiving_object_refs = transaction.receiving_objects();
1719
1720        iota_transaction_checks::deny::check_transaction_for_signing(
1721            &transaction,
1722            &[],
1723            &input_object_kinds,
1724            &receiving_object_refs,
1725            &self.config.transaction_deny_config,
1726            self.get_backing_package_store().as_ref(),
1727        )?;
1728
1729        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1730            // We don't want to cache this transaction since it's a dry run.
1731            None,
1732            &input_object_kinds,
1733            &receiving_object_refs,
1734            epoch_store.epoch(),
1735        )?;
1736
1737        // make a gas object if one was not provided
1738        let mut gas_object_refs = transaction.gas().to_vec();
1739        let reference_gas_price = epoch_store.reference_gas_price();
1740        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1741            let sender = transaction.sender();
1742            // use a 1B iota coin
1743            const NANOS_TO_IOTA: u64 = 1_000_000_000;
1744            const DRY_RUN_IOTA: u64 = 1_000_000_000;
1745            let max_coin_value = NANOS_TO_IOTA * DRY_RUN_IOTA;
1746            let gas_object_id = ObjectID::random();
1747            let gas_object = Object::new_move(
1748                MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
1749                Owner::AddressOwner(sender),
1750                TransactionDigest::genesis_marker(),
1751            );
1752            let gas_object_ref = gas_object.compute_object_reference();
1753            gas_object_refs = vec![gas_object_ref];
1754            (
1755                iota_transaction_checks::check_transaction_input_with_given_gas(
1756                    epoch_store.protocol_config(),
1757                    reference_gas_price,
1758                    &transaction,
1759                    input_objects,
1760                    receiving_objects,
1761                    gas_object,
1762                    &self.metrics.bytecode_verifier_metrics,
1763                    &self.config.verifier_signing_config,
1764                )?,
1765                Some(gas_object_id),
1766            )
1767        } else {
1768            (
1769                iota_transaction_checks::check_transaction_input(
1770                    epoch_store.protocol_config(),
1771                    reference_gas_price,
1772                    &transaction,
1773                    input_objects,
1774                    &receiving_objects,
1775                    &self.metrics.bytecode_verifier_metrics,
1776                    &self.config.verifier_signing_config,
1777                )?,
1778                None,
1779            )
1780        };
1781
1782        let protocol_config = epoch_store.protocol_config();
1783        let (kind, signer, _) = transaction.execution_parts();
1784
1785        let silent = true;
1786        let executor = iota_execution::executor(protocol_config, silent, None)
1787            .expect("Creating an executor should not fail here");
1788
1789        let expensive_checks = false;
1790        let (inner_temp_store, _, effects, _execution_error) = executor
1791            .execute_transaction_to_effects(
1792                self.get_backing_store().as_ref(),
1793                protocol_config,
1794                self.metrics.limits_metrics.clone(),
1795                expensive_checks,
1796                self.config.certificate_deny_config.certificate_deny_set(),
1797                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1798                epoch_store
1799                    .epoch_start_config()
1800                    .epoch_data()
1801                    .epoch_start_timestamp(),
1802                checked_input_objects,
1803                gas_object_refs,
1804                gas_status,
1805                kind,
1806                signer,
1807                transaction_digest,
1808                &mut None,
1809            );
1810        let tx_digest = *effects.transaction_digest();
1811
1812        let module_cache =
1813            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1814
1815        let mut layout_resolver =
1816            epoch_store
1817                .executor()
1818                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1819                    &inner_temp_store,
1820                    self.get_backing_package_store(),
1821                )));
1822        // Returning empty vector here because we recalculate changes in the rpc layer.
1823        let object_changes = Vec::new();
1824
1825        // Returning empty vector here because we recalculate changes in the rpc layer.
1826        let balance_changes = Vec::new();
1827
1828        let written_with_kind = effects
1829            .created()
1830            .into_iter()
1831            .map(|(oref, _)| (oref, WriteKind::Create))
1832            .chain(
1833                effects
1834                    .unwrapped()
1835                    .into_iter()
1836                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1837            )
1838            .chain(
1839                effects
1840                    .mutated()
1841                    .into_iter()
1842                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
1843            )
1844            .map(|(oref, kind)| {
1845                let obj = inner_temp_store.written.get(&oref.0).unwrap();
1846                // TODO: Avoid clones.
1847                (oref.0, (oref, obj.clone(), kind))
1848            })
1849            .collect();
1850
1851        Ok((
1852            DryRunTransactionBlockResponse {
1853                input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
1854                    .map_err(|e| IotaError::TransactionSerialization {
1855                        error: format!(
1856                            "Failed to convert transaction to IotaTransactionBlockData: {e}",
1857                        ),
1858                    })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
1859                effects: effects.clone().try_into()?,
1860                events: IotaTransactionBlockEvents::try_from(
1861                    inner_temp_store.events.clone(),
1862                    tx_digest,
1863                    None,
1864                    layout_resolver.as_mut(),
1865                )?,
1866                object_changes,
1867                balance_changes,
1868            },
1869            written_with_kind,
1870            effects,
1871            mock_gas,
1872        ))
1873    }
1874
1875    pub fn simulate_transaction(
1876        &self,
1877        transaction: TransactionData,
1878    ) -> IotaResult<SimulateTransactionResult> {
1879        if transaction.kind().is_system_tx() {
1880            return Err(IotaError::UnsupportedFeature {
1881                error: "simulate does not support system transactions".to_string(),
1882            });
1883        }
1884
1885        let epoch_store = self.load_epoch_store_one_call_per_task();
1886        if !self.is_fullnode(&epoch_store) {
1887            return Err(IotaError::UnsupportedFeature {
1888                error: "simulate is only supported on fullnodes".to_string(),
1889            });
1890        }
1891
1892        self.simulate_transaction_impl(&epoch_store, transaction)
1893    }
1894
1895    fn simulate_transaction_impl(
1896        &self,
1897        epoch_store: &AuthorityPerEpochStore,
1898        transaction: TransactionData,
1899    ) -> IotaResult<SimulateTransactionResult> {
1900        // Cheap validity checks for a transaction, including input size limits.
1901        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1902
1903        let input_object_kinds = transaction.input_objects()?;
1904        let receiving_object_refs = transaction.receiving_objects();
1905
1906        iota_transaction_checks::deny::check_transaction_for_signing(
1907            &transaction,
1908            &[],
1909            &input_object_kinds,
1910            &receiving_object_refs,
1911            &self.config.transaction_deny_config,
1912            self.get_backing_package_store().as_ref(),
1913        )?;
1914
1915        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1916            // We don't want to cache this transaction since it's a dry run.
1917            None,
1918            &input_object_kinds,
1919            &receiving_object_refs,
1920            epoch_store.epoch(),
1921        )?;
1922
1923        // make a gas object if one was not provided
1924        let mut gas_object_refs = transaction.gas().to_vec();
1925        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1926            let sender = transaction.sender();
1927            // use a 1B iota coin
1928            const NANOS_TO_IOTA: u64 = 1_000_000_000;
1929            const DRY_RUN_IOTA: u64 = 1_000_000_000;
1930            let max_coin_value = NANOS_TO_IOTA * DRY_RUN_IOTA;
1931            let gas_object_id = ObjectID::MAX;
1932            let gas_object = Object::new_move(
1933                MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
1934                Owner::AddressOwner(sender),
1935                TransactionDigest::genesis_marker(),
1936            );
1937            let gas_object_ref = gas_object.compute_object_reference();
1938            gas_object_refs = vec![gas_object_ref];
1939            (
1940                iota_transaction_checks::check_transaction_input_with_given_gas(
1941                    epoch_store.protocol_config(),
1942                    epoch_store.reference_gas_price(),
1943                    &transaction,
1944                    input_objects,
1945                    receiving_objects,
1946                    gas_object,
1947                    &self.metrics.bytecode_verifier_metrics,
1948                    &self.config.verifier_signing_config,
1949                )?,
1950                Some(gas_object_id),
1951            )
1952        } else {
1953            (
1954                iota_transaction_checks::check_transaction_input(
1955                    epoch_store.protocol_config(),
1956                    epoch_store.reference_gas_price(),
1957                    &transaction,
1958                    input_objects,
1959                    &receiving_objects,
1960                    &self.metrics.bytecode_verifier_metrics,
1961                    &self.config.verifier_signing_config,
1962                )?,
1963                None,
1964            )
1965        };
1966
1967        let protocol_config = epoch_store.protocol_config();
1968        let (kind, signer, _) = transaction.execution_parts();
1969
1970        let silent = true;
1971        let executor = iota_execution::executor(protocol_config, silent, None)
1972            .expect("Creating an executor should not fail here");
1973
1974        let expensive_checks = false;
1975        let (inner_temp_store, _, effects, _execution_error) = executor
1976            .execute_transaction_to_effects(
1977                self.get_backing_store().as_ref(),
1978                protocol_config,
1979                self.metrics.limits_metrics.clone(),
1980                expensive_checks,
1981                self.config.certificate_deny_config.certificate_deny_set(),
1982                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1983                epoch_store
1984                    .epoch_start_config()
1985                    .epoch_data()
1986                    .epoch_start_timestamp(),
1987                checked_input_objects,
1988                gas_object_refs,
1989                gas_status,
1990                kind,
1991                signer,
1992                transaction.digest(),
1993                &mut None,
1994            );
1995
1996        Ok(SimulateTransactionResult {
1997            input_objects: inner_temp_store.input_objects,
1998            output_objects: inner_temp_store.written,
1999            events: effects.events_digest().map(|_| inner_temp_store.events),
2000            effects,
2001            mock_gas_id: mock_gas,
2002        })
2003    }
2004
    /// Runs a transaction kind in dev-inspect mode and returns the
    /// `DevInspectResults` built from its effects, events and execution result.
    ///
    /// The object ID for gas can be any object ID, even for an uncreated object
    ///
    /// # Arguments
    ///
    /// * `sender` - sender of the synthesized transaction.
    /// * `transaction_kind` - the transaction kind to inspect; system
    ///   transactions are rejected.
    /// * `gas_price` - defaults to the epoch's reference gas price.
    /// * `gas_budget` - defaults to the protocol's `max_tx_gas`.
    /// * `gas_sponsor` - gas owner; defaults to `sender`.
    /// * `gas_objects` - gas payment; defaults to empty, in which case a dummy
    ///   gas object is used.
    /// * `show_raw_txn_data_and_effects` - when `true`, the BCS bytes of the
    ///   transaction and of the effects are included; defaults to `false`.
    /// * `skip_checks` - when `true` only the lightweight
    ///   `check_dev_inspect_input` is performed; defaults to `true`.
    ///
    /// # Errors
    ///
    /// Returns `IotaError::UnsupportedFeature` on non-fullnodes and for system
    /// transactions, `IotaError::TransactionSerialization` when BCS
    /// serialization fails, and otherwise propagates failures of the validity,
    /// deny-list and input checks, of object loading, of gas status creation
    /// and of `DevInspectResults::new`.
    pub async fn dev_inspect_transaction_block(
        &self,
        sender: IotaAddress,
        transaction_kind: TransactionKind,
        gas_price: Option<u64>,
        gas_budget: Option<u64>,
        gas_sponsor: Option<IotaAddress>,
        gas_objects: Option<Vec<ObjectRef>>,
        show_raw_txn_data_and_effects: Option<bool>,
        skip_checks: Option<bool>,
    ) -> IotaResult<DevInspectResults> {
        let epoch_store = self.load_epoch_store_one_call_per_task();

        if !self.is_fullnode(&epoch_store) {
            return Err(IotaError::UnsupportedFeature {
                error: "dev-inspect is only supported on fullnodes".to_string(),
            });
        }

        if transaction_kind.is_system_tx() {
            return Err(IotaError::UnsupportedFeature {
                error: "system transactions are not supported".to_string(),
            });
        }

        // Resolve the optional arguments to their defaults.
        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
        let skip_checks = skip_checks.unwrap_or(true);
        let reference_gas_price = epoch_store.reference_gas_price();
        let protocol_config = epoch_store.protocol_config();
        let max_tx_gas = protocol_config.max_tx_gas();

        let price = gas_price.unwrap_or(reference_gas_price);
        let budget = gas_budget.unwrap_or(max_tx_gas);
        let owner = gas_sponsor.unwrap_or(sender);
        // Payment might be empty here, but it's fine we'll have to deal with it later
        // after reading all the input objects.
        let payment = gas_objects.unwrap_or_default();
        // Wrap the transaction kind into a full transaction so that the regular
        // checks and input loading can be applied to it.
        let transaction = TransactionData::V1(TransactionDataV1 {
            kind: transaction_kind.clone(),
            sender,
            gas_data: GasData {
                payment,
                owner,
                price,
                budget,
            },
            expiration: TransactionExpiration::None,
        });

        // Serialized before any validity check runs, so a serialization error
        // is reported ahead of a validity error.
        let raw_txn_data = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        transaction.validity_check_no_gas_check(protocol_config)?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dev inspect.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // The dummy gas object is built unconditionally; it is only used when
        // the transaction provides no gas object.
        let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
            DEV_INSPECT_GAS_COIN_VALUE,
            transaction.gas_owner(),
        );

        // Gas payment handed to the executor: the dummy object's reference if
        // none was provided, otherwise the provided payment.
        let gas_objects = if transaction.gas().is_empty() {
            let gas_object_ref = dummy_gas_object.compute_object_reference();
            vec![gas_object_ref]
        } else {
            transaction.gas().to_vec()
        };

        let (gas_status, checked_input_objects) = if skip_checks {
            // If we are skipping checks, then we call the check_dev_inspect_input function
            // which will perform only lightweight checks on the transaction
            // input. And if the gas field is empty, that means we will
            // use the dummy gas object so we need to add it to the input objects vector.
            if transaction.gas().is_empty() {
                input_objects.push(ObjectReadResult::new(
                    InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
                    dummy_gas_object.into(),
                ));
            }
            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
                protocol_config,
                &transaction_kind,
                input_objects,
                receiving_objects,
            )?;
            // Note that the gas status is created from `max_tx_gas`, not from
            // the `budget` resolved above.
            let gas_status = IotaGasStatus::new(
                max_tx_gas,
                transaction.gas_price(),
                reference_gas_price,
                protocol_config,
            )?;

            (gas_status, checked_input_objects)
        } else {
            // If we are not skipping checks, then we call the check_transaction_input
            // function and its dummy gas variant which will perform full
            // fledged checks just like a real transaction execution.
            if transaction.gas().is_empty() {
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    receiving_objects,
                    dummy_gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?
            } else {
                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?
            }
        };

        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
            .expect("Creating an executor should not fail here");
        // The transaction digest is the hash of the transaction data wrapped in
        // an intent message; `transaction` is moved into the message here.
        let intent_msg = IntentMessage::new(
            Intent {
                version: IntentVersion::V0,
                scope: IntentScope::TransactionData,
                app_id: AppId::Iota,
            },
            transaction,
        );
        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            // expensive checks
            false,
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_objects,
            gas_status,
            transaction_kind,
            sender,
            transaction_digest,
            skip_checks,
        );

        let raw_effects = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction effects during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        // Layouts are resolved against the temporary store first, with the
        // backing package store as fallback.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));

        DevInspectResults::new(
            effects,
            inner_temp_store.events.clone(),
            execution_result,
            raw_txn_data,
            raw_effects,
            layout_resolver.as_mut(),
        )
    }
2208
2209    // Only used for testing because of how epoch store is loaded.
2210    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2211        let epoch_store = self.epoch_store_for_testing();
2212        Ok(epoch_store.reference_gas_price())
2213    }
2214
2215    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2216        self.get_transaction_cache_reader()
2217            .is_tx_already_executed(digest)
2218    }
2219
2220    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2221    #[instrument(level = "debug", skip_all, err)]
2222    async fn index_tx(
2223        &self,
2224        indexes: &IndexStore,
2225        digest: &TransactionDigest,
2226        // TODO: index_tx really just need the transaction data here.
2227        cert: &VerifiedExecutableTransaction,
2228        effects: &TransactionEffects,
2229        events: &TransactionEvents,
2230        timestamp_ms: u64,
2231        tx_coins: Option<TxCoins>,
2232        written: &WrittenObjects,
2233        inner_temporary_store: &InnerTemporaryStore,
2234    ) -> IotaResult<u64> {
2235        let changes = self
2236            .process_object_index(effects, written, inner_temporary_store)
2237            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2238
2239        indexes
2240            .index_tx(
2241                cert.data().intent_message().value.sender(),
2242                cert.data()
2243                    .intent_message()
2244                    .value
2245                    .input_objects()?
2246                    .iter()
2247                    .map(|o| o.object_id()),
2248                effects
2249                    .all_changed_objects()
2250                    .into_iter()
2251                    .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2252                cert.data()
2253                    .intent_message()
2254                    .value
2255                    .move_calls()
2256                    .into_iter()
2257                    .map(|(package, module, function)| {
2258                        (*package, module.to_owned(), function.to_owned())
2259                    }),
2260                events,
2261                changes,
2262                digest,
2263                timestamp_ms,
2264                tx_coins,
2265            )
2266            .await
2267    }
2268
2269    #[cfg(msim)]
2270    fn create_fail_state(
2271        &self,
2272        certificate: &VerifiedExecutableTransaction,
2273        epoch_store: &Arc<AuthorityPerEpochStore>,
2274        effects: &mut TransactionEffects,
2275    ) {
2276        use std::cell::RefCell;
2277        thread_local! {
2278            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2279        }
2280        if !certificate.data().intent_message().value.is_system_tx() {
2281            let committee = epoch_store.committee();
2282            let cur_stake = (**committee).weight(&self.name);
2283            if cur_stake > 0 {
2284                FAIL_STATE.with_borrow_mut(|fail_state| {
2285                    // let (&mut failing_stake, &mut failing_validators) = fail_state;
2286                    if fail_state.0 < committee.validity_threshold() {
2287                        fail_state.0 += cur_stake;
2288                        fail_state.1.insert(self.name);
2289                    }
2290
2291                    if fail_state.1.contains(&self.name) {
2292                        info!("cp_exec failing tx");
2293                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2294                    }
2295                });
2296            }
2297        }
2298    }
2299
2300    fn process_object_index(
2301        &self,
2302        effects: &TransactionEffects,
2303        written: &WrittenObjects,
2304        inner_temporary_store: &InnerTemporaryStore,
2305    ) -> IotaResult<ObjectIndexChanges> {
2306        let epoch_store = self.load_epoch_store_one_call_per_task();
2307        let mut layout_resolver =
2308            epoch_store
2309                .executor()
2310                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2311                    inner_temporary_store,
2312                    self.get_backing_package_store(),
2313                )));
2314
2315        let modified_at_version = effects
2316            .modified_at_versions()
2317            .into_iter()
2318            .collect::<HashMap<_, _>>();
2319
2320        let tx_digest = effects.transaction_digest();
2321        let mut deleted_owners = vec![];
2322        let mut deleted_dynamic_fields = vec![];
2323        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2324            let old_version = modified_at_version.get(&id).unwrap();
2325            // When we process the index, the latest object hasn't been written yet so
2326            // the old object must be present.
2327            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2328                |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2329            ) {
2330                Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2331                Owner::ObjectOwner(object_id) => {
2332                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2333                }
2334                _ => {}
2335            }
2336        }
2337
2338        let mut new_owners = vec![];
2339        let mut new_dynamic_fields = vec![];
2340
2341        for (oref, owner, kind) in effects.all_changed_objects() {
2342            let id = &oref.0;
2343            // For mutated objects, retrieve old owner and delete old index if there is a
2344            // owner change.
2345            if let WriteKind::Mutate = kind {
2346                let Some(old_version) = modified_at_version.get(id) else {
2347                    panic!(
2348                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2349                    );
2350                };
2351                // When we process the index, the latest object hasn't been written yet so
2352                // the old object must be present.
2353                let Some(old_object) = self
2354                    .get_object_store()
2355                    .get_object_by_key(id, *old_version)?
2356                else {
2357                    panic!(
2358                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2359                    );
2360                };
2361                if old_object.owner != owner {
2362                    match old_object.owner {
2363                        Owner::AddressOwner(addr) => {
2364                            deleted_owners.push((addr, *id));
2365                        }
2366                        Owner::ObjectOwner(object_id) => {
2367                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2368                        }
2369                        _ => {}
2370                    }
2371                }
2372            }
2373
2374            match owner {
2375                Owner::AddressOwner(addr) => {
2376                    // TODO: We can remove the object fetching after we added ObjectType to
2377                    // TransactionEffects
2378                    let new_object = written.get(id).unwrap_or_else(
2379                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2380                    );
2381                    assert_eq!(
2382                        new_object.version(),
2383                        oref.1,
2384                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2385                        tx_digest,
2386                        id,
2387                        new_object.version(),
2388                        oref.1
2389                    );
2390
2391                    let type_ = new_object
2392                        .type_()
2393                        .map(|type_| ObjectType::Struct(type_.clone()))
2394                        .unwrap_or(ObjectType::Package);
2395
2396                    new_owners.push((
2397                        (addr, *id),
2398                        ObjectInfo {
2399                            object_id: *id,
2400                            version: oref.1,
2401                            digest: oref.2,
2402                            type_,
2403                            owner,
2404                            previous_transaction: *effects.transaction_digest(),
2405                        },
2406                    ));
2407                }
2408                Owner::ObjectOwner(owner) => {
2409                    let new_object = written.get(id).unwrap_or_else(
2410                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2411                    );
2412                    assert_eq!(
2413                        new_object.version(),
2414                        oref.1,
2415                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2416                        tx_digest,
2417                        id,
2418                        new_object.version(),
2419                        oref.1
2420                    );
2421
2422                    let Some(df_info) = self
2423                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut(), false)
2424                        .unwrap_or_else(|e| {
2425                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2426                            None
2427                        }
2428                    )
2429                        else {
2430                            // Skip indexing for non dynamic field objects.
2431                            continue;
2432                        };
2433                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2434                }
2435                _ => {}
2436            }
2437        }
2438
2439        Ok(ObjectIndexChanges {
2440            deleted_owners,
2441            deleted_dynamic_fields,
2442            new_owners,
2443            new_dynamic_fields,
2444        })
2445    }
2446
2447    fn try_create_dynamic_field_info(
2448        &self,
2449        o: &Object,
2450        written: &WrittenObjects,
2451        resolver: &mut dyn LayoutResolver,
2452        get_latest_object_version: bool,
2453    ) -> IotaResult<Option<DynamicFieldInfo>> {
2454        // Skip if not a move object
2455        let Some(move_object) = o.data.try_as_move().cloned() else {
2456            return Ok(None);
2457        };
2458
2459        // We only index dynamic field objects
2460        if !move_object.type_().is_dynamic_field() {
2461            return Ok(None);
2462        }
2463
2464        let layout = resolver
2465            .get_annotated_layout(&move_object.type_().clone().into())?
2466            .into_layout();
2467
2468        let field =
2469            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2470                IotaError::ObjectDeserialization {
2471                    error: e.to_string(),
2472                }
2473            })?;
2474
2475        let type_ = field.kind;
2476        let name_type: TypeTag = field.name_layout.into();
2477        let bcs_name = field.name_bytes.to_owned();
2478
2479        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2480            .map_err(|e| {
2481                warn!("{e}");
2482                IotaError::ObjectDeserialization {
2483                    error: e.to_string(),
2484                }
2485            })?;
2486
2487        let name = DynamicFieldName {
2488            type_: name_type,
2489            value: IotaMoveValue::from(name_value).to_json_value(),
2490        };
2491
2492        let value_metadata = field.value_metadata().map_err(|e| {
2493            warn!("{e}");
2494            IotaError::ObjectDeserialization {
2495                error: e.to_string(),
2496            }
2497        })?;
2498
2499        Ok(Some(match value_metadata {
2500            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2501                name,
2502                bcs_name,
2503                type_,
2504                object_type: object_type.to_canonical_string(/* with_prefix */ true),
2505                object_id: o.id(),
2506                version: o.version(),
2507                digest: o.digest(),
2508            },
2509
2510            DFV::ValueMetadata::DynamicObjectField(object_id) => {
2511                // Find the actual object from storage using the object id obtained from the
2512                // wrapper.
2513
2514                // Try to find the object in the written objects first.
2515                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2516                    (
2517                        object.version(),
2518                        object.digest(),
2519                        object.data.type_().unwrap().clone(),
2520                    )
2521                } else {
2522                    // If not found, try to find it in the database.
2523                    let object = if get_latest_object_version {
2524                        // Loading genesis object could meet a condition that the version of the
2525                        // genesis object is behind the one in the snapshot.
2526                        // In this case, the object can not be found with get_object_by_key, we need
2527                        // to use get_object instead. Since get_object is a heavier operation, we
2528                        // only allow to use it for genesis.
2529                        // reference: https://github.com/iotaledger/iota/issues/7267
2530                        self.get_object_store()
2531                            .get_object(&object_id)?
2532                            .ok_or_else(|| UserInputError::ObjectNotFound {
2533                                object_id,
2534                                version: Some(o.version()),
2535                            })?
2536                    } else {
2537                        // Non-genesis object should be in the database with the given version.
2538                        self.get_object_store()
2539                            .get_object_by_key(&object_id, o.version())?
2540                            .ok_or_else(|| UserInputError::ObjectNotFound {
2541                                object_id,
2542                                version: Some(o.version()),
2543                            })?
2544                    };
2545
2546                    (
2547                        object.version(),
2548                        object.digest(),
2549                        object.data.type_().unwrap().clone(),
2550                    )
2551                };
2552
2553                DynamicFieldInfo {
2554                    name,
2555                    bcs_name,
2556                    type_,
2557                    object_type: object_type.to_string(),
2558                    object_id,
2559                    version,
2560                    digest,
2561                }
2562            }
2563        }))
2564    }
2565
2566    #[instrument(level = "trace", skip_all, err)]
2567    async fn post_process_one_tx(
2568        &self,
2569        certificate: &VerifiedExecutableTransaction,
2570        effects: &TransactionEffects,
2571        inner_temporary_store: &InnerTemporaryStore,
2572        epoch_store: &Arc<AuthorityPerEpochStore>,
2573    ) -> IotaResult {
2574        if self.indexes.is_none() {
2575            return Ok(());
2576        }
2577
2578        let tx_digest = certificate.digest();
2579        let timestamp_ms = Self::unixtime_now_ms();
2580        let events = &inner_temporary_store.events;
2581        let written = &inner_temporary_store.written;
2582        let tx_coins =
2583            self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2584
2585        // Index tx
2586        if let Some(indexes) = &self.indexes {
2587            let _ = self
2588                .index_tx(
2589                    indexes.as_ref(),
2590                    tx_digest,
2591                    certificate,
2592                    effects,
2593                    events,
2594                    timestamp_ms,
2595                    tx_coins,
2596                    written,
2597                    inner_temporary_store,
2598                )
2599                .await
2600                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2601                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2602                .expect("Indexing tx should not fail");
2603
2604            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2605            let events = self.make_transaction_block_events(
2606                events.clone(),
2607                *tx_digest,
2608                timestamp_ms,
2609                epoch_store,
2610                inner_temporary_store,
2611            )?;
2612            // Emit events
2613            self.subscription_handler
2614                .process_tx(certificate.data().transaction_data(), &effects, &events)
2615                .tap_ok(|_| {
2616                    self.metrics
2617                        .post_processing_total_tx_had_event_processed
2618                        .inc()
2619                })
2620                .tap_err(|e| {
2621                    warn!(
2622                        ?tx_digest,
2623                        "Post processing - Couldn't process events for tx: {}", e
2624                    )
2625                })?;
2626
2627            self.metrics
2628                .post_processing_total_events_emitted
2629                .inc_by(events.data.len() as u64);
2630        };
2631        Ok(())
2632    }
2633
2634    fn make_transaction_block_events(
2635        &self,
2636        transaction_events: TransactionEvents,
2637        digest: TransactionDigest,
2638        timestamp_ms: u64,
2639        epoch_store: &Arc<AuthorityPerEpochStore>,
2640        inner_temporary_store: &InnerTemporaryStore,
2641    ) -> IotaResult<IotaTransactionBlockEvents> {
2642        let mut layout_resolver =
2643            epoch_store
2644                .executor()
2645                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2646                    inner_temporary_store,
2647                    self.get_backing_package_store(),
2648                )));
2649        IotaTransactionBlockEvents::try_from(
2650            transaction_events,
2651            digest,
2652            Some(timestamp_ms),
2653            layout_resolver.as_mut(),
2654        )
2655    }
2656
2657    pub fn unixtime_now_ms() -> u64 {
2658        let ts_ms = Utc::now().timestamp_millis();
2659        u64::try_from(ts_ms).expect("Travelling in time machine")
2660    }
2661
2662    #[instrument(level = "trace", skip_all)]
2663    pub async fn handle_transaction_info_request(
2664        &self,
2665        request: TransactionInfoRequest,
2666    ) -> IotaResult<TransactionInfoResponse> {
2667        let epoch_store = self.load_epoch_store_one_call_per_task();
2668        let (transaction, status) = self
2669            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2670            .ok_or(IotaError::TransactionNotFound {
2671                digest: request.transaction_digest,
2672            })?;
2673        Ok(TransactionInfoResponse {
2674            transaction,
2675            status,
2676        })
2677    }
2678
2679    #[instrument(level = "trace", skip_all)]
2680    pub async fn handle_object_info_request(
2681        &self,
2682        request: ObjectInfoRequest,
2683    ) -> IotaResult<ObjectInfoResponse> {
2684        let epoch_store = self.load_epoch_store_one_call_per_task();
2685
2686        let requested_object_seq = match request.request_kind {
2687            ObjectInfoRequestKind::LatestObjectInfo => {
2688                let (_, seq, _) = self
2689                    .get_object_or_tombstone(request.object_id)
2690                    .await?
2691                    .ok_or_else(|| {
2692                        IotaError::from(UserInputError::ObjectNotFound {
2693                            object_id: request.object_id,
2694                            version: None,
2695                        })
2696                    })?;
2697                seq
2698            }
2699            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2700        };
2701
2702        let object = self
2703            .get_object_store()
2704            .get_object_by_key(&request.object_id, requested_object_seq)?
2705            .ok_or_else(|| {
2706                IotaError::from(UserInputError::ObjectNotFound {
2707                    object_id: request.object_id,
2708                    version: Some(requested_object_seq),
2709                })
2710            })?;
2711
2712        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2713            (request.generate_layout, object.data.try_as_move())
2714        {
2715            Some(into_struct_layout(
2716                self.load_epoch_store_one_call_per_task()
2717                    .executor()
2718                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2719                    .get_annotated_layout(&move_obj.type_().clone().into())?,
2720            )?)
2721        } else {
2722            None
2723        };
2724
2725        let lock = if !object.is_address_owned() {
2726            // Only address owned objects have locks.
2727            None
2728        } else {
2729            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2730                .await?
2731                .map(|s| s.into_inner())
2732        };
2733
2734        Ok(ObjectInfoResponse {
2735            object,
2736            layout,
2737            lock_for_debugging: lock,
2738        })
2739    }
2740
2741    #[instrument(level = "trace", skip_all)]
2742    pub fn handle_checkpoint_request(
2743        &self,
2744        request: &CheckpointRequest,
2745    ) -> IotaResult<CheckpointResponse> {
2746        let summary = if request.certified {
2747            let summary = match request.sequence_number {
2748                Some(seq) => self
2749                    .checkpoint_store
2750                    .get_checkpoint_by_sequence_number(seq)?,
2751                None => self.checkpoint_store.get_latest_certified_checkpoint(),
2752            }
2753            .map(|v| v.into_inner());
2754            summary.map(CheckpointSummaryResponse::Certified)
2755        } else {
2756            let summary = match request.sequence_number {
2757                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2758                None => self
2759                    .checkpoint_store
2760                    .get_latest_locally_computed_checkpoint(),
2761            };
2762            summary.map(CheckpointSummaryResponse::Pending)
2763        };
2764        let contents = match &summary {
2765            Some(s) => self
2766                .checkpoint_store
2767                .get_checkpoint_contents(&s.content_digest())?,
2768            None => None,
2769        };
2770        Ok(CheckpointResponse {
2771            checkpoint: summary,
2772            contents,
2773        })
2774    }
2775
2776    fn check_protocol_version(
2777        supported_protocol_versions: SupportedProtocolVersions,
2778        current_version: ProtocolVersion,
2779    ) {
2780        info!("current protocol version is now {:?}", current_version);
2781        info!("supported versions are: {:?}", supported_protocol_versions);
2782        if !supported_protocol_versions.is_version_supported(current_version) {
2783            let msg = format!(
2784                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2785            );
2786
2787            error!("{}", msg);
2788            eprintln!("{msg}");
2789
2790            #[cfg(not(msim))]
2791            std::process::exit(1);
2792
2793            #[cfg(msim)]
2794            iota_simulator::task::shutdown_current_node();
2795        }
2796    }
2797
    /// Builds an `AuthorityState`, starts its certificate execution task and
    /// indexes the genesis objects.
    ///
    /// Steps, in order:
    /// 1. Check that the epoch's protocol version is supported; an
    ///    unsupported version shuts the node down (see
    ///    `check_protocol_version`).
    /// 2. Create the metrics, the `TransactionManager`, both pruners and the
    ///    `TransactionInputLoader`.
    /// 3. Assemble the state and spawn `execution_process` with a weak
    ///    reference to it.
    /// 4. Call `create_owner_index_if_empty` with `genesis_objects`.
    ///
    /// # Panics
    ///
    /// Panics if `create_owner_index_if_empty` returns an error.
    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
    pub async fn new(
        name: AuthorityName,
        secret: StableSyncAuthoritySigner,
        supported_protocol_versions: SupportedProtocolVersions,
        store: Arc<AuthorityStore>,
        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
        epoch_store: Arc<AuthorityPerEpochStore>,
        committee_store: Arc<CommitteeStore>,
        indexes: Option<Arc<IndexStore>>,
        rest_index: Option<Arc<RestIndexStore>>,
        checkpoint_store: Arc<CheckpointStore>,
        prometheus_registry: &Registry,
        genesis_objects: &[Object],
        db_checkpoint_config: &DBCheckpointConfig,
        config: NodeConfig,
        indirect_objects_threshold: usize,
        archive_readers: ArchiveReaderBalancer,
        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
    ) -> Arc<Self> {
        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
        // Ready certificates flow from the transaction manager (sender side) to
        // the execution task spawned below (receiver side).
        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
        let transaction_manager = Arc::new(TransactionManager::new(
            execution_cache_trait_pointers.object_cache_reader.clone(),
            execution_cache_trait_pointers
                .transaction_cache_reader
                .clone(),
            &epoch_store,
            tx_ready_certificates,
            metrics.clone(),
        ));
        // The sender is kept in the state; the receiver is handed to the
        // execution task.
        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

        // Both pruners are stored in the state so they live as long as it does.
        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
            epoch_store.get_parent_path(),
            &config.authority_store_pruning_config,
        );
        let _pruner = AuthorityStorePruner::new(
            store.perpetual_tables.clone(),
            checkpoint_store.clone(),
            rest_index.clone(),
            store.objects_lock_table.clone(),
            config.authority_store_pruning_config.clone(),
            // Whether this node is a member of the current committee.
            epoch_store.committee().authority_exists(&name),
            epoch_store.epoch_start_state().epoch_duration_ms(),
            prometheus_registry,
            indirect_objects_threshold,
            archive_readers,
        );
        let input_loader =
            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
        let epoch = epoch_store.epoch();
        let state = Arc::new(AuthorityState {
            name,
            secret,
            execution_lock: RwLock::new(epoch),
            epoch_store: ArcSwap::new(epoch_store.clone()),
            input_loader,
            execution_cache_trait_pointers,
            indexes,
            rest_index,
            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
            checkpoint_store,
            committee_store,
            transaction_manager,
            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
            metrics,
            _pruner,
            _authority_per_epoch_pruner,
            db_checkpoint_config: db_checkpoint_config.clone(),
            config,
            overload_info: AuthorityOverloadInfo::default(),
            validator_tx_finalizer,
        });

        // Start a task to execute ready certificates.
        // The task only receives a weak reference to the state.
        let authority_state = Arc::downgrade(&state);
        spawn_monitored_task!(execution_process(
            authority_state,
            rx_ready_certificates,
            rx_execution_shutdown,
        ));

        // TODO: This doesn't belong to the constructor of AuthorityState.
        state
            .create_owner_index_if_empty(genesis_objects, &epoch_store)
            .expect("Error indexing genesis objects.");

        state
    }
2890
2891    // TODO: Consolidate our traits to reduce the number of methods here.
2892    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
2893        &self.execution_cache_trait_pointers.object_cache_reader
2894    }
2895
2896    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
2897        &self.execution_cache_trait_pointers.transaction_cache_reader
2898    }
2899
2900    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
2901        &self.execution_cache_trait_pointers.cache_writer
2902    }
2903
2904    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
2905        &self.execution_cache_trait_pointers.backing_store
2906    }
2907
2908    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
2909        &self.execution_cache_trait_pointers.backing_package_store
2910    }
2911
2912    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
2913        &self.execution_cache_trait_pointers.object_store
2914    }
2915
2916    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
2917        &self.execution_cache_trait_pointers.reconfig_api
2918    }
2919
2920    pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
2921        &self.execution_cache_trait_pointers.accumulator_store
2922    }
2923
2924    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
2925        &self.execution_cache_trait_pointers.checkpoint_cache
2926    }
2927
2928    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
2929        &self.execution_cache_trait_pointers.state_sync_store
2930    }
2931
2932    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
2933        &self.execution_cache_trait_pointers.cache_commit
2934    }
2935
2936    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
2937        self.execution_cache_trait_pointers
2938            .testing_api
2939            .database_for_testing()
2940    }
2941
2942    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
2943        &self,
2944        config: NodeConfig,
2945        metrics: Arc<AuthorityStorePruningMetrics>,
2946    ) -> anyhow::Result<()> {
2947        let archive_readers =
2948            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
2949        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
2950            &self.database_for_testing().perpetual_tables,
2951            &self.checkpoint_store,
2952            self.rest_index.as_deref(),
2953            &self.database_for_testing().objects_lock_table,
2954            config.authority_store_pruning_config,
2955            metrics,
2956            config.indirect_objects_threshold,
2957            archive_readers,
2958            EPOCH_DURATION_MS_FOR_TESTING,
2959        )
2960        .await
2961    }
2962
2963    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
2964        &self.transaction_manager
2965    }
2966
2967    /// Adds certificates to transaction manager for ordered execution.
2968    pub fn enqueue_certificates_for_execution(
2969        &self,
2970        certs: Vec<VerifiedCertificate>,
2971        epoch_store: &Arc<AuthorityPerEpochStore>,
2972    ) {
2973        self.transaction_manager
2974            .enqueue_certificates(certs, epoch_store)
2975    }
2976
2977    pub fn enqueue_with_expected_effects_digest(
2978        &self,
2979        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
2980        epoch_store: &AuthorityPerEpochStore,
2981    ) {
2982        self.transaction_manager
2983            .enqueue_with_expected_effects_digest(certs, epoch_store)
2984    }
2985
2986    fn create_owner_index_if_empty(
2987        &self,
2988        genesis_objects: &[Object],
2989        epoch_store: &Arc<AuthorityPerEpochStore>,
2990    ) -> IotaResult {
2991        let Some(index_store) = &self.indexes else {
2992            return Ok(());
2993        };
2994        if !index_store.is_empty() {
2995            return Ok(());
2996        }
2997
2998        let mut new_owners = vec![];
2999        let mut new_dynamic_fields = vec![];
3000        let mut layout_resolver = epoch_store
3001            .executor()
3002            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3003        for o in genesis_objects.iter() {
3004            match o.owner {
3005                Owner::AddressOwner(addr) => new_owners.push((
3006                    (addr, o.id()),
3007                    ObjectInfo::new(&o.compute_object_reference(), o),
3008                )),
3009                Owner::ObjectOwner(object_id) => {
3010                    let id = o.id();
3011                    let Some(info) = self.try_create_dynamic_field_info(
3012                        o,
3013                        &BTreeMap::new(),
3014                        layout_resolver.as_mut(),
3015                        true,
3016                    )?
3017                    else {
3018                        continue;
3019                    };
3020                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3021                }
3022                _ => {}
3023            }
3024        }
3025
3026        index_store.insert_genesis_objects(ObjectIndexChanges {
3027            deleted_owners: vec![],
3028            deleted_dynamic_fields: vec![],
3029            new_owners,
3030            new_dynamic_fields,
3031        })
3032    }
3033
3034    /// Attempts to acquire execution lock for an executable transaction.
3035    /// Returns the lock if the transaction is matching current executed epoch
3036    /// Returns None otherwise
3037    pub async fn execution_lock_for_executable_transaction(
3038        &self,
3039        transaction: &VerifiedExecutableTransaction,
3040    ) -> IotaResult<ExecutionLockReadGuard> {
3041        let lock = self.execution_lock.read().await;
3042        if *lock == transaction.auth_sig().epoch() {
3043            Ok(lock)
3044        } else {
3045            Err(IotaError::WrongEpoch {
3046                expected_epoch: *lock,
3047                actual_epoch: transaction.auth_sig().epoch(),
3048            })
3049        }
3050    }
3051
3052    /// Acquires the execution lock for the duration of a transaction signing
3053    /// request. This prevents reconfiguration from starting until we are
3054    /// finished handling the signing request. Otherwise, in-memory lock
3055    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
3056    /// while we are attempting to acquire locks for the transaction.
3057    pub async fn execution_lock_for_signing(&self) -> ExecutionLockReadGuard {
3058        self.execution_lock.read().await
3059    }
3060
3061    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard {
3062        self.execution_lock.write().await
3063    }
3064
    /// Moves this authority from the epoch of `cur_epoch_store` to the epoch of
    /// `new_committee` and returns the epoch store of the new epoch.
    ///
    /// Steps, in order: check the protocol version, reset metrics, insert the
    /// new committee into the committee store, take the execution lock for
    /// writing, terminate the current epoch, revert uncommitted transactions,
    /// clear end-of-epoch state, run the consistency checks, store the epoch
    /// start configuration, optionally checkpoint the databases, reconfigure
    /// the cache, reopen the per-epoch DB, and finally update the transaction
    /// manager and the epoch held by the execution lock.
    ///
    /// The write guard on the execution lock is a local and is therefore held
    /// until this function returns.
    ///
    /// # Errors
    /// Propagates errors from inserting the committee, reverting uncommitted
    /// transactions, the consistency checks, storing the epoch start
    /// configuration, checkpointing the databases and reopening the epoch DB.
    ///
    /// # Panics
    /// Panics if the reopened epoch store reports an epoch different from
    /// `new_committee.epoch`.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        accumulator: Arc<StateAccumulator>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_supply_change: i64,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        // NOTE(review): the return value (if any) is not used here; presumably
        // `check_protocol_version` aborts on an unsupported version — confirm.
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.metrics.reset_on_reconfigure();
        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        // Safe to reconfigure now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
            .await?;
        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(
            cur_epoch_store,
            accumulator,
            expensive_safety_check_config,
            epoch_supply_change,
        )?;
        self.get_reconfig_api()
            .set_epoch_start_configuration(&epoch_start_configuration)?;
        // A DB checkpoint is taken only when a checkpoint path is configured AND
        // end-of-epoch checkpoints are enabled.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
            if self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
            {
                // Index DB checkpoints are opt-in: an unset option means "no".
                let checkpoint_indexes = self
                    .db_checkpoint_config
                    .perform_index_db_checkpoints_at_epoch_end
                    .unwrap_or(false);
                let current_epoch = cur_epoch_store.epoch();
                // Each epoch gets its own `epoch_<n>` directory under the
                // configured checkpoint path.
                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
                self.checkpoint_all_dbs(
                    &epoch_checkpoint_path,
                    cur_epoch_store,
                    checkpoint_indexes,
                )?;
            }
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        // Capture the epoch before `new_committee` is moved into
        // `reopen_epoch_db`.
        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.transaction_manager.reconfigure(new_epoch);
        // Publish the new epoch through the still-held write guard.
        *execution_lock = new_epoch;
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
3148
3149    /// Advance the epoch store to the next epoch for testing only.
3150    /// This only manually sets all the places where we have the epoch number.
3151    /// It doesn't properly reconfigure the node, hence should be only used for
3152    /// testing.
3153    pub async fn reconfigure_for_testing(&self) {
3154        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3155        let epoch_store = self.epoch_store_for_testing().clone();
3156        let protocol_config = epoch_store.protocol_config().clone();
3157        // The current protocol config used in the epoch store may have been overridden
3158        // and diverged from the protocol config definitions. That override may
3159        // have now been dropped when the initial guard was dropped. We reapply
3160        // the override before creating the new epoch store, to make sure that
3161        // the new epoch store has the same protocol config as the current one.
3162        // Since this is for testing only, we mostly like to keep the protocol config
3163        // the same across epochs.
3164        let _guard =
3165            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3166        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3167            self.get_backing_package_store().clone(),
3168            self.get_object_store().clone(),
3169            &self.config.expensive_safety_check_config,
3170        );
3171        let new_epoch = new_epoch_store.epoch();
3172        self.transaction_manager.reconfigure(new_epoch);
3173        self.epoch_store.store(new_epoch_store);
3174        epoch_store.epoch_terminated().await;
3175        *execution_lock = new_epoch;
3176    }
3177
3178    #[instrument(level = "error", skip_all)]
3179    fn check_system_consistency(
3180        &self,
3181        cur_epoch_store: &AuthorityPerEpochStore,
3182        accumulator: Arc<StateAccumulator>,
3183        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3184        epoch_supply_change: i64,
3185    ) -> IotaResult<()> {
3186        info!(
3187            "Performing iota conservation consistency check for epoch {}",
3188            cur_epoch_store.epoch()
3189        );
3190
3191        if cfg!(debug_assertions) {
3192            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3193        }
3194
3195        self.get_reconfig_api()
3196            .expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3197
3198        // check for root state hash consistency with live object set
3199        if expensive_safety_check_config.enable_state_consistency_check() {
3200            info!(
3201                "Performing state consistency check for epoch {}",
3202                cur_epoch_store.epoch()
3203            );
3204            self.expensive_check_is_consistent_state(
3205                accumulator,
3206                cur_epoch_store,
3207                cfg!(debug_assertions), // panic in debug mode only
3208            );
3209        }
3210
3211        if expensive_safety_check_config.enable_secondary_index_checks() {
3212            if let Some(indexes) = self.indexes.clone() {
3213                verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3214                    .expect("secondary indexes are inconsistent");
3215            }
3216        }
3217
3218        Ok(())
3219    }
3220
3221    fn expensive_check_is_consistent_state(
3222        &self,
3223        accumulator: Arc<StateAccumulator>,
3224        cur_epoch_store: &AuthorityPerEpochStore,
3225        panic: bool,
3226    ) {
3227        let live_object_set_hash = accumulator.digest_live_object_set();
3228
3229        let root_state_hash: ECMHLiveObjectSetDigest = self
3230            .get_accumulator_store()
3231            .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3232            .expect("Retrieving root state hash cannot fail")
3233            .expect("Root state hash for epoch must exist")
3234            .1
3235            .digest()
3236            .into();
3237
3238        let is_inconsistent = root_state_hash != live_object_set_hash;
3239        if is_inconsistent {
3240            if panic {
3241                panic!(
3242                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3243                );
3244            } else {
3245                error!(
3246                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3247                    root_state_hash, live_object_set_hash
3248                );
3249            }
3250        } else {
3251            info!("State consistency check passed");
3252        }
3253
3254        if !panic {
3255            accumulator.set_inconsistent_state(is_inconsistent);
3256        }
3257    }
3258
3259    pub fn current_epoch_for_testing(&self) -> EpochId {
3260        self.epoch_store_for_testing().epoch()
3261    }
3262
3263    #[instrument(level = "error", skip_all)]
3264    pub fn checkpoint_all_dbs(
3265        &self,
3266        checkpoint_path: &Path,
3267        cur_epoch_store: &AuthorityPerEpochStore,
3268        checkpoint_indexes: bool,
3269    ) -> IotaResult {
3270        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3271        let current_epoch = cur_epoch_store.epoch();
3272
3273        if checkpoint_path.exists() {
3274            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3275            return Ok(());
3276        }
3277
3278        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3279        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3280
3281        if checkpoint_path_tmp.exists() {
3282            fs::remove_dir_all(&checkpoint_path_tmp)
3283                .map_err(|e| IotaError::FileIO(e.to_string()))?;
3284        }
3285
3286        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3287        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3288
3289        // NOTE: Do not change the order of invoking these checkpoint calls
3290        // We want to snapshot checkpoint db first to not race with state sync
3291        self.checkpoint_store
3292            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3293
3294        self.get_reconfig_api()
3295            .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3296
3297        self.committee_store
3298            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3299
3300        if checkpoint_indexes {
3301            if let Some(indexes) = self.indexes.as_ref() {
3302                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3303            }
3304        }
3305
3306        fs::rename(checkpoint_path_tmp, checkpoint_path)
3307            .map_err(|e| IotaError::FileIO(e.to_string()))?;
3308        Ok(())
3309    }
3310
3311    /// Load the current epoch store. This can change during reconfiguration. To
3312    /// ensure that we never end up accessing different epoch stores in a
3313    /// single task, we need to make sure that this is called once per task.
3314    /// Each call needs to be carefully audited to ensure it is
3315    /// the case. This also means we should minimize the number of call-sites.
3316    /// Only call it when there is no way to obtain it from somewhere else.
3317    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3318        self.epoch_store.load()
3319    }
3320
3321    // Load the epoch store, should be used in tests only.
3322    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3323        self.load_epoch_store_one_call_per_task()
3324    }
3325
3326    pub fn clone_committee_for_testing(&self) -> Committee {
3327        Committee::clone(self.epoch_store_for_testing().committee())
3328    }
3329
3330    #[instrument(level = "trace", skip_all)]
3331    pub async fn get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3332        self.get_object_store()
3333            .get_object(object_id)
3334            .map_err(Into::into)
3335    }
3336
3337    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3338        Ok(self
3339            .get_object(&IOTA_SYSTEM_ADDRESS.into())
3340            .await?
3341            .expect("framework object should always exist")
3342            .compute_object_reference())
3343    }
3344
3345    // This function is only used for testing.
3346    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3347        self.get_object_cache_reader()
3348            .get_iota_system_state_object_unsafe()
3349    }
3350
3351    #[instrument(level = "trace", skip_all)]
3352    pub fn get_checkpoint_by_sequence_number(
3353        &self,
3354        sequence_number: CheckpointSequenceNumber,
3355    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3356        Ok(self
3357            .checkpoint_store
3358            .get_checkpoint_by_sequence_number(sequence_number)?)
3359    }
3360
3361    #[instrument(level = "trace", skip_all)]
3362    pub fn get_transaction_checkpoint_for_tests(
3363        &self,
3364        digest: &TransactionDigest,
3365        epoch_store: &AuthorityPerEpochStore,
3366    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3367        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3368        let Some(checkpoint) = checkpoint else {
3369            return Ok(None);
3370        };
3371        let checkpoint = self
3372            .checkpoint_store
3373            .get_checkpoint_by_sequence_number(checkpoint)?;
3374        Ok(checkpoint)
3375    }
3376
3377    #[instrument(level = "trace", skip_all)]
3378    pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3379        Ok(
3380            match self
3381                .get_object_cache_reader()
3382                .get_latest_object_or_tombstone(*object_id)?
3383            {
3384                Some((_, ObjectOrTombstone::Object(object))) => {
3385                    let layout = self.get_object_layout(&object)?;
3386                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
3387                }
3388                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3389                None => ObjectRead::NotExists(*object_id),
3390            },
3391        )
3392    }
3393
3394    /// Chain Identifier is the digest of the genesis checkpoint.
3395    pub fn get_chain_identifier(&self) -> Option<ChainIdentifier> {
3396        if let Some(digest) = CHAIN_IDENTIFIER.get() {
3397            return Some(*digest);
3398        }
3399
3400        let checkpoint = self
3401            .get_checkpoint_by_sequence_number(0)
3402            .tap_err(|e| error!("Failed to get genesis checkpoint: {:?}", e))
3403            .ok()?
3404            .tap_none(|| error!("Genesis checkpoint is missing from DB"))?;
3405        // It's ok if the value is already set due to data races.
3406        let _ = CHAIN_IDENTIFIER.set(ChainIdentifier::from(*checkpoint.digest()));
3407        Some(ChainIdentifier::from(*checkpoint.digest()))
3408    }
3409
3410    #[instrument(level = "trace", skip_all)]
3411    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3412    where
3413        T: DeserializeOwned,
3414    {
3415        let o = self.get_object_read(object_id)?.into_object()?;
3416        if let Some(move_object) = o.data.try_as_move() {
3417            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3418                IotaError::ObjectDeserialization {
3419                    error: format!("{e}"),
3420                }
3421            })?)
3422        } else {
3423            Err(IotaError::ObjectDeserialization {
3424                error: format!("Provided object : [{object_id}] is not a Move object."),
3425            })
3426        }
3427    }
3428
3429    /// This function aims to serve rpc reads on past objects and
3430    /// we don't expect it to be called for other purposes.
3431    /// Depending on the object pruning policies that will be enforced in the
3432    /// future there is no software-level guarantee/SLA to retrieve an object
3433    /// with an old version even if it exists/existed.
3434    #[instrument(level = "trace", skip_all)]
3435    pub fn get_past_object_read(
3436        &self,
3437        object_id: &ObjectID,
3438        version: SequenceNumber,
3439    ) -> IotaResult<PastObjectRead> {
3440        // Firstly we see if the object ever existed by getting its latest data
3441        let Some(obj_ref) = self
3442            .get_object_cache_reader()
3443            .get_latest_object_ref_or_tombstone(*object_id)?
3444        else {
3445            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3446        };
3447
3448        if version > obj_ref.1 {
3449            return Ok(PastObjectRead::VersionTooHigh {
3450                object_id: *object_id,
3451                asked_version: version,
3452                latest_version: obj_ref.1,
3453            });
3454        }
3455
3456        if version < obj_ref.1 {
3457            // Read past objects
3458            return Ok(match self.read_object_at_version(object_id, version)? {
3459                Some((object, layout)) => {
3460                    let obj_ref = object.compute_object_reference();
3461                    PastObjectRead::VersionFound(obj_ref, object, layout)
3462                }
3463
3464                None => PastObjectRead::VersionNotFound(*object_id, version),
3465            });
3466        }
3467
3468        if !obj_ref.2.is_alive() {
3469            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3470        }
3471
3472        match self.read_object_at_version(object_id, obj_ref.1)? {
3473            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3474            None => {
3475                error!(
3476                    "Object with in parent_entry is missing from object store, datastore is \
3477                     inconsistent",
3478                );
3479                Err(UserInputError::ObjectNotFound {
3480                    object_id: *object_id,
3481                    version: Some(obj_ref.1),
3482                }
3483                .into())
3484            }
3485        }
3486    }
3487
3488    #[instrument(level = "trace", skip_all)]
3489    fn read_object_at_version(
3490        &self,
3491        object_id: &ObjectID,
3492        version: SequenceNumber,
3493    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3494        let Some(object) = self
3495            .get_object_cache_reader()
3496            .get_object_by_key(object_id, version)?
3497        else {
3498            return Ok(None);
3499        };
3500
3501        let layout = self.get_object_layout(&object)?;
3502        Ok(Some((object, layout)))
3503    }
3504
3505    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3506        let layout = object
3507            .data
3508            .try_as_move()
3509            .map(|object| {
3510                into_struct_layout(
3511                    self.load_epoch_store_one_call_per_task()
3512                        .executor()
3513                        // TODO(cache) - must read through cache
3514                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3515                        .get_annotated_layout(&object.type_().clone().into())?,
3516                )
3517            })
3518            .transpose()?;
3519        Ok(layout)
3520    }
3521
3522    fn get_owner_at_version(
3523        &self,
3524        object_id: &ObjectID,
3525        version: SequenceNumber,
3526    ) -> IotaResult<Owner> {
3527        self.get_object_store()
3528            .get_object_by_key(object_id, version)?
3529            .ok_or_else(|| {
3530                IotaError::from(UserInputError::ObjectNotFound {
3531                    object_id: *object_id,
3532                    version: Some(version),
3533                })
3534            })
3535            .map(|o| o.owner)
3536    }
3537
3538    #[instrument(level = "trace", skip_all)]
3539    pub fn get_owner_objects(
3540        &self,
3541        owner: IotaAddress,
3542        // If `Some`, the query will start from the next item after the specified cursor
3543        cursor: Option<ObjectID>,
3544        limit: usize,
3545        filter: Option<IotaObjectDataFilter>,
3546    ) -> IotaResult<Vec<ObjectInfo>> {
3547        if let Some(indexes) = &self.indexes {
3548            indexes.get_owner_objects(owner, cursor, limit, filter)
3549        } else {
3550            Err(IotaError::IndexStoreNotAvailable)
3551        }
3552    }
3553
3554    #[instrument(level = "trace", skip_all)]
3555    pub fn get_owned_coins_iterator_with_cursor(
3556        &self,
3557        owner: IotaAddress,
3558        // If `Some`, the query will start from the next item after the specified cursor
3559        cursor: (String, ObjectID),
3560        limit: usize,
3561        one_coin_type_only: bool,
3562    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3563        if let Some(indexes) = &self.indexes {
3564            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3565        } else {
3566            Err(IotaError::IndexStoreNotAvailable)
3567        }
3568    }
3569
3570    #[instrument(level = "trace", skip_all)]
3571    pub fn get_owner_objects_iterator(
3572        &self,
3573        owner: IotaAddress,
3574        // If `Some`, the query will start from the next item after the specified cursor
3575        cursor: Option<ObjectID>,
3576        filter: Option<IotaObjectDataFilter>,
3577    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3578        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3579        if let Some(indexes) = &self.indexes {
3580            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3581        } else {
3582            Err(IotaError::IndexStoreNotAvailable)
3583        }
3584    }
3585
3586    #[instrument(level = "trace", skip_all)]
3587    pub async fn get_move_objects<T>(
3588        &self,
3589        owner: IotaAddress,
3590        type_: MoveObjectType,
3591    ) -> IotaResult<Vec<T>>
3592    where
3593        T: DeserializeOwned,
3594    {
3595        let object_ids = self
3596            .get_owner_objects_iterator(owner, None, None)?
3597            .filter(|o| match &o.type_ {
3598                ObjectType::Struct(s) => &type_ == s,
3599                ObjectType::Package => false,
3600            })
3601            .map(|info| ObjectKey(info.object_id, info.version))
3602            .collect::<Vec<_>>();
3603        let mut move_objects = vec![];
3604
3605        let objects = self
3606            .get_object_store()
3607            .multi_get_objects_by_key(&object_ids)?;
3608
3609        for (o, id) in objects.into_iter().zip(object_ids) {
3610            let object = o.ok_or_else(|| {
3611                IotaError::from(UserInputError::ObjectNotFound {
3612                    object_id: id.0,
3613                    version: Some(id.1),
3614                })
3615            })?;
3616            let move_object = object.data.try_as_move().ok_or_else(|| {
3617                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3618            })?;
3619            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3620                IotaError::ObjectDeserialization {
3621                    error: format!("{e}"),
3622                }
3623            })?);
3624        }
3625        Ok(move_objects)
3626    }
3627
3628    #[instrument(level = "trace", skip_all)]
3629    pub fn get_dynamic_fields(
3630        &self,
3631        owner: ObjectID,
3632        // If `Some`, the query will start from the next item after the specified cursor
3633        cursor: Option<ObjectID>,
3634        limit: usize,
3635    ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3636        Ok(self
3637            .get_dynamic_fields_iterator(owner, cursor)?
3638            .take(limit)
3639            .collect::<Result<Vec<_>, _>>()?)
3640    }
3641
3642    fn get_dynamic_fields_iterator(
3643        &self,
3644        owner: ObjectID,
3645        // If `Some`, the query will start from the next item after the specified cursor
3646        cursor: Option<ObjectID>,
3647    ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3648    {
3649        if let Some(indexes) = &self.indexes {
3650            indexes.get_dynamic_fields_iterator(owner, cursor)
3651        } else {
3652            Err(IotaError::IndexStoreNotAvailable)
3653        }
3654    }
3655
3656    #[instrument(level = "trace", skip_all)]
3657    pub fn get_dynamic_field_object_id(
3658        &self,
3659        owner: ObjectID,
3660        name_type: TypeTag,
3661        name_bcs_bytes: &[u8],
3662    ) -> IotaResult<Option<ObjectID>> {
3663        if let Some(indexes) = &self.indexes {
3664            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3665        } else {
3666            Err(IotaError::IndexStoreNotAvailable)
3667        }
3668    }
3669
3670    #[instrument(level = "trace", skip_all)]
3671    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3672        Ok(self.get_indexes()?.next_sequence_number())
3673    }
3674
3675    #[instrument(level = "trace", skip_all)]
3676    pub async fn get_executed_transaction_and_effects(
3677        &self,
3678        digest: TransactionDigest,
3679        kv_store: Arc<TransactionKeyValueStore>,
3680    ) -> IotaResult<(Transaction, TransactionEffects)> {
3681        let transaction = kv_store.get_tx(digest).await?;
3682        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3683        Ok((transaction, effects))
3684    }
3685
3686    #[instrument(level = "trace", skip_all)]
3687    pub fn multi_get_checkpoint_by_sequence_number(
3688        &self,
3689        sequence_numbers: &[CheckpointSequenceNumber],
3690    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3691        Ok(self
3692            .checkpoint_store
3693            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3694    }
3695
3696    #[instrument(level = "trace", skip_all)]
3697    pub fn get_transaction_events(
3698        &self,
3699        digest: &TransactionEventsDigest,
3700    ) -> IotaResult<TransactionEvents> {
3701        self.get_transaction_cache_reader()
3702            .get_events(digest)?
3703            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3704    }
3705
3706    pub fn get_transaction_input_objects(
3707        &self,
3708        effects: &TransactionEffects,
3709    ) -> anyhow::Result<Vec<Object>> {
3710        let input_object_keys = effects
3711            .modified_at_versions()
3712            .into_iter()
3713            .map(|(object_id, version)| ObjectKey(object_id, version))
3714            .collect::<Vec<_>>();
3715
3716        let input_objects = self
3717            .get_object_store()
3718            .multi_get_objects_by_key(&input_object_keys)?
3719            .into_iter()
3720            .enumerate()
3721            .map(|(idx, maybe_object)| {
3722                maybe_object.ok_or_else(|| {
3723                    anyhow::anyhow!(
3724                        "missing input object key {:?} from tx {}",
3725                        input_object_keys[idx],
3726                        effects.transaction_digest()
3727                    )
3728                })
3729            })
3730            .collect::<anyhow::Result<Vec<_>>>()?;
3731        Ok(input_objects)
3732    }
3733
3734    pub fn get_transaction_output_objects(
3735        &self,
3736        effects: &TransactionEffects,
3737    ) -> anyhow::Result<Vec<Object>> {
3738        let output_object_keys = effects
3739            .all_changed_objects()
3740            .into_iter()
3741            .map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
3742            .collect::<Vec<_>>();
3743
3744        let output_objects = self
3745            .get_object_store()
3746            .multi_get_objects_by_key(&output_object_keys)?
3747            .into_iter()
3748            .enumerate()
3749            .map(|(idx, maybe_object)| {
3750                maybe_object.ok_or_else(|| {
3751                    anyhow::anyhow!(
3752                        "missing output object key {:?} from tx {}",
3753                        output_object_keys[idx],
3754                        effects.transaction_digest()
3755                    )
3756                })
3757            })
3758            .collect::<anyhow::Result<Vec<_>>>()?;
3759        Ok(output_objects)
3760    }
3761
3762    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3763        match &self.indexes {
3764            Some(i) => Ok(i.clone()),
3765            None => Err(IotaError::UnsupportedFeature {
3766                error: "extended object indexing is not enabled on this server".into(),
3767            }),
3768        }
3769    }
3770
3771    pub async fn get_transactions_for_tests(
3772        self: &Arc<Self>,
3773        filter: Option<TransactionFilter>,
3774        cursor: Option<TransactionDigest>,
3775        limit: Option<usize>,
3776        reverse: bool,
3777    ) -> IotaResult<Vec<TransactionDigest>> {
3778        let metrics = KeyValueStoreMetrics::new_for_tests();
3779        let kv_store = Arc::new(TransactionKeyValueStore::new(
3780            "rocksdb",
3781            metrics,
3782            self.clone(),
3783        ));
3784        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3785            .await
3786    }
3787
3788    #[instrument(level = "trace", skip_all)]
3789    pub async fn get_transactions(
3790        &self,
3791        kv_store: &Arc<TransactionKeyValueStore>,
3792        filter: Option<TransactionFilter>,
3793        // If `Some`, the query will start from the next item after the specified cursor
3794        cursor: Option<TransactionDigest>,
3795        limit: Option<usize>,
3796        reverse: bool,
3797    ) -> IotaResult<Vec<TransactionDigest>> {
3798        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3799            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3800            let iter = checkpoint_contents.iter().map(|c| c.transaction);
3801            if reverse {
3802                let iter = iter
3803                    .rev()
3804                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3805                    .skip(usize::from(cursor.is_some()));
3806                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3807            } else {
3808                let iter = iter
3809                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3810                    .skip(usize::from(cursor.is_some()));
3811                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3812            }
3813        }
3814        self.get_indexes()?
3815            .get_transactions(filter, cursor, limit, reverse)
3816    }
3817
3818    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
3819        &self.checkpoint_store
3820    }
3821
3822    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
3823        self.get_checkpoint_store()
3824            .get_highest_executed_checkpoint_seq_number()?
3825            .ok_or(IotaError::UserInput {
3826                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
3827            })
3828    }
3829
3830    #[cfg(msim)]
3831    pub fn get_highest_pruned_checkpoint_for_testing(
3832        &self,
3833    ) -> IotaResult<CheckpointSequenceNumber> {
3834        self.database_for_testing()
3835            .perpetual_tables
3836            .get_highest_pruned_checkpoint()
3837    }
3838
3839    #[instrument(level = "trace", skip_all)]
3840    pub fn get_checkpoint_summary_by_sequence_number(
3841        &self,
3842        sequence_number: CheckpointSequenceNumber,
3843    ) -> IotaResult<CheckpointSummary> {
3844        let verified_checkpoint = self
3845            .get_checkpoint_store()
3846            .get_checkpoint_by_sequence_number(sequence_number)?;
3847        match verified_checkpoint {
3848            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3849            None => Err(IotaError::UserInput {
3850                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3851            }),
3852        }
3853    }
3854
3855    #[instrument(level = "trace", skip_all)]
3856    pub fn get_checkpoint_summary_by_digest(
3857        &self,
3858        digest: CheckpointDigest,
3859    ) -> IotaResult<CheckpointSummary> {
3860        let verified_checkpoint = self
3861            .get_checkpoint_store()
3862            .get_checkpoint_by_digest(&digest)?;
3863        match verified_checkpoint {
3864            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3865            None => Err(IotaError::UserInput {
3866                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3867            }),
3868        }
3869    }
3870
3871    #[instrument(level = "trace", skip_all)]
3872    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
3873        if is_system_package(package_id) {
3874            return self.find_genesis_txn_digest();
3875        }
3876        Ok(self
3877            .get_object_read(&package_id)?
3878            .into_object()?
3879            .previous_transaction)
3880    }
3881
3882    #[instrument(level = "trace", skip_all)]
3883    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
3884        let summary = self
3885            .get_verified_checkpoint_by_sequence_number(0)?
3886            .into_message();
3887        let content = self.get_checkpoint_contents(summary.content_digest)?;
3888        let genesis_transaction = content.enumerate_transactions(&summary).next();
3889        Ok(genesis_transaction
3890            .ok_or(IotaError::UserInput {
3891                error: UserInputError::GenesisTransactionNotFound,
3892            })?
3893            .1
3894            .transaction)
3895    }
3896
3897    #[instrument(level = "trace", skip_all)]
3898    pub fn get_verified_checkpoint_by_sequence_number(
3899        &self,
3900        sequence_number: CheckpointSequenceNumber,
3901    ) -> IotaResult<VerifiedCheckpoint> {
3902        let verified_checkpoint = self
3903            .get_checkpoint_store()
3904            .get_checkpoint_by_sequence_number(sequence_number)?;
3905        match verified_checkpoint {
3906            Some(verified_checkpoint) => Ok(verified_checkpoint),
3907            None => Err(IotaError::UserInput {
3908                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3909            }),
3910        }
3911    }
3912
3913    #[instrument(level = "trace", skip_all)]
3914    pub fn get_verified_checkpoint_summary_by_digest(
3915        &self,
3916        digest: CheckpointDigest,
3917    ) -> IotaResult<VerifiedCheckpoint> {
3918        let verified_checkpoint = self
3919            .get_checkpoint_store()
3920            .get_checkpoint_by_digest(&digest)?;
3921        match verified_checkpoint {
3922            Some(verified_checkpoint) => Ok(verified_checkpoint),
3923            None => Err(IotaError::UserInput {
3924                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3925            }),
3926        }
3927    }
3928
3929    #[instrument(level = "trace", skip_all)]
3930    pub fn get_checkpoint_contents(
3931        &self,
3932        digest: CheckpointContentsDigest,
3933    ) -> IotaResult<CheckpointContents> {
3934        self.get_checkpoint_store()
3935            .get_checkpoint_contents(&digest)?
3936            .ok_or(IotaError::UserInput {
3937                error: UserInputError::CheckpointContentsNotFound(digest),
3938            })
3939    }
3940
3941    #[instrument(level = "trace", skip_all)]
3942    pub fn get_checkpoint_contents_by_sequence_number(
3943        &self,
3944        sequence_number: CheckpointSequenceNumber,
3945    ) -> IotaResult<CheckpointContents> {
3946        let verified_checkpoint = self
3947            .get_checkpoint_store()
3948            .get_checkpoint_by_sequence_number(sequence_number)?;
3949        match verified_checkpoint {
3950            Some(verified_checkpoint) => {
3951                let content_digest = verified_checkpoint.into_inner().content_digest;
3952                self.get_checkpoint_contents(content_digest)
3953            }
3954            None => Err(IotaError::UserInput {
3955                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3956            }),
3957        }
3958    }
3959
3960    #[instrument(level = "trace", skip_all)]
3961    pub async fn query_events(
3962        &self,
3963        kv_store: &Arc<TransactionKeyValueStore>,
3964        query: EventFilter,
3965        // If `Some`, the query will start from the next item after the specified cursor
3966        cursor: Option<EventID>,
3967        limit: usize,
3968        descending: bool,
3969    ) -> IotaResult<Vec<IotaEvent>> {
3970        let index_store = self.get_indexes()?;
3971
3972        // Get the tx_num from tx_digest
3973        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
3974            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
3975                IotaError::TransactionNotFound {
3976                    digest: cursor.tx_digest,
3977                },
3978            )?;
3979            (tx_seq, cursor.event_seq as usize)
3980        } else if descending {
3981            (u64::MAX, usize::MAX)
3982        } else {
3983            (0, 0)
3984        };
3985
3986        let limit = limit + 1;
3987        let mut event_keys = match query {
3988            EventFilter::All(filters) => {
3989                if filters.is_empty() {
3990                    index_store.all_events(tx_num, event_num, limit, descending)?
3991                } else {
3992                    return Err(IotaError::UserInput {
3993                        error: UserInputError::Unsupported(
3994                            "This query type does not currently support filter combinations"
3995                                .to_string(),
3996                        ),
3997                    });
3998                }
3999            }
4000            EventFilter::Transaction(digest) => {
4001                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4002            }
4003            EventFilter::MoveModule { package, module } => {
4004                let module_id = ModuleId::new(package.into(), module);
4005                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4006            }
4007            EventFilter::MoveEventType(struct_name) => index_store
4008                .events_by_move_event_struct_name(
4009                    &struct_name,
4010                    tx_num,
4011                    event_num,
4012                    limit,
4013                    descending,
4014                )?,
4015            EventFilter::Sender(sender) => {
4016                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4017            }
4018            EventFilter::TimeRange {
4019                start_time,
4020                end_time,
4021            } => index_store
4022                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4023            EventFilter::MoveEventModule { package, module } => index_store
4024                .events_by_move_event_module(
4025                    &ModuleId::new(package.into(), module),
4026                    tx_num,
4027                    event_num,
4028                    limit,
4029                    descending,
4030                )?,
4031            // not using "_ =>" because we want to make sure we remember to add new variants here
4032            EventFilter::Package(_)
4033            | EventFilter::MoveEventField { .. }
4034            | EventFilter::Any(_)
4035            | EventFilter::And(_, _)
4036            | EventFilter::Or(_, _) => {
4037                return Err(IotaError::UserInput {
4038                    error: UserInputError::Unsupported(
4039                        "This query type is not supported by the full node.".to_string(),
4040                    ),
4041                });
4042            }
4043        };
4044
4045        // skip one event if exclusive cursor is provided,
4046        // otherwise truncate to the original limit.
4047        if cursor.is_some() {
4048            if !event_keys.is_empty() {
4049                event_keys.remove(0);
4050            }
4051        } else {
4052            event_keys.truncate(limit - 1);
4053        }
4054
4055        // get the unique set of digests from the event_keys
4056        let transaction_digests = event_keys
4057            .iter()
4058            .map(|(_, digest, _, _)| *digest)
4059            .collect::<HashSet<_>>()
4060            .into_iter()
4061            .collect::<Vec<_>>();
4062
4063        let events = kv_store
4064            .multi_get_events_by_tx_digests(&transaction_digests)
4065            .await?;
4066
4067        let events_map: HashMap<_, _> =
4068            transaction_digests.iter().zip(events.into_iter()).collect();
4069
4070        let stored_events = event_keys
4071            .into_iter()
4072            .map(|k| {
4073                (
4074                    k,
4075                    events_map
4076                        .get(&k.1)
4077                        .expect("fetched digest is missing")
4078                        .clone()
4079                        .and_then(|e| e.data.get(k.2).cloned()),
4080                )
4081            })
4082            .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4083                event
4084                    .map(|e| (e, tx_digest, event_seq, timestamp))
4085                    .ok_or(IotaError::TransactionEventsNotFound { digest })
4086            })
4087            .collect::<Result<Vec<_>, _>>()?;
4088
4089        let epoch_store = self.load_epoch_store_one_call_per_task();
4090        let backing_store = self.get_backing_package_store().as_ref();
4091        let mut layout_resolver = epoch_store
4092            .executor()
4093            .type_layout_resolver(Box::new(backing_store));
4094        let mut events = vec![];
4095        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4096            events.push(IotaEvent::try_from(
4097                e.clone(),
4098                tx_digest,
4099                event_seq as u64,
4100                Some(timestamp),
4101                layout_resolver.get_annotated_layout(&e.type_)?,
4102            )?)
4103        }
4104        Ok(events)
4105    }
4106
4107    pub async fn insert_genesis_object(&self, object: Object) {
4108        self.get_reconfig_api()
4109            .insert_genesis_object(object)
4110            .expect("Cannot insert genesis object")
4111    }
4112
4113    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4114        futures::future::join_all(
4115            objects
4116                .iter()
4117                .map(|o| self.insert_genesis_object(o.clone())),
4118        )
4119        .await;
4120    }
4121
4122    /// Make a status response for a transaction
4123    #[instrument(level = "trace", skip_all)]
4124    pub fn get_transaction_status(
4125        &self,
4126        transaction_digest: &TransactionDigest,
4127        epoch_store: &Arc<AuthorityPerEpochStore>,
4128    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4129        // TODO: In the case of read path, we should not have to re-sign the effects.
4130        if let Some(effects) =
4131            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4132        {
4133            if let Some(transaction) = self
4134                .get_transaction_cache_reader()
4135                .get_transaction_block(transaction_digest)?
4136            {
4137                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4138                let events = if let Some(digest) = effects.events_digest() {
4139                    self.get_transaction_events(digest)?
4140                } else {
4141                    TransactionEvents::default()
4142                };
4143                return Ok(Some((
4144                    (*transaction).clone().into_message(),
4145                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4146                )));
4147            } else {
4148                // The read of effects and read of transaction are not atomic. It's possible
4149                // that we reverted the transaction (during epoch change) in
4150                // between the above two reads, and we end up having effects but
4151                // not transaction. In this case, we just fall through.
4152                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4153            }
4154        }
4155        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4156            self.metrics.tx_already_processed.inc();
4157            let (transaction, sig) = signed.into_inner().into_data_and_sig();
4158            Ok(Some((transaction, TransactionStatus::Signed(sig))))
4159        } else {
4160            Ok(None)
4161        }
4162    }
4163
4164    /// Get the signed effects of the given transaction. If the effects was
4165    /// signed in a previous epoch, re-sign it so that the caller is able to
4166    /// form a cert of the effects in the current epoch.
4167    #[instrument(level = "trace", skip_all)]
4168    pub fn get_signed_effects_and_maybe_resign(
4169        &self,
4170        transaction_digest: &TransactionDigest,
4171        epoch_store: &Arc<AuthorityPerEpochStore>,
4172    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4173        let effects = self
4174            .get_transaction_cache_reader()
4175            .get_executed_effects(transaction_digest)?;
4176        match effects {
4177            Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4178            None => Ok(None),
4179        }
4180    }
4181
4182    #[instrument(level = "trace", skip_all)]
4183    pub(crate) fn sign_effects(
4184        &self,
4185        effects: TransactionEffects,
4186        epoch_store: &Arc<AuthorityPerEpochStore>,
4187    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4188        let tx_digest = *effects.transaction_digest();
4189        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4190            Some(sig) if sig.epoch == epoch_store.epoch() => {
4191                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4192            }
4193            _ => {
4194                // If the transaction was executed in previous epochs, the validator will
4195                // re-sign the effects with new current epoch so that a client is always able to
4196                // obtain an effects certificate at the current epoch.
4197                //
4198                // Why is this necessary? Consider the following case:
4199                // - assume there are 4 validators
4200                // - Quorum driver gets 2 signed effects before reconfig halt
4201                // - The tx makes it into final checkpoint.
4202                // - 2 validators go away and are replaced in the new epoch.
4203                // - The new epoch begins.
4204                // - The quorum driver cannot complete the partial effects cert from the
4205                //   previous epoch, because it may not be able to reach either of the 2 former
4206                //   validators.
4207                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4208                //   the new epoch, the QD can make a new effects cert and return it to the
4209                //   client.
4210                //
4211                // This is a considered a short-term workaround. Eventually, Quorum Driver
4212                // should be able to return either an effects certificate, -or-
4213                // a proof of inclusion in a checkpoint. In the case above, the
4214                // Quorum Driver would return a proof of inclusion in the final
4215                // checkpoint, and this code would no longer be necessary.
4216                debug!(
4217                    ?tx_digest,
4218                    epoch=?epoch_store.epoch(),
4219                    "Re-signing the effects with the current epoch"
4220                );
4221
4222                let sig = AuthoritySignInfo::new(
4223                    epoch_store.epoch(),
4224                    &effects,
4225                    Intent::iota_app(IntentScope::TransactionEffects),
4226                    self.name,
4227                    &*self.secret,
4228                );
4229
4230                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4231
4232                epoch_store.insert_effects_digest_and_signature(
4233                    &tx_digest,
4234                    effects.digest(),
4235                    &sig,
4236                )?;
4237
4238                effects
4239            }
4240        };
4241
4242        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4243            signed_effects,
4244        ))
4245    }
4246
4247    // Returns coin objects for indexing for fullnode if indexing is enabled.
4248    #[instrument(level = "trace", skip_all)]
4249    fn fullnode_only_get_tx_coins_for_indexing(
4250        &self,
4251        inner_temporary_store: &InnerTemporaryStore,
4252        epoch_store: &Arc<AuthorityPerEpochStore>,
4253    ) -> Option<TxCoins> {
4254        if self.indexes.is_none() || self.is_validator(epoch_store) {
4255            return None;
4256        }
4257        let written_coin_objects = inner_temporary_store
4258            .written
4259            .iter()
4260            .filter_map(|(k, v)| {
4261                if v.is_coin() {
4262                    Some((*k, v.clone()))
4263                } else {
4264                    None
4265                }
4266            })
4267            .collect();
4268        let input_coin_objects = inner_temporary_store
4269            .input_objects
4270            .iter()
4271            .filter_map(|(k, v)| {
4272                if v.is_coin() {
4273                    Some((*k, v.clone()))
4274                } else {
4275                    None
4276                }
4277            })
4278            .collect::<ObjectMap>();
4279        Some((input_coin_objects, written_coin_objects))
4280    }
4281
4282    /// Get the TransactionEnvelope that currently locks the given object, if
4283    /// any. Since object locks are only valid for one epoch, we also need
4284    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4285    /// no lock records for the given object can be found.
4286    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4287    /// object record is at a different version.
4288    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4289    /// certain transaction. Returns None if the a lock record is
4290    /// initialized for the given ObjectRef but not yet locked by any
4291    /// transaction,     or cannot find the transaction in transaction
4292    /// table, because of data race etc.
4293    #[instrument(level = "trace", skip_all)]
4294    pub async fn get_transaction_lock(
4295        &self,
4296        object_ref: &ObjectRef,
4297        epoch_store: &AuthorityPerEpochStore,
4298    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4299        let lock_info = self
4300            .get_object_cache_reader()
4301            .get_lock(*object_ref, epoch_store)?;
4302        let lock_info = match lock_info {
4303            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4304                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4305                    provided_obj_ref: *object_ref,
4306                    current_version: locked_ref.1,
4307                }
4308                .into());
4309            }
4310            ObjectLockStatus::Initialized => {
4311                return Ok(None);
4312            }
4313            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4314        };
4315
4316        epoch_store.get_signed_transaction(&lock_info)
4317    }
4318
4319    pub async fn get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4320        self.get_object_cache_reader().get_objects(objects)
4321    }
4322
4323    pub async fn get_object_or_tombstone(
4324        &self,
4325        object_id: ObjectID,
4326    ) -> IotaResult<Option<ObjectRef>> {
4327        self.get_object_cache_reader()
4328            .get_latest_object_ref_or_tombstone(object_id)
4329    }
4330
4331    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4332    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4333    /// upgrade.
4334    ///
4335    /// This method can be used to dynamic adjust the amount of buffer. If set
4336    /// to 0, the upgrade will go through with only 2f+1 votes.
4337    ///
4338    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4339    /// should have the same value), or you risk halting the chain.
4340    pub fn set_override_protocol_upgrade_buffer_stake(
4341        &self,
4342        expected_epoch: EpochId,
4343        buffer_stake_bps: u64,
4344    ) -> IotaResult {
4345        let epoch_store = self.load_epoch_store_one_call_per_task();
4346        let actual_epoch = epoch_store.epoch();
4347        if actual_epoch != expected_epoch {
4348            return Err(IotaError::WrongEpoch {
4349                expected_epoch,
4350                actual_epoch,
4351            });
4352        }
4353
4354        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4355    }
4356
4357    pub fn clear_override_protocol_upgrade_buffer_stake(
4358        &self,
4359        expected_epoch: EpochId,
4360    ) -> IotaResult {
4361        let epoch_store = self.load_epoch_store_one_call_per_task();
4362        let actual_epoch = epoch_store.epoch();
4363        if actual_epoch != expected_epoch {
4364            return Err(IotaError::WrongEpoch {
4365                expected_epoch,
4366                actual_epoch,
4367            });
4368        }
4369
4370        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4371    }
4372
4373    /// Get the set of system packages that are compiled in to this build, if
4374    /// those packages are compatible with the current versions of those
4375    /// packages on-chain.
4376    pub async fn get_available_system_packages(
4377        &self,
4378        binary_config: &BinaryConfig,
4379    ) -> Vec<ObjectRef> {
4380        let mut results = vec![];
4381
4382        let system_packages = BuiltInFramework::iter_system_packages();
4383
4384        // Add extra framework packages during simtest
4385        #[cfg(msim)]
4386        let extra_packages = framework_injection::get_extra_packages(self.name);
4387        #[cfg(msim)]
4388        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4389
4390        for system_package in system_packages {
4391            let modules = system_package.modules().to_vec();
4392            // In simtests, we could override the current built-in framework packages.
4393            #[cfg(msim)]
4394            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4395                .unwrap_or(modules);
4396
4397            let Some(obj_ref) = iota_framework::compare_system_package(
4398                &self.get_object_store(),
4399                &system_package.id,
4400                &modules,
4401                system_package.dependencies.to_vec(),
4402                binary_config,
4403            )
4404            .await
4405            else {
4406                return vec![];
4407            };
4408            results.push(obj_ref);
4409        }
4410
4411        results
4412    }
4413
4414    /// Return the new versions, module bytes, and dependencies for the packages
4415    /// that have been committed to for a framework upgrade, in
4416    /// `system_packages`.  Loads the module contents from the binary, and
4417    /// performs the following checks:
4418    ///
4419    /// - Whether its contents matches what is on-chain already, in which case
4420    ///   no upgrade is required, and its contents are omitted from the output.
4421    /// - Whether the contents in the binary can form a package whose digest
4422    ///   matches the input, meaning the framework will be upgraded, and this
4423    ///   authority can satisfy that upgrade, in which case the contents are
4424    ///   included in the output.
4425    ///
4426    /// If the current version of the framework can't be loaded, the binary does
4427    /// not contain the bytes for that framework ID, or the resulting
4428    /// package fails the digest check, `None` is returned indicating that
4429    /// this authority cannot run the upgrade that the network voted on.
4430    async fn get_system_package_bytes(
4431        &self,
4432        system_packages: Vec<ObjectRef>,
4433        binary_config: &BinaryConfig,
4434    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4435        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4436        let objects = self.get_objects(&ids).await.expect("read cannot fail");
4437
4438        let mut res = Vec::with_capacity(system_packages.len());
4439        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4440            let prev_transaction = match object {
4441                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4442                    // Skip this one because it doesn't need to be upgraded.
4443                    info!("Framework {} does not need updating", system_package_ref.0);
4444                    continue;
4445                }
4446
4447                Some(cur_object) => cur_object.previous_transaction,
4448                None => TransactionDigest::genesis_marker(),
4449            };
4450
4451            #[cfg(msim)]
4452            let SystemPackage {
4453                id: _,
4454                bytes,
4455                dependencies,
4456            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4457                .unwrap_or_else(|| {
4458                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4459                });
4460
4461            #[cfg(not(msim))]
4462            let SystemPackage {
4463                id: _,
4464                bytes,
4465                dependencies,
4466            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4467
4468            let modules: Vec<_> = bytes
4469                .iter()
4470                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4471                .collect();
4472
4473            let new_object = Object::new_system_package(
4474                &modules,
4475                system_package_ref.1,
4476                dependencies.clone(),
4477                prev_transaction,
4478            );
4479
4480            let new_ref = new_object.compute_object_reference();
4481            if new_ref != system_package_ref {
4482                error!(
4483                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
4484                );
4485                return None;
4486            }
4487
4488            res.push((system_package_ref.1, bytes, dependencies));
4489        }
4490
4491        Some(res)
4492    }
4493
4494    /// Returns the new protocol version and system packages that the network
4495    /// has voted to upgrade to. If the proposed protocol version is not
4496    /// supported, None is returned.
4497    fn is_protocol_version_supported_v1(
4498        proposed_protocol_version: ProtocolVersion,
4499        committee: &Committee,
4500        capabilities: Vec<AuthorityCapabilitiesV1>,
4501        mut buffer_stake_bps: u64,
4502    ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
4503        if buffer_stake_bps > 10000 {
4504            warn!("clamping buffer_stake_bps to 10000");
4505            buffer_stake_bps = 10000;
4506        }
4507
4508        // For each validator, gather the protocol version and system packages that it
4509        // would like to upgrade to in the next epoch.
4510        let mut desired_upgrades: Vec<_> = capabilities
4511            .into_iter()
4512            .filter_map(|mut cap| {
4513                // A validator that lists no packages is voting against any change at all.
4514                if cap.available_system_packages.is_empty() {
4515                    return None;
4516                }
4517
4518                cap.available_system_packages.sort();
4519
4520                info!(
4521                    "validator {:?} supports {:?} with system packages: {:?}",
4522                    cap.authority.concise(),
4523                    cap.supported_protocol_versions,
4524                    cap.available_system_packages,
4525                );
4526
4527                // A validator that only supports the current protocol version is also voting
4528                // against any change, because framework upgrades always require a protocol
4529                // version bump.
4530                cap.supported_protocol_versions
4531                    .get_version_digest(proposed_protocol_version)
4532                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
4533            })
4534            .collect();
4535
4536        // There can only be one set of votes that have a majority, find one if it
4537        // exists.
4538        desired_upgrades.sort();
4539        desired_upgrades
4540            .into_iter()
4541            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4542            .into_iter()
4543            .find_map(|((digest, packages), group)| {
4544                // should have been filtered out earlier.
4545                assert!(!packages.is_empty());
4546
4547                let mut stake_aggregator: StakeAggregator<(), true> =
4548                    StakeAggregator::new(Arc::new(committee.clone()));
4549
4550                for (_, _, authority) in group {
4551                    stake_aggregator.insert_generic(authority, ());
4552                }
4553
4554                let total_votes = stake_aggregator.total_votes();
4555                let quorum_threshold = committee.quorum_threshold();
4556                let f = committee.total_votes() - committee.quorum_threshold();
4557
4558                // multiple by buffer_stake_bps / 10000, rounded up.
4559                let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
4560                let effective_threshold = quorum_threshold + buffer_stake;
4561
4562                info!(
4563                    protocol_config_digest = ?digest,
4564                    ?total_votes,
4565                    ?quorum_threshold,
4566                    ?buffer_stake_bps,
4567                    ?effective_threshold,
4568                    ?proposed_protocol_version,
4569                    ?packages,
4570                    "support for upgrade"
4571                );
4572
4573                let has_support = total_votes >= effective_threshold;
4574                has_support.then_some((proposed_protocol_version, packages))
4575            })
4576    }
4577
4578    /// Selects the highest supported protocol version and system packages that
4579    /// the network has voted to upgrade to. If no upgrade is supported,
4580    /// returns the current protocol version and system packages.
4581    fn choose_protocol_version_and_system_packages_v1(
4582        current_protocol_version: ProtocolVersion,
4583        committee: &Committee,
4584        capabilities: Vec<AuthorityCapabilitiesV1>,
4585        buffer_stake_bps: u64,
4586    ) -> (ProtocolVersion, Vec<ObjectRef>) {
4587        let mut next_protocol_version = current_protocol_version;
4588        let mut system_packages = vec![];
4589
4590        // Finds the highest supported protocol version and system packages by
4591        // incrementing the proposed protocol version by one until no further
4592        // upgrades are supported.
4593        while let Some((version, packages)) = Self::is_protocol_version_supported_v1(
4594            next_protocol_version + 1,
4595            committee,
4596            capabilities.clone(),
4597            buffer_stake_bps,
4598        ) {
4599            next_protocol_version = version;
4600            system_packages = packages;
4601        }
4602
4603        (next_protocol_version, system_packages)
4604    }
4605
4606    #[instrument(level = "debug", skip_all)]
4607    fn create_authenticator_state_tx(
4608        &self,
4609        epoch_store: &Arc<AuthorityPerEpochStore>,
4610    ) -> Option<EndOfEpochTransactionKind> {
4611        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4612            info!("authenticator state transactions not enabled");
4613            return None;
4614        }
4615
4616        let authenticator_state_exists = epoch_store.authenticator_state_exists();
4617        let tx = if authenticator_state_exists {
4618            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4619            let min_epoch =
4620                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4621            let authenticator_obj_initial_shared_version = epoch_store
4622                .epoch_start_config()
4623                .authenticator_obj_initial_shared_version()
4624                .expect("initial version must exist");
4625
4626            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4627                min_epoch,
4628                authenticator_obj_initial_shared_version,
4629            );
4630
4631            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4632
4633            tx
4634        } else {
4635            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4636            info!("Creating AuthenticatorStateCreate tx");
4637            tx
4638        };
4639        Some(tx)
4640    }
4641
    /// Creates and executes the advance epoch transaction to effects without
    /// committing it to the database. The effects of the change epoch tx
    /// are only written to the database after a certified checkpoint has been
    /// formed and executed by CheckpointExecutor.
    ///
    /// When a framework upgrade has been decided on, but the validator does
    /// not have the new versions of the packages locally, the validator
    /// cannot form the ChangeEpochTx. In this case it returns Err,
    /// indicating that the checkpoint builder should give up trying to make the
    /// final checkpoint. As long as the network is able to create a certified
    /// checkpoint (which should be ensured by the capabilities vote), it
    /// will arrive via state sync and be executed by CheckpointExecutor.
    ///
    /// The end-of-epoch transaction contains an optional authenticator state
    /// transaction followed by the change epoch transaction. The change epoch
    /// transaction carries the next epoch number, the chosen protocol version,
    /// the fields of `gas_cost_summary`, `epoch_start_timestamp_ms`, and the
    /// bytes of the system packages to upgrade. `checkpoint` is the sequence
    /// number used to build the executable transaction.
    ///
    /// On success returns the system state read from the objects written by
    /// the transaction, the `SystemEpochInfoEvent` it emitted (if any), and
    /// the transaction effects.
    ///
    /// # Errors
    ///
    /// Returns an error if the upgraded system packages are not available
    /// locally, if the transaction has already been executed (via state
    /// sync), or if acquiring the execution lock, assigning shared object
    /// versions, reading input objects, preparing the certificate, or writing
    /// to the state sync store fails.
    ///
    /// # Panics
    ///
    /// Panics if capabilities or the executed-transaction status cannot be
    /// read, if the transaction did not write the system state object, if no
    /// `SystemEpochInfoEvent` was emitted while the system is not in safe
    /// mode, or if the effects status is not ok.
    #[instrument(level = "error", skip_all)]
    pub async fn create_and_execute_advance_epoch_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        gas_cost_summary: &GasCostSummary,
        checkpoint: CheckpointSequenceNumber,
        epoch_start_timestamp_ms: CheckpointTimestamp,
    ) -> anyhow::Result<(
        IotaSystemState,
        Option<SystemEpochInfoEvent>,
        TransactionEffects,
    )> {
        // Transaction kinds that together form the end-of-epoch transaction.
        let mut txns = Vec::new();

        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
            txns.push(tx);
        }

        let next_epoch = epoch_store.epoch() + 1;

        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();

        // Decide which protocol version and system packages the next epoch
        // uses, based on the capabilities recorded in the epoch store.
        let (next_epoch_protocol_version, next_epoch_system_packages) =
            Self::choose_protocol_version_and_system_packages_v1(
                epoch_store.protocol_version(),
                epoch_store.committee(),
                epoch_store
                    .get_capabilities_v1()
                    .expect("read capabilities from db cannot fail"),
                buffer_stake_bps,
            );

        // since system packages are created during the current epoch, they should abide
        // by the rules of the current epoch, including the current epoch's max
        // Move binary format version
        let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);
        let Some(next_epoch_system_package_bytes) = self
            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
            .await
        else {
            error!(
                "upgraded system packages {:?} are not locally available, cannot create \
                ChangeEpochTx. validator binary must be upgraded to the correct version!",
                next_epoch_system_packages
            );
            // the checkpoint builder will keep retrying forever when it hits this error.
            // Eventually, one of two things will happen:
            // - The operator will upgrade this binary to one that has the new packages
            //   locally, and this function will succeed.
            // - The final checkpoint will be certified by other validators, we will receive
            //   it via state sync, and execute it. This will upgrade the framework
            //   packages, reconfigure, and most likely shut down in the new epoch (this
            //   validator likely doesn't support the new protocol version, or else it
            //   should have had the packages.)
            bail!("missing system packages: cannot form ChangeEpochTx");
        };

        // ChangeEpochV2 requires that both options are set - ProtocolDefinedBaseFee and
        // MaxCommitteeMembersCount.
        if config.protocol_defined_base_fee()
            && config.max_committee_members_count_as_option().is_some()
        {
            // The V2 variant additionally carries `computation_cost_burned`.
            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        } else {
            txns.push(EndOfEpochTransactionKind::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        }

        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);

        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
            tx.clone(),
            epoch_store.epoch(),
            checkpoint,
        );

        let tx_digest = executable_tx.digest();

        info!(
            ?next_epoch,
            ?next_epoch_protocol_version,
            ?next_epoch_system_packages,
            computation_cost=?gas_cost_summary.computation_cost,
            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
            storage_cost=?gas_cost_summary.storage_cost,
            storage_rebate=?gas_cost_summary.storage_rebate,
            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
            ?tx_digest,
            "Creating advance epoch transaction"
        );

        fail_point_async!("change_epoch_tx_delay");
        // Take the per-transaction lock before checking whether the
        // transaction was already executed.
        let tx_lock = epoch_store.acquire_tx_lock(tx_digest).await;

        // The tx could have been executed by state sync already - if so simply return
        // an error. The checkpoint builder will shortly be terminated by
        // reconfiguration anyway.
        if self
            .get_transaction_cache_reader()
            .is_tx_already_executed(tx_digest)
            .expect("read cannot fail")
        {
            warn!("change epoch tx has already been executed via state sync");
            bail!("change epoch tx has already been executed via state sync",);
        }

        let execution_guard = self
            .execution_lock_for_executable_transaction(&executable_tx)
            .await?;

        // We must manually assign the shared object versions to the transaction before
        // executing it. This is because we do not sequence end-of-epoch
        // transactions through consensus.
        epoch_store
            .assign_shared_object_versions_idempotent(
                self.get_object_cache_reader().as_ref(),
                &[executable_tx.clone()],
            )
            .await?;

        let input_objects =
            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;

        // Execute to a temporary store and effects only; the returned
        // execution error is not inspected here.
        let (temporary_store, effects, _execution_error_opt) =
            self.prepare_certificate(&execution_guard, &executable_tx, input_objects, epoch_store)?;
        let system_obj = get_iota_system_state(&temporary_store.written)
            .expect("change epoch tx must write to system object");
        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
        let system_epoch_info_event = temporary_store
            .events
            .data
            .into_iter()
            .find(|event| event.is_system_epoch_info_event())
            .map(SystemEpochInfoEvent::from);
        // The system epoch info event can be `None` if the `advance_epoch`
        // Move function call failed and was executed in safe mode.
        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());

        // We must write tx and effects to the state sync tables so that state sync is
        // able to deliver the transaction to CheckpointExecutor after it is
        // included in a certified checkpoint.
        self.get_state_sync_store()
            .insert_transaction_and_effects(&tx, &effects)
            .map_err(|err| {
                let err: anyhow::Error = err.into();
                err
            })?;

        info!(
            "Effects summary of the change epoch transaction: {:?}",
            effects.summary_for_debug()
        );
        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
        // The change epoch transaction cannot fail to execute.
        assert!(effects.status().is_ok());
        Ok((system_obj, system_epoch_info_event, effects))
    }
4830
4831    /// This function is called at the very end of the epoch.
4832    /// This step is required before updating new epoch in the db and calling
4833    /// reopen_epoch_db.
4834    #[instrument(level = "error", skip_all)]
4835    async fn revert_uncommitted_epoch_transactions(
4836        &self,
4837        epoch_store: &AuthorityPerEpochStore,
4838    ) -> IotaResult {
4839        {
4840            let state = epoch_store.get_reconfig_state_write_lock_guard();
4841            if state.should_accept_user_certs() {
4842                // Need to change this so that consensus adapter do not accept certificates from
4843                // user. This can happen if our local validator did not initiate
4844                // epoch change locally, but 2f+1 nodes already concluded the
4845                // epoch.
4846                //
4847                // This lock is essentially a barrier for
4848                // `epoch_store.pending_consensus_certificates` table we are reading on the line
4849                // after this block
4850                epoch_store.close_user_certs(state);
4851            }
4852            // lock is dropped here
4853        }
4854        let pending_certificates = epoch_store.pending_consensus_certificates();
4855        info!(
4856            "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
4857            pending_certificates.len(),
4858            pending_certificates,
4859        );
4860        for digest in pending_certificates {
4861            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
4862                info!(
4863                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
4864                    digest
4865                );
4866                continue;
4867            }
4868            info!("Reverting {:?} at the end of epoch", digest);
4869            epoch_store.revert_executed_transaction(&digest)?;
4870            self.get_reconfig_api().revert_state_update(&digest)?;
4871        }
4872        info!("All uncommitted local transactions reverted");
4873        Ok(())
4874    }
4875
4876    #[instrument(level = "error", skip_all)]
4877    async fn reopen_epoch_db(
4878        &self,
4879        cur_epoch_store: &AuthorityPerEpochStore,
4880        new_committee: Committee,
4881        epoch_start_configuration: EpochStartConfiguration,
4882        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4883    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
4884        let new_epoch = new_committee.epoch;
4885        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
4886        assert_eq!(
4887            epoch_start_configuration.epoch_start_state().epoch(),
4888            new_committee.epoch
4889        );
4890        fail_point!("before-open-new-epoch-store");
4891        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
4892            self.name,
4893            new_committee,
4894            epoch_start_configuration,
4895            self.get_backing_package_store().clone(),
4896            self.get_object_store().clone(),
4897            expensive_safety_check_config,
4898            cur_epoch_store.get_chain_identifier(),
4899        );
4900        self.epoch_store.store(new_epoch_store.clone());
4901        Ok(new_epoch_store)
4902    }
4903
4904    #[cfg(test)]
4905    pub(crate) fn iter_live_object_set_for_testing(
4906        &self,
4907    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
4908        self.get_accumulator_store()
4909            .iter_cached_live_object_set_for_testing()
4910    }
4911
4912    #[cfg(test)]
4913    pub(crate) fn shutdown_execution_for_test(&self) {
4914        self.tx_execution_shutdown
4915            .lock()
4916            .take()
4917            .unwrap()
4918            .send(())
4919            .unwrap();
4920    }
4921
4922    /// NOTE: this function is only to be used for fuzzing and testing. Never
4923    /// use in prod
4924    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> IotaResult {
4925        self.get_reconfig_api()
4926            .bulk_insert_genesis_objects(objects)?;
4927        self.get_object_cache_reader()
4928            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
4929        self.get_reconfig_api()
4930            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
4931        Ok(())
4932    }
4933}
4934
/// Receives generated randomness over a channel and, for each item, creates
/// a randomness state update transaction and hands it to the authority's
/// transaction manager for execution.
pub struct RandomnessRoundReceiver {
    /// Authority state used to load the epoch store, enqueue the transaction,
    /// and wait for its effects.
    authority_state: Arc<AuthorityState>,
    /// Channel delivering `(epoch, round, randomness bytes)` items.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
4939
4940impl RandomnessRoundReceiver {
4941    pub fn spawn(
4942        authority_state: Arc<AuthorityState>,
4943        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
4944    ) -> JoinHandle<()> {
4945        let rrr = RandomnessRoundReceiver {
4946            authority_state,
4947            randomness_rx,
4948        };
4949        spawn_monitored_task!(rrr.run())
4950    }
4951
4952    async fn run(mut self) {
4953        info!("RandomnessRoundReceiver event loop started");
4954
4955        loop {
4956            tokio::select! {
4957                maybe_recv = self.randomness_rx.recv() => {
4958                    if let Some((epoch, round, bytes)) = maybe_recv {
4959                        self.handle_new_randomness(epoch, round, bytes);
4960                    } else {
4961                        break;
4962                    }
4963                },
4964            }
4965        }
4966
4967        info!("RandomnessRoundReceiver event loop ended");
4968    }
4969
4970    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
4971    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
4972        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
4973        if epoch_store.epoch() != epoch {
4974            warn!(
4975                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
4976                epoch_store.epoch()
4977            );
4978            return;
4979        }
4980        let transaction = VerifiedTransaction::new_randomness_state_update(
4981            epoch,
4982            round,
4983            bytes,
4984            epoch_store
4985                .epoch_start_config()
4986                .randomness_obj_initial_shared_version(),
4987        );
4988        debug!(
4989            "created randomness state update transaction with digest: {:?}",
4990            transaction.digest()
4991        );
4992        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
4993        let digest = *transaction.digest();
4994
4995        // Send transaction to TransactionManager for execution.
4996        self.authority_state
4997            .transaction_manager()
4998            .enqueue(vec![transaction], &epoch_store);
4999
5000        let authority_state = self.authority_state.clone();
5001        spawn_monitored_task!(async move {
5002            // Wait for transaction execution in a separate task, to avoid deadlock in case
5003            // of out-of-order randomness generation. (Each
5004            // RandomnessStateUpdate depends on the output of the
5005            // RandomnessStateUpdate from the previous round.)
5006            //
5007            // We set a very long timeout so that in case this gets stuck for some reason,
5008            // the validator will eventually crash rather than continuing in a
5009            // zombie mode.
5010            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5011            let result = tokio::time::timeout(
5012                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5013                authority_state
5014                    .get_transaction_cache_reader()
5015                    .notify_read_executed_effects(&[digest]),
5016            )
5017            .await;
5018            let result = match result {
5019                Ok(result) => result,
5020                Err(_) => {
5021                    if cfg!(debug_assertions) {
5022                        // Crash on randomness update execution timeout in debug builds.
5023                        panic!(
5024                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5025                        );
5026                    }
5027                    warn!(
5028                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5029                    );
5030                    // Continue waiting as long as necessary in non-debug builds.
5031                    authority_state
5032                        .get_transaction_cache_reader()
5033                        .notify_read_executed_effects(&[digest])
5034                        .await
5035                }
5036            };
5037
5038            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5039            let effects = effects.pop().expect("should return effects");
5040            if *effects.status() != ExecutionStatus::Success {
5041                panic!(
5042                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5043                );
5044            }
5045            debug!(
5046                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5047            );
5048        });
5049    }
5050}
5051
5052#[async_trait]
5053impl TransactionKeyValueStoreTrait for AuthorityState {
5054    async fn multi_get(
5055        &self,
5056        transaction_keys: &[TransactionDigest],
5057        effects_keys: &[TransactionDigest],
5058    ) -> IotaResult<KVStoreTransactionData> {
5059        let txns = if !transaction_keys.is_empty() {
5060            self.get_transaction_cache_reader()
5061                .multi_get_transaction_blocks(transaction_keys)?
5062                .into_iter()
5063                .map(|t| t.map(|t| (*t).clone().into_inner()))
5064                .collect()
5065        } else {
5066            vec![]
5067        };
5068
5069        let fx = if !effects_keys.is_empty() {
5070            self.get_transaction_cache_reader()
5071                .multi_get_executed_effects(effects_keys)?
5072        } else {
5073            vec![]
5074        };
5075
5076        Ok((txns, fx))
5077    }
5078
5079    async fn multi_get_checkpoints(
5080        &self,
5081        checkpoint_summaries: &[CheckpointSequenceNumber],
5082        checkpoint_contents: &[CheckpointSequenceNumber],
5083        checkpoint_summaries_by_digest: &[CheckpointDigest],
5084    ) -> IotaResult<(
5085        Vec<Option<CertifiedCheckpointSummary>>,
5086        Vec<Option<CheckpointContents>>,
5087        Vec<Option<CertifiedCheckpointSummary>>,
5088    )> {
5089        // TODO: use multi-get methods if it ever becomes important (unlikely)
5090        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5091        let store = self.get_checkpoint_store();
5092        for seq in checkpoint_summaries {
5093            let checkpoint = store
5094                .get_checkpoint_by_sequence_number(*seq)?
5095                .map(|c| c.into_inner());
5096
5097            summaries.push(checkpoint);
5098        }
5099
5100        let mut contents = Vec::with_capacity(checkpoint_contents.len());
5101        for seq in checkpoint_contents {
5102            let checkpoint = store
5103                .get_checkpoint_by_sequence_number(*seq)?
5104                .and_then(|summary| {
5105                    store
5106                        .get_checkpoint_contents(&summary.content_digest)
5107                        .expect("db read cannot fail")
5108                });
5109            contents.push(checkpoint);
5110        }
5111
5112        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5113        for digest in checkpoint_summaries_by_digest {
5114            let checkpoint = store
5115                .get_checkpoint_by_digest(digest)?
5116                .map(|c| c.into_inner());
5117            summaries_by_digest.push(checkpoint);
5118        }
5119
5120        Ok((summaries, contents, summaries_by_digest))
5121    }
5122
5123    async fn get_transaction_perpetual_checkpoint(
5124        &self,
5125        digest: TransactionDigest,
5126    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5127        self.get_checkpoint_cache()
5128            .get_transaction_perpetual_checkpoint(&digest)
5129            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5130    }
5131
5132    async fn get_object(
5133        &self,
5134        object_id: ObjectID,
5135        version: VersionNumber,
5136    ) -> IotaResult<Option<Object>> {
5137        self.get_object_cache_reader()
5138            .get_object_by_key(&object_id, version)
5139    }
5140
5141    async fn multi_get_transactions_perpetual_checkpoints(
5142        &self,
5143        digests: &[TransactionDigest],
5144    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5145        let res = self
5146            .get_checkpoint_cache()
5147            .multi_get_transactions_perpetual_checkpoints(digests)?;
5148
5149        Ok(res
5150            .into_iter()
5151            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5152            .collect())
5153    }
5154
5155    #[instrument(skip(self))]
5156    async fn multi_get_events_by_tx_digests(
5157        &self,
5158        digests: &[TransactionDigest],
5159    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5160        if digests.is_empty() {
5161            return Ok(vec![]);
5162        }
5163        let events_digests: Vec<_> = self
5164            .get_transaction_cache_reader()
5165            .multi_get_executed_effects(digests)?
5166            .into_iter()
5167            .map(|t| t.and_then(|t| t.events_digest().cloned()))
5168            .collect();
5169        let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5170        let mut events = self
5171            .get_transaction_cache_reader()
5172            .multi_get_events(&non_empty_events)?
5173            .into_iter();
5174        Ok(events_digests
5175            .into_iter()
5176            .map(|ev| ev.and_then(|_| events.next()?))
5177            .collect())
5178    }
5179}
5180
/// Simulator-only (`msim`) registry that lets tests replace the modules of a
/// package, either for every validator or through a per-validator callback.
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_types::{
        base_types::{AuthorityName, ObjectID},
        is_system_package,
    };
    use move_binary_format::CompiledModule;

    /// The compiled modules that make up one package.
    type Framework = Vec<CompiledModule>;

    /// Callback that decides, for a given validator, which modules (if any)
    /// should replace a package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a registered override is resolved.
    enum PackageOverrideConfig {
        /// The same modules are used for every validator.
        Global(Framework),
        /// The callback is consulted with the validator's name.
        PerValidator(PackageUpgradeCallback),
    }

    /// All registered overrides, keyed by package id.
    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    /// Serializes every module using the module's own `version`.
    ///
    /// Panics if a module fails to serialize.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Registers `modules` as the override of `package_id` for all
    /// validators, replacing any override previously registered for that id.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        let config = PackageOverrideConfig::Global(modules);
        OVERRIDE.with(|overrides| overrides.borrow_mut().insert(package_id, config));
    }

    /// Registers a per-validator callback as the override of `package_id`,
    /// replacing any override previously registered for that id.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        let config = PackageOverrideConfig::PerValidator(func);
        OVERRIDE.with(|overrides| overrides.borrow_mut().insert(package_id, config));
    }

    /// Returns the serialized override modules of `package_id` as seen by the
    /// validator `name`.
    ///
    /// `None` is returned when no override is registered for the package, or
    /// when the per-validator callback returns `None` for `name`.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|overrides| {
            let overrides = overrides.borrow();
            match overrides.get(package_id)? {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(callback) => {
                    let framework = callback(name)?;
                    Some(compiled_modules_to_bytes(&framework))
                }
            }
        })
    }

    /// Same lookup as [`get_override_bytes`], but returns the compiled modules
    /// themselves instead of their serialized form.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|overrides| {
            let overrides = overrides.borrow();
            match overrides.get(package_id)? {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(callback) => callback(name),
            }
        })
    }

    /// Builds a [`SystemPackage`] from the override of `package_id` for the
    /// validator `name`, or returns `None` when there is no such override.
    ///
    /// When `is_system_package` holds for the id, the dependency list of the
    /// built-in package with the same id is reused.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let id = *package_id;

        let dependencies = if is_system_package(id) {
            let built_in = BuiltInFramework::get_package_by_id(package_id);
            built_in.dependencies.to_vec()
        } else {
            // Assume that entirely new injected packages depend on all existing system
            // packages.
            BuiltInFramework::all_package_ids()
        };

        Some(SystemPackage {
            id,
            bytes,
            dependencies,
        })
    }

    /// Returns every overridden package whose id is not one of the built-in
    /// framework package ids, each depending on all built-in packages.
    ///
    /// Panics if an override yields no bytes for `name` (for example a
    /// per-validator callback that returns `None`).
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();

        // Collect the ids first so the registry borrow is released before the
        // per-package lookups below borrow it again.
        let extra: Vec<ObjectID> = OVERRIDE.with(|overrides| {
            overrides
                .borrow()
                .keys()
                .filter(|id| !built_in.contains(*id))
                .copied()
                .collect()
        });

        let mut packages = Vec::with_capacity(extra.len());
        for id in extra {
            packages.push(SystemPackage {
                id,
                bytes: get_override_bytes(&id, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        packages
    }
}
5302
/// An object stored together with the components of its object reference,
/// in a form that can be serialized as part of a [`NodeStateDump`].
///
/// The constructor fills `id`, `version` and `digest` from
/// `Object::compute_object_reference()` of the wrapped object.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// First component of the object's reference.
    pub id: ObjectID,
    /// Second component of the object's reference.
    pub version: VersionNumber,
    /// Third component of the object's reference.
    pub digest: ObjectDigest,
    /// The object itself.
    pub object: Object,
}
5310
5311impl ObjDumpFormat {
5312    fn new(object: Object) -> Self {
5313        let oref = object.compute_object_reference();
5314        Self {
5315            id: oref.0,
5316            version: oref.1,
5317            digest: oref.2,
5318            object,
5319        }
5320    }
5321}
5322
/// Serializable snapshot of a transaction, the effects computed for it and
/// the objects gathered around its execution.
///
/// Instances are built by `NodeStateDump::new` and can be written to / read
/// from a JSON file.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the transaction this dump describes.
    pub tx_digest: TransactionDigest,
    /// The transaction message extracted from the certificate.
    pub sender_signed_data: SenderSignedData,
    /// Epoch reported by the epoch store when the dump was built.
    pub executed_epoch: u64,
    /// Reference gas price reported by the epoch store.
    pub reference_gas_price: u64,
    /// Protocol version reported by the epoch store, as a plain integer.
    pub protocol_version: u64,
    /// Epoch start timestamp taken from the epoch start configuration.
    pub epoch_start_timestamp_ms: u64,
    /// Copy of the effects handed to the constructor.
    pub computed_effects: TransactionEffects,
    /// Effects digest supplied by the caller.
    // NOTE(review): presumably the digest `computed_effects` was expected to
    // hash to — confirm against the call site.
    pub expected_effects_digest: TransactionEffectsDigest,
    /// Built-in framework packages that were found in the object store.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Mutated and read-only shared input objects found in the object store.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Objects listed in the temporary store's `loaded_runtime_objects`.
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects at the versions reported by the effects' `modified_at_versions`.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages listed in the temporary store's
    /// `runtime_packages_loaded_from_db`.
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// Input objects held by the temporary store.
    pub input_objects: Vec<ObjDumpFormat>,
}
5340
5341impl NodeStateDump {
5342    pub fn new(
5343        tx_digest: &TransactionDigest,
5344        effects: &TransactionEffects,
5345        expected_effects_digest: TransactionEffectsDigest,
5346        object_store: &dyn ObjectStore,
5347        epoch_store: &Arc<AuthorityPerEpochStore>,
5348        inner_temporary_store: &InnerTemporaryStore,
5349        certificate: &VerifiedExecutableTransaction,
5350    ) -> IotaResult<Self> {
5351        // Epoch info
5352        let executed_epoch = epoch_store.epoch();
5353        let reference_gas_price = epoch_store.reference_gas_price();
5354        let epoch_start_config = epoch_store.epoch_start_config();
5355        let protocol_version = epoch_store.protocol_version().as_u64();
5356        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5357
5358        // Record all system packages at this version
5359        let mut relevant_system_packages = Vec::new();
5360        for sys_package_id in BuiltInFramework::all_package_ids() {
5361            if let Some(w) = object_store.get_object(&sys_package_id)? {
5362                relevant_system_packages.push(ObjDumpFormat::new(w))
5363            }
5364        }
5365
5366        // Record all the shared objects
5367        let mut shared_objects = Vec::new();
5368        for kind in effects.input_shared_objects() {
5369            match kind {
5370                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5371                    if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1)? {
5372                        shared_objects.push(ObjDumpFormat::new(w))
5373                    }
5374                }
5375                InputSharedObject::ReadDeleted(..)
5376                | InputSharedObject::MutateDeleted(..)
5377                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
5378                                                           * objects. */
5379            }
5380        }
5381
5382        // Record all loaded child objects
5383        // Child objects which are read but not mutated are not tracked anywhere else
5384        let mut loaded_child_objects = Vec::new();
5385        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5386            if let Some(w) = object_store.get_object_by_key(id, meta.version)? {
5387                loaded_child_objects.push(ObjDumpFormat::new(w))
5388            }
5389        }
5390
5391        // Record all modified objects
5392        let mut modified_at_versions = Vec::new();
5393        for (id, ver) in effects.modified_at_versions() {
5394            if let Some(w) = object_store.get_object_by_key(&id, ver)? {
5395                modified_at_versions.push(ObjDumpFormat::new(w))
5396            }
5397        }
5398
5399        // Packages read at runtime, which were not previously loaded into the temoorary
5400        // store Some packages may be fetched at runtime and wont show up in
5401        // input objects
5402        let mut runtime_reads = Vec::new();
5403        for obj in inner_temporary_store
5404            .runtime_packages_loaded_from_db
5405            .values()
5406        {
5407            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
5408        }
5409
5410        // All other input objects should already be in `inner_temporary_store.objects`
5411
5412        Ok(Self {
5413            tx_digest: *tx_digest,
5414            executed_epoch,
5415            reference_gas_price,
5416            epoch_start_timestamp_ms,
5417            protocol_version,
5418            relevant_system_packages,
5419            shared_objects,
5420            loaded_child_objects,
5421            modified_at_versions,
5422            runtime_reads,
5423            sender_signed_data: certificate.clone().into_message(),
5424            input_objects: inner_temporary_store
5425                .input_objects
5426                .values()
5427                .map(|o| ObjDumpFormat::new(o.clone()))
5428                .collect(),
5429            computed_effects: effects.clone(),
5430            expected_effects_digest,
5431        })
5432    }
5433
5434    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
5435        let mut objects = Vec::new();
5436        objects.extend(self.relevant_system_packages.clone());
5437        objects.extend(self.shared_objects.clone());
5438        objects.extend(self.loaded_child_objects.clone());
5439        objects.extend(self.modified_at_versions.clone());
5440        objects.extend(self.runtime_reads.clone());
5441        objects.extend(self.input_objects.clone());
5442        objects
5443    }
5444
5445    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
5446        let file_name = format!(
5447            "{}_{}_NODE_DUMP.json",
5448            self.tx_digest,
5449            AuthorityState::unixtime_now_ms()
5450        );
5451        let mut path = path.to_path_buf();
5452        path.push(&file_name);
5453        let mut file = File::create(path.clone())?;
5454        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
5455        Ok(path)
5456    }
5457
5458    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
5459        let file = File::open(path)?;
5460        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
5461    }
5462}