iota_core/
authority.rs

// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::Duration,
15    vec,
16};
17
18use anyhow::anyhow;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use chrono::prelude::*;
24use fastcrypto::{
25    encoding::{Base58, Encoding},
26    hash::MultisetHash,
27};
28use iota_archival::reader::ArchiveReaderBalancer;
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_storage::{
48    IndexStore,
49    indexes::{CoinInfo, ObjectIndexChanges},
50    key_value_store::{
51        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
52    },
53    key_value_store_metrics::KeyValueStoreMetrics,
54};
55#[cfg(msim)]
56use iota_types::committee::CommitteeTrait;
57use iota_types::{
58    IOTA_SYSTEM_ADDRESS, TypeTag,
59    authenticator_state::get_authenticator_state,
60    base_types::*,
61    committee::{Committee, EpochId, ProtocolVersion},
62    crypto::{AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer, default_hash},
63    deny_list_v1::check_coin_deny_list_v1_during_signing,
64    digests::{ChainIdentifier, TransactionEventsDigest},
65    dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
66    effects::{
67        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
68        TransactionEvents, VerifiedCertifiedTransactionEffects, VerifiedSignedTransactionEffects,
69    },
70    error::{ExecutionError, IotaError, IotaResult, UserInputError},
71    event::{Event, EventID, SystemEpochInfoEvent},
72    executable_transaction::VerifiedExecutableTransaction,
73    execution_config_utils::to_binary_config,
74    execution_status::ExecutionStatus,
75    fp_ensure,
76    gas::{GasCostSummary, IotaGasStatus},
77    inner_temporary_store::{
78        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
79        WrittenObjects,
80    },
81    iota_system_state::{
82        IotaSystemState, IotaSystemStateTrait,
83        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
84    },
85    is_system_package,
86    layout_resolver::{LayoutResolver, into_struct_layout},
87    message_envelope::Message,
88    messages_checkpoint::{
89        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
90        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
91        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
92        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
93    },
94    messages_consensus::AuthorityCapabilitiesV1,
95    messages_grpc::{
96        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
97        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
98        TransactionStatus,
99    },
100    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
101    object::{
102        MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
103        bounded_visitor::BoundedVisitor,
104    },
105    storage::{
106        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
107    },
108    supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions},
109    transaction::*,
110};
111use itertools::Itertools;
112use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
113use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
114use once_cell::sync::OnceCell;
115use parking_lot::Mutex;
116use prometheus::{
117    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
118    register_histogram_vec_with_registry, register_histogram_with_registry,
119    register_int_counter_vec_with_registry, register_int_counter_with_registry,
120    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
121};
122use serde::{Deserialize, Serialize, de::DeserializeOwned};
123use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
124use tap::{TapFallible, TapOptional};
125use tokio::{
126    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
127    task::JoinHandle,
128};
129use tracing::{Instrument, debug, error, info, instrument, warn};
130use typed_store::TypedStoreError;
131
132use self::{
133    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
134};
135#[cfg(msim)]
136pub use crate::checkpoints::checkpoint_executor::{
137    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
138};
139use crate::{
140    authority::{
141        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
142        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
143        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
144        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
145        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
146    },
147    authority_client::NetworkAuthorityClient,
148    checkpoints::CheckpointStore,
149    consensus_adapter::ConsensusAdapter,
150    epoch::committee_store::CommitteeStore,
151    execution_cache::{
152        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
153        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
154        TransactionCacheRead,
155    },
156    execution_driver::execution_process,
157    metrics::{LatencyObserver, RateTracker},
158    module_cache_metrics::ResolverMetrics,
159    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
160    rest_index::RestIndexStore,
161    stake_aggregator::StakeAggregator,
162    state_accumulator::{AccumulatorStore, StateAccumulator},
163    subscription_handler::SubscriptionHandler,
164    transaction_input_loader::TransactionInputLoader,
165    transaction_manager::TransactionManager,
166    transaction_outputs::TransactionOutputs,
167    validator_tx_finalizer::ValidatorTxFinalizer,
168    verify_indexes::verify_indexes,
169};
170
171#[cfg(test)]
172#[path = "unit_tests/authority_tests.rs"]
173pub mod authority_tests;
174
175#[cfg(test)]
176#[path = "unit_tests/transaction_tests.rs"]
177pub mod transaction_tests;
178
179#[cfg(test)]
180#[path = "unit_tests/batch_transaction_tests.rs"]
181mod batch_transaction_tests;
182
183#[cfg(test)]
184#[path = "unit_tests/move_integration_tests.rs"]
185pub mod move_integration_tests;
186
187#[cfg(test)]
188#[path = "unit_tests/gas_tests.rs"]
189mod gas_tests;
190
191#[cfg(test)]
192#[path = "unit_tests/batch_verification_tests.rs"]
193mod batch_verification_tests;
194
195#[cfg(test)]
196#[path = "unit_tests/coin_deny_list_tests.rs"]
197mod coin_deny_list_tests;
198
199#[cfg(any(test, feature = "test-utils"))]
200pub mod authority_test_utils;
201
202pub mod authority_per_epoch_store;
203pub mod authority_per_epoch_store_pruner;
204
205pub mod authority_store_pruner;
206pub mod authority_store_tables;
207pub mod authority_store_types;
208pub mod epoch_start_configuration;
209pub mod shared_object_congestion_tracker;
210pub mod shared_object_version_manager;
211#[cfg(any(test, feature = "test-utils"))]
212pub mod test_authority_builder;
213pub mod transaction_deferral;
214
215pub(crate) mod authority_store;
216
217pub static CHAIN_IDENTIFIER: OnceCell<ChainIdentifier> = OnceCell::new();
218
219/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
220pub struct AuthorityMetrics {
221    tx_orders: IntCounter,
222    total_certs: IntCounter,
223    total_cert_attempts: IntCounter,
224    total_effects: IntCounter,
225    pub shared_obj_tx: IntCounter,
226    sponsored_tx: IntCounter,
227    tx_already_processed: IntCounter,
228    num_input_objs: Histogram,
229    num_shared_objects: Histogram,
230    batch_size: Histogram,
231
232    authority_state_handle_transaction_latency: Histogram,
233
234    execute_certificate_latency_single_writer: Histogram,
235    execute_certificate_latency_shared_object: Histogram,
236
237    execute_certificate_with_effects_latency: Histogram,
238    internal_execution_latency: Histogram,
239    execution_load_input_objects_latency: Histogram,
240    prepare_certificate_latency: Histogram,
241    commit_certificate_latency: Histogram,
242    db_checkpoint_latency: Histogram,
243
244    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
245    pub(crate) transaction_manager_num_missing_objects: IntGauge,
246    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
247    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
248    pub(crate) transaction_manager_num_ready: IntGauge,
249    pub(crate) transaction_manager_object_cache_size: IntGauge,
250    pub(crate) transaction_manager_object_cache_hits: IntCounter,
251    pub(crate) transaction_manager_object_cache_misses: IntCounter,
252    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
253    pub(crate) transaction_manager_package_cache_size: IntGauge,
254    pub(crate) transaction_manager_package_cache_hits: IntCounter,
255    pub(crate) transaction_manager_package_cache_misses: IntCounter,
256    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
257    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
258
259    pub(crate) execution_driver_executed_transactions: IntCounter,
260    pub(crate) execution_driver_dispatch_queue: IntGauge,
261    pub(crate) execution_queueing_delay_s: Histogram,
262    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
263    pub(crate) execution_gas_latency_ratio: Histogram,
264
265    pub(crate) skipped_consensus_txns: IntCounter,
266    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
267
268    pub(crate) authority_overload_status: IntGauge,
269    pub(crate) authority_load_shedding_percentage: IntGauge,
270
271    pub(crate) transaction_overload_sources: IntCounterVec,
272
273    /// Post processing metrics
274    post_processing_total_events_emitted: IntCounter,
275    post_processing_total_tx_indexed: IntCounter,
276    post_processing_total_tx_had_event_processed: IntCounter,
277    post_processing_total_failures: IntCounter,
278
279    /// Consensus handler metrics
280    pub consensus_handler_processed: IntCounterVec,
281    pub consensus_handler_transaction_sizes: HistogramVec,
282    pub consensus_handler_num_low_scoring_authorities: IntGauge,
283    pub consensus_handler_scores: IntGaugeVec,
284    pub consensus_handler_deferred_transactions: IntCounter,
285    pub consensus_handler_congested_transactions: IntCounter,
286    pub consensus_handler_cancelled_transactions: IntCounter,
287    pub consensus_handler_max_object_costs: IntGaugeVec,
288    pub consensus_committed_subdags: IntCounterVec,
289    pub consensus_committed_messages: IntGaugeVec,
290    pub consensus_committed_user_transactions: IntGaugeVec,
291    pub consensus_calculated_throughput: IntGauge,
292    pub consensus_calculated_throughput_profile: IntGauge,
293
294    pub limits_metrics: Arc<LimitsMetrics>,
295
296    /// bytecode verifier metrics for tracking timeouts
297    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
298
299    pub authenticator_state_update_failed: IntCounter,
300
301    /// Count of zklogin signatures
302    pub zklogin_sig_count: IntCounter,
303    /// Count of multisig signatures
304    pub multisig_sig_count: IntCounter,
305
306    // Tracks recent average txn queueing delay between when it is ready for execution
307    // until it starts executing.
308    pub execution_queueing_latency: LatencyObserver,
309
310    // Tracks the rate of transactions become ready for execution in transaction manager.
311    // The need for the Mutex is that the tracker is updated in transaction manager and read
312    // in the overload_monitor. There should be low mutex contention because
313    // transaction manager is single threaded and the read rate in overload_monitor is
314    // low. In the case where transaction manager becomes multi-threaded, we can
315    // create one rate tracker per thread.
316    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
317
318    // Tracks the rate of transactions starts execution in execution driver.
319    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
320    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
321}
322
/// Histogram buckets overriding the default Prometheus ones, for positive
/// integer-valued samples: strictly increasing upper bounds from 1 up to 10M
/// following a 1-2-5-7 pattern per decade.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
    7000000., 10000000.,
];
329
/// Histogram buckets for latency samples, in seconds: strictly increasing
/// upper bounds from 0.5ms up to 90s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];
334
/// Histogram buckets for low latency samples, in seconds. Starts from 10us and
/// goes up to 100s in strictly increasing 1-2-5 steps per decade.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];
340
/// Histogram buckets for the gas-to-latency ratio histograms
/// (`prepare_cert_gas_latency_ratio` and `execution_gas_latency_ratio`):
/// strictly increasing upper bounds from 10 up to 1M.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];
345
/// Fixed gas coin value of 10^12.
// NOTE(review): the name suggests this is the balance of the gas coin used for
// dev-inspect runs; the use site is not visible in this part of the file —
// confirm, and confirm the unit.
pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000;
347
348impl AuthorityMetrics {
349    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
350        let execute_certificate_latency = register_histogram_vec_with_registry!(
351            "authority_state_execute_certificate_latency",
352            "Latency of executing certificates, including waiting for inputs",
353            &["tx_type"],
354            LATENCY_SEC_BUCKETS.to_vec(),
355            registry,
356        )
357        .unwrap();
358
359        let execute_certificate_latency_single_writer =
360            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
361        let execute_certificate_latency_shared_object =
362            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
363
364        Self {
365            tx_orders: register_int_counter_with_registry!(
366                "total_transaction_orders",
367                "Total number of transaction orders",
368                registry,
369            )
370            .unwrap(),
371            total_certs: register_int_counter_with_registry!(
372                "total_transaction_certificates",
373                "Total number of transaction certificates handled",
374                registry,
375            )
376            .unwrap(),
377            total_cert_attempts: register_int_counter_with_registry!(
378                "total_handle_certificate_attempts",
379                "Number of calls to handle_certificate",
380                registry,
381            )
382            .unwrap(),
383            // total_effects == total transactions finished
384            total_effects: register_int_counter_with_registry!(
385                "total_transaction_effects",
386                "Total number of transaction effects produced",
387                registry,
388            )
389            .unwrap(),
390
391            shared_obj_tx: register_int_counter_with_registry!(
392                "num_shared_obj_tx",
393                "Number of transactions involving shared objects",
394                registry,
395            )
396            .unwrap(),
397
398            sponsored_tx: register_int_counter_with_registry!(
399                "num_sponsored_tx",
400                "Number of sponsored transactions",
401                registry,
402            )
403            .unwrap(),
404
405            tx_already_processed: register_int_counter_with_registry!(
406                "num_tx_already_processed",
407                "Number of transaction orders already processed previously",
408                registry,
409            )
410            .unwrap(),
411            num_input_objs: register_histogram_with_registry!(
412                "num_input_objects",
413                "Distribution of number of input TX objects per TX",
414                POSITIVE_INT_BUCKETS.to_vec(),
415                registry,
416            )
417            .unwrap(),
418            num_shared_objects: register_histogram_with_registry!(
419                "num_shared_objects",
420                "Number of shared input objects per TX",
421                POSITIVE_INT_BUCKETS.to_vec(),
422                registry,
423            )
424            .unwrap(),
425            batch_size: register_histogram_with_registry!(
426                "batch_size",
427                "Distribution of size of transaction batch",
428                POSITIVE_INT_BUCKETS.to_vec(),
429                registry,
430            )
431            .unwrap(),
432            authority_state_handle_transaction_latency: register_histogram_with_registry!(
433                "authority_state_handle_transaction_latency",
434                "Latency of handling transactions",
435                LATENCY_SEC_BUCKETS.to_vec(),
436                registry,
437            )
438            .unwrap(),
439            execute_certificate_latency_single_writer,
440            execute_certificate_latency_shared_object,
441            execute_certificate_with_effects_latency: register_histogram_with_registry!(
442                "authority_state_execute_certificate_with_effects_latency",
443                "Latency of executing certificates with effects, including waiting for inputs",
444                LATENCY_SEC_BUCKETS.to_vec(),
445                registry,
446            )
447            .unwrap(),
448            internal_execution_latency: register_histogram_with_registry!(
449                "authority_state_internal_execution_latency",
450                "Latency of actual certificate executions",
451                LATENCY_SEC_BUCKETS.to_vec(),
452                registry,
453            )
454            .unwrap(),
455            execution_load_input_objects_latency: register_histogram_with_registry!(
456                "authority_state_execution_load_input_objects_latency",
457                "Latency of loading input objects for execution",
458                LOW_LATENCY_SEC_BUCKETS.to_vec(),
459                registry,
460            )
461            .unwrap(),
462            prepare_certificate_latency: register_histogram_with_registry!(
463                "authority_state_prepare_certificate_latency",
464                "Latency of executing certificates, before committing the results",
465                LATENCY_SEC_BUCKETS.to_vec(),
466                registry,
467            )
468            .unwrap(),
469            commit_certificate_latency: register_histogram_with_registry!(
470                "authority_state_commit_certificate_latency",
471                "Latency of committing certificate execution results",
472                LATENCY_SEC_BUCKETS.to_vec(),
473                registry,
474            )
475            .unwrap(),
476            db_checkpoint_latency: register_histogram_with_registry!(
477                "db_checkpoint_latency",
478                "Latency of checkpointing dbs",
479                LATENCY_SEC_BUCKETS.to_vec(),
480                registry,
481            ).unwrap(),
482            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
483                "transaction_manager_num_enqueued_certificates",
484                "Current number of certificates enqueued to TransactionManager",
485                &["result"],
486                registry,
487            )
488            .unwrap(),
489            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
490                "transaction_manager_num_missing_objects",
491                "Current number of missing objects in TransactionManager",
492                registry,
493            )
494            .unwrap(),
495            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
496                "transaction_manager_num_pending_certificates",
497                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
498                registry,
499            )
500            .unwrap(),
501            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
502                "transaction_manager_num_executing_certificates",
503                "Number of executing certificates, including queued and actually running certificates",
504                registry,
505            )
506            .unwrap(),
507            transaction_manager_num_ready: register_int_gauge_with_registry!(
508                "transaction_manager_num_ready",
509                "Number of ready transactions in TransactionManager",
510                registry,
511            )
512            .unwrap(),
513            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
514                "transaction_manager_object_cache_size",
515                "Current size of object-availability cache in TransactionManager",
516                registry,
517            )
518            .unwrap(),
519            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
520                "transaction_manager_object_cache_hits",
521                "Number of object-availability cache hits in TransactionManager",
522                registry,
523            )
524            .unwrap(),
525            authority_overload_status: register_int_gauge_with_registry!(
526                "authority_overload_status",
527                "Whether authority is current experiencing overload and enters load shedding mode.",
528                registry)
529            .unwrap(),
530            authority_load_shedding_percentage: register_int_gauge_with_registry!(
531                "authority_load_shedding_percentage",
532                "The percentage of transactions is shed when the authority is in load shedding mode.",
533                registry)
534            .unwrap(),
535            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
536                "transaction_manager_object_cache_misses",
537                "Number of object-availability cache misses in TransactionManager",
538                registry,
539            )
540            .unwrap(),
541            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
542                "transaction_manager_object_cache_evictions",
543                "Number of object-availability cache evictions in TransactionManager",
544                registry,
545            )
546            .unwrap(),
547            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
548                "transaction_manager_package_cache_size",
549                "Current size of package-availability cache in TransactionManager",
550                registry,
551            )
552            .unwrap(),
553            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
554                "transaction_manager_package_cache_hits",
555                "Number of package-availability cache hits in TransactionManager",
556                registry,
557            )
558            .unwrap(),
559            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
560                "transaction_manager_package_cache_misses",
561                "Number of package-availability cache misses in TransactionManager",
562                registry,
563            )
564            .unwrap(),
565            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
566                "transaction_manager_package_cache_evictions",
567                "Number of package-availability cache evictions in TransactionManager",
568                registry,
569            )
570            .unwrap(),
571            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
572                "transaction_manager_transaction_queue_age_s",
573                "Time spent in waiting for transaction in the queue",
574                LATENCY_SEC_BUCKETS.to_vec(),
575                registry,
576            )
577            .unwrap(),
578            transaction_overload_sources: register_int_counter_vec_with_registry!(
579                "transaction_overload_sources",
580                "Number of times each source indicates transaction overload.",
581                &["source"],
582                registry)
583            .unwrap(),
584            execution_driver_executed_transactions: register_int_counter_with_registry!(
585                "execution_driver_executed_transactions",
586                "Cumulative number of transaction executed by execution driver",
587                registry,
588            )
589            .unwrap(),
590            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
591                "execution_driver_dispatch_queue",
592                "Number of transaction pending in execution driver dispatch queue",
593                registry,
594            )
595            .unwrap(),
596            execution_queueing_delay_s: register_histogram_with_registry!(
597                "execution_queueing_delay_s",
598                "Queueing delay between a transaction is ready for execution until it starts executing.",
599                LATENCY_SEC_BUCKETS.to_vec(),
600                registry
601            )
602            .unwrap(),
603            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
604                "prepare_cert_gas_latency_ratio",
605                "The ratio of computation gas divided by VM execution latency.",
606                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
607                registry
608            )
609            .unwrap(),
610            execution_gas_latency_ratio: register_histogram_with_registry!(
611                "execution_gas_latency_ratio",
612                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
613                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
614                registry
615            )
616            .unwrap(),
617            skipped_consensus_txns: register_int_counter_with_registry!(
618                "skipped_consensus_txns",
619                "Total number of consensus transactions skipped",
620                registry,
621            )
622            .unwrap(),
623            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
624                "skipped_consensus_txns_cache_hit",
625                "Total number of consensus transactions skipped because of local cache hit",
626                registry,
627            )
628            .unwrap(),
629            post_processing_total_events_emitted: register_int_counter_with_registry!(
630                "post_processing_total_events_emitted",
631                "Total number of events emitted in post processing",
632                registry,
633            )
634            .unwrap(),
635            post_processing_total_tx_indexed: register_int_counter_with_registry!(
636                "post_processing_total_tx_indexed",
637                "Total number of txes indexed in post processing",
638                registry,
639            )
640            .unwrap(),
641            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
642                "post_processing_total_tx_had_event_processed",
643                "Total number of txes finished event processing in post processing",
644                registry,
645            )
646            .unwrap(),
647            post_processing_total_failures: register_int_counter_with_registry!(
648                "post_processing_total_failures",
649                "Total number of failure in post processing",
650                registry,
651            )
652            .unwrap(),
653            consensus_handler_processed: register_int_counter_vec_with_registry!(
654                "consensus_handler_processed",
655                "Number of transactions processed by consensus handler",
656                &["class"],
657                registry
658            ).unwrap(),
659            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
660                "consensus_handler_transaction_sizes",
661                "Sizes of each type of transactions processed by consensus handler",
662                &["class"],
663                POSITIVE_INT_BUCKETS.to_vec(),
664                registry
665            ).unwrap(),
666            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
667                "consensus_handler_num_low_scoring_authorities",
668                "Number of low scoring authorities based on reputation scores from consensus",
669                registry
670            ).unwrap(),
671            consensus_handler_scores: register_int_gauge_vec_with_registry!(
672                "consensus_handler_scores",
673                "scores from consensus for each authority",
674                &["authority"],
675                registry,
676            ).unwrap(),
677            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
678                "consensus_handler_deferred_transactions",
679                "Number of transactions deferred by consensus handler",
680                registry,
681            ).unwrap(),
682            consensus_handler_congested_transactions: register_int_counter_with_registry!(
683                "consensus_handler_congested_transactions",
684                "Number of transactions deferred by consensus handler due to congestion",
685                registry,
686            ).unwrap(),
687            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
688                "consensus_handler_cancelled_transactions",
689                "Number of transactions cancelled by consensus handler",
690                registry,
691            ).unwrap(),
692            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
693                "consensus_handler_max_congestion_control_object_costs",
694                "Max object costs for congestion control in the current consensus commit",
695                &["commit_type"],
696                registry,
697            ).unwrap(),
698            consensus_committed_subdags: register_int_counter_vec_with_registry!(
699                "consensus_committed_subdags",
700                "Number of committed subdags, sliced by author",
701                &["authority"],
702                registry,
703            ).unwrap(),
704            consensus_committed_messages: register_int_gauge_vec_with_registry!(
705                "consensus_committed_messages",
706                "Total number of committed consensus messages, sliced by author",
707                &["authority"],
708                registry,
709            ).unwrap(),
710            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
711                "consensus_committed_user_transactions",
712                "Number of committed user transactions, sliced by submitter",
713                &["authority"],
714                registry,
715            ).unwrap(),
716            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
717            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
718            authenticator_state_update_failed: register_int_counter_with_registry!(
719                "authenticator_state_update_failed",
720                "Number of failed authenticator state updates",
721                registry,
722            )
723            .unwrap(),
724            zklogin_sig_count: register_int_counter_with_registry!(
725                "zklogin_sig_count",
726                "Count of zkLogin signatures",
727                registry,
728            )
729            .unwrap(),
730            multisig_sig_count: register_int_counter_with_registry!(
731                "multisig_sig_count",
732                "Count of zkLogin signatures",
733                registry,
734            )
735            .unwrap(),
736            consensus_calculated_throughput: register_int_gauge_with_registry!(
737                "consensus_calculated_throughput",
738                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
739                registry,
740            ).unwrap(),
741            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
742                "consensus_calculated_throughput_profile",
743                "The current active calculated throughput profile",
744                registry
745            ).unwrap(),
746            execution_queueing_latency: LatencyObserver::new(),
747            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
748            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
749        }
750    }
751
752    /// Reset metrics that contain `hostname` as one of the labels. This is
753    /// needed to avoid retaining metrics for long-gone committee members and
754    /// only exposing metrics for the committee in the current epoch.
755    pub fn reset_on_reconfigure(&self) {
756        self.consensus_committed_messages.reset();
757        self.consensus_handler_scores.reset();
758        self.consensus_committed_user_transactions.reset();
759    }
760}
761
/// A trait object for `Signer` that is:
/// - Pin, i.e. confined to one place in memory (we don't want to copy private
///   keys).
/// - Send + Sync, i.e. can be safely shared between threads.
///
/// The pointer type is `Arc`, so this is typically instantiated with something
/// like `Arc::pin(keypair)` where keypair is a `KeyPair`.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
769
/// State held by a single authority.
///
/// Groups the authority's identity and signing key with handles to its
/// stores, execution machinery, metrics and configuration.
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads the input objects of a transaction, both for signing and for
    /// execution.
    input_loader: TransactionInputLoader,
    /// Handles to the execution cache.
    // NOTE(review): presumably backs the `get_*_cache_*` / `get_object_store`
    // accessors used throughout this impl — confirm.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// The per-epoch store, read via `load()`.
    // NOTE(review): presumably swapped for a new store at reconfiguration —
    // confirm against the reconfiguration code.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes current 'execution epoch'.
    /// Execution acquires read lock, checks certificate epoch and holds it
    /// until all writes are complete. Reconfiguration acquires write lock,
    /// changes the epoch and reverts all transactions from previous epoch
    /// that are executed but did not make it into a checkpoint.
    execution_lock: RwLock<EpochId>,

    /// Optional index store.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional REST index store.
    pub rest_index: Option<Arc<RestIndexStore>>,

    /// Handler for subscriptions.
    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint store; queried for the state commitments of an epoch.
    checkpoint_store: Arc<CheckpointStore>,

    /// Committee store; exposed through `committee_store()` and
    /// `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    /// Manages pending certificates and their missing input objects.
    transaction_manager: Arc<TransactionManager>,

    /// Shuts down the execution task. Used only in testing.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Metrics of this authority.
    pub metrics: Arc<AuthorityMetrics>,
    // NOTE(review): the two pruners below are never read through these
    // fields; presumably they are held only to keep the pruners alive for the
    // lifetime of the state — confirm.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Configuration for taking db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    /// The node configuration.
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// Optional finalizer that tracks transactions signed by this authority
    /// (see `handle_transaction`).
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
}
819
820/// The authority state encapsulates all state, drives execution, and ensures
821/// safety.
822///
823/// Note the authority operations can be accessed through a read ref (&) and do
824/// not require &mut. Internally a database is synchronized through a mutex
825/// lock.
826///
827/// Repeating valid commands should produce no changes and return no error.
828impl AuthorityState {
829    pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
830        epoch_store.committee().authority_exists(&self.name)
831    }
832
833    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
834        !self.is_validator(epoch_store)
835    }
836
    /// Borrows the handle to the committee store without touching its
    /// reference count. Use [`Self::clone_committee_store`] for an owned
    /// handle.
    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
        &self.committee_store
    }
840
841    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
842        self.committee_store.clone()
843    }
844
    /// Borrows the authority overload section of the node configuration.
    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
        &self.config.authority_overload_config
    }
848
849    pub fn get_epoch_state_commitments(
850        &self,
851        epoch: EpochId,
852    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
853        self.checkpoint_store.get_epoch_state_commitments(epoch)
854    }
855
    /// Checks a verified transaction, signs it with this authority's key and
    /// acquires the locks on its owned input objects for the signed
    /// transaction.
    ///
    /// This is a private method and should be kept that way. It doesn't check
    /// whether the provided transaction is a system transaction, and hence
    /// can only be called internally.
    ///
    /// # Errors
    /// Propagates the first failure from: listing the input objects, the deny
    /// checks, loading the input/receiving objects, the transaction input
    /// checks, the coin deny-list check, or acquiring the transaction locks.
    #[instrument(level = "trace", skip_all)]
    async fn handle_transaction_impl(
        &self,
        transaction: VerifiedTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<VerifiedSignedTransaction> {
        // Ensure that validator cannot reconfigure while we are signing the tx.
        // The guard is held until this function returns.
        let _execution_lock = self.execution_lock_for_signing().await;

        let tx_digest = transaction.digest();
        let tx_data = transaction.data().transaction_data();

        let input_object_kinds = tx_data.input_objects()?;
        let receiving_objects_refs = tx_data.receiving_objects();

        // Note: the deny checks may do redundant package loads but:
        // - they only load packages when there is an active package deny map
        // - the loads are cached anyway
        iota_transaction_checks::deny::check_transaction_for_signing(
            tx_data,
            transaction.tx_signatures(),
            &input_object_kinds,
            &receiving_objects_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        // Load the objects the transaction reads, as of the epoch store's
        // epoch.
        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            Some(tx_digest),
            &input_object_kinds,
            &receiving_objects_refs,
            epoch_store.epoch(),
        )?;

        // Validate the transaction against the loaded objects, using the
        // protocol config and reference gas price of the epoch store. The gas
        // status is not needed for signing.
        let (_gas_status, checked_input_objects) =
            iota_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                epoch_store.reference_gas_price(),
                tx_data,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?;

        check_coin_deny_list_v1_during_signing(
            tx_data.sender(),
            &checked_input_objects,
            &receiving_objects,
            &self.get_object_store(),
        )?;

        // Only owned objects are locked at signing time.
        let owned_objects = checked_input_objects.inner().filter_owned_objects();

        // Signing consumes `transaction`; `tx_data` and `tx_digest` (borrowed
        // from it) are not used past this point.
        let signed_transaction = VerifiedSignedTransaction::new(
            epoch_store.epoch(),
            transaction,
            self.name,
            &*self.secret,
        );

        // Check and write the locks, for the signed transaction, into the
        // database. The call to acquire_transaction_locks checks that the lock
        // is not conflicting, and returns a ConflictingTransaction error in
        // case there is a lock on a different existing transaction.
        self.get_cache_writer()
            .acquire_transaction_locks(epoch_store, &owned_objects, signed_transaction.clone())
            .await?;

        Ok(signed_transaction)
    }
930
931    /// Initiate a new transaction.
932    #[instrument(level = "trace", skip_all)]
933    pub async fn handle_transaction(
934        &self,
935        epoch_store: &Arc<AuthorityPerEpochStore>,
936        transaction: VerifiedTransaction,
937    ) -> IotaResult<HandleTransactionResponse> {
938        let tx_digest = *transaction.digest();
939        debug!("handle_transaction");
940
941        // Ensure an idempotent answer.
942        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
943            return Ok(HandleTransactionResponse { status });
944        }
945
946        let _metrics_guard = self
947            .metrics
948            .authority_state_handle_transaction_latency
949            .start_timer();
950        self.metrics.tx_orders.inc();
951
952        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
953        match signed {
954            Ok(s) => {
955                if self.is_validator(epoch_store) {
956                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
957                        let tx = s.clone();
958                        let validator_tx_finalizer = validator_tx_finalizer.clone();
959                        let cache_reader = self.get_transaction_cache_reader().clone();
960                        let epoch_store = epoch_store.clone();
961                        spawn_monitored_task!(epoch_store.within_alive_epoch(
962                            validator_tx_finalizer.track_signed_tx(cache_reader, tx)
963                        ));
964                    }
965                }
966                Ok(HandleTransactionResponse {
967                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
968                })
969            }
970            // It happens frequently that while we are checking the validity of the transaction, it
971            // has just been executed.
972            // In that case, we could still return Ok to avoid showing confusing errors.
973            Err(err) => Ok(HandleTransactionResponse {
974                status: self
975                    .get_transaction_status(&tx_digest, epoch_store)?
976                    .ok_or(err)?
977                    .1,
978            }),
979        }
980    }
981
982    pub fn check_system_overload_at_signing(&self) -> bool {
983        self.config
984            .authority_overload_config
985            .check_system_overload_at_signing
986    }
987
988    pub fn check_system_overload_at_execution(&self) -> bool {
989        self.config
990            .authority_overload_config
991            .check_system_overload_at_execution
992    }
993
994    pub(crate) fn check_system_overload(
995        &self,
996        consensus_adapter: &Arc<ConsensusAdapter>,
997        tx_data: &SenderSignedData,
998        do_authority_overload_check: bool,
999    ) -> IotaResult {
1000        if do_authority_overload_check {
1001            self.check_authority_overload(tx_data).tap_err(|_| {
1002                self.update_overload_metrics("execution_queue");
1003            })?;
1004        }
1005        self.transaction_manager
1006            .check_execution_overload(self.overload_config(), tx_data)
1007            .tap_err(|_| {
1008                self.update_overload_metrics("execution_pending");
1009            })?;
1010        consensus_adapter.check_consensus_overload().tap_err(|_| {
1011            self.update_overload_metrics("consensus");
1012        })?;
1013        Ok(())
1014    }
1015
1016    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1017        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1018            return Ok(());
1019        }
1020
1021        let load_shedding_percentage = self
1022            .overload_info
1023            .load_shedding_percentage
1024            .load(Ordering::Relaxed);
1025        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1026    }
1027
1028    fn update_overload_metrics(&self, source: &str) {
1029        self.metrics
1030            .transaction_overload_sources
1031            .with_label_values(&[source])
1032            .inc();
1033    }
1034
    /// Executes a transaction that's known to have correct effects.
    /// For such transaction, we don't have to wait for consensus to set shared
    /// object locks because we already know the shared object versions
    /// based on the effects. This function can be called by a fullnode
    /// only.
    ///
    /// # Errors
    /// - `IotaError::EpochEnded` if the currently loaded epoch store is for a
    ///   different epoch than `epoch_store`.
    /// - `IotaError::ErrorWhileProcessingCertificate` if the transaction digest
    ///   recorded in `effects` differs from the digest of `transaction`.
    /// - Any error from acquiring shared locks or from waiting for the executed
    ///   effects.
    ///
    /// # Panics
    /// - If this node is not a fullnode for `epoch_store`.
    /// - If the locally executed effects digest differs from the digest of
    ///   `effects`.
    #[instrument(level = "trace", skip_all)]
    pub async fn fullnode_execute_certificate_with_effects(
        &self,
        transaction: &VerifiedExecutableTransaction,
        // NOTE: the caller of this must promise to wait until it
        // knows for sure this tx is finalized, namely, it has seen a
        // CertifiedTransactionEffects or at least f+1 identical effects
        // digests matching this TransactionEffectsEnvelope, before calling
        // this function, in order to prevent a byzantine validator from
        // giving us incorrect effects.
        effects: &VerifiedCertifiedTransactionEffects,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        assert!(self.is_fullnode(epoch_store));
        // NOTE: the fullnode can change epoch during local execution. It should not
        // cause data inconsistency, but can be problematic for certain tests.
        // The check below mitigates the issue, but it is not a fundamental solution to
        // avoid race between local execution and reconfiguration.
        if self.epoch_store.load().epoch() != epoch_store.epoch() {
            return Err(IotaError::EpochEnded(epoch_store.epoch()));
        }
        let _metrics_guard = self
            .metrics
            .execute_certificate_with_effects_latency
            .start_timer();
        let digest = *transaction.digest();
        debug!("execute_certificate_with_effects");
        // The supplied effects must belong to the supplied transaction.
        fp_ensure!(
            *effects.data().transaction_digest() == digest,
            IotaError::ErrorWhileProcessingCertificate {
                err: "effects/tx digest mismatch".to_string()
            }
        );

        // Shared object versions are taken from the effects rather than from
        // consensus.
        if transaction.contains_shared_object() {
            epoch_store
                .acquire_shared_locks_from_effects(
                    transaction,
                    effects.data(),
                    self.get_object_cache_reader().as_ref(),
                )
                .await?;
        }

        let expected_effects_digest = effects.digest();

        // Hand the transaction to the transaction manager, then wait below
        // until its executed effects become readable.
        self.transaction_manager
            .enqueue(vec![transaction.clone()], epoch_store);

        let observed_effects = self
            .get_transaction_cache_reader()
            .notify_read_executed_effects(&[digest])
            .instrument(tracing::debug_span!(
                "notify_read_effects_in_execute_certificate_with_effects"
            ))
            .await?
            .pop()
            .expect("notify_read_effects should return exactly 1 element");

        // A mismatch between local and canonical effects is treated as fatal.
        let observed_effects_digest = observed_effects.digest();
        if &observed_effects_digest != expected_effects_digest {
            panic!(
                "Locally executed effects do not match canonical effects! expected_effects_digest={:?} observed_effects_digest={:?} expected_effects={:?} observed_effects={:?} input_objects={:?}",
                expected_effects_digest,
                observed_effects_digest,
                effects.data(),
                observed_effects,
                transaction.data().transaction_data().input_objects()
            );
        }
        Ok(())
    }
1112
1113    /// Executes a certificate for its effects.
1114    #[instrument(level = "trace", skip_all)]
1115    pub async fn execute_certificate(
1116        &self,
1117        certificate: &VerifiedCertificate,
1118        epoch_store: &Arc<AuthorityPerEpochStore>,
1119    ) -> IotaResult<TransactionEffects> {
1120        let _metrics_guard = if certificate.contains_shared_object() {
1121            self.metrics
1122                .execute_certificate_latency_shared_object
1123                .start_timer()
1124        } else {
1125            self.metrics
1126                .execute_certificate_latency_single_writer
1127                .start_timer()
1128        };
1129        debug!("execute_certificate");
1130
1131        self.metrics.total_cert_attempts.inc();
1132
1133        if !certificate.contains_shared_object() {
1134            // Shared object transactions need to be sequenced by the consensus before
1135            // enqueueing for execution, done in
1136            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
1137            // object transactions, they can be enqueued for execution immediately.
1138            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1139        }
1140
1141        // tx could be reverted when epoch ends, so we must be careful not to return a
1142        // result here after the epoch ends.
1143        epoch_store
1144            .within_alive_epoch(self.notify_read_effects(certificate))
1145            .await
1146            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1147            .and_then(|r| r)
1148    }
1149
1150    /// Internal logic to execute a certificate.
1151    ///
1152    /// Guarantees that
1153    /// - If input objects are available, return no permanent failure.
1154    /// - Execution and output commit are atomic. i.e. outputs are only written
1155    ///   to storage,
1156    /// on successful execution; crashed execution has no observable effect and
1157    /// can be retried.
1158    ///
1159    /// It is caller's responsibility to ensure input objects are available and
1160    /// locks are set. If this cannot be satisfied by the caller,
1161    /// execute_certificate() should be called instead.
1162    ///
1163    /// Should only be called within iota-core.
1164    #[instrument(level = "trace", skip_all)]
1165    pub async fn try_execute_immediately(
1166        &self,
1167        certificate: &VerifiedExecutableTransaction,
1168        mut expected_effects_digest: Option<TransactionEffectsDigest>,
1169        epoch_store: &Arc<AuthorityPerEpochStore>,
1170    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1171        let _scope = monitored_scope("Execution::try_execute_immediately");
1172        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1173        debug!("execute_certificate_internal");
1174
1175        let tx_digest = certificate.digest();
1176
1177        // Acquire a lock to prevent concurrent executions of the same transaction.
1178        let tx_guard = epoch_store.acquire_tx_guard(certificate).await?;
1179
1180        // The cert could have been processed by a concurrent attempt of the same cert,
1181        // so check if the effects have already been written.
1182        if let Some(effects) = self
1183            .get_transaction_cache_reader()
1184            .get_executed_effects(tx_digest)?
1185        {
1186            tx_guard.release();
1187            return Ok((effects, None));
1188        }
1189        let input_objects =
1190            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1191
1192        if expected_effects_digest.is_none() {
1193            // We could be re-executing a previously executed but uncommitted transaction,
1194            // perhaps after restarting with a new binary. In this situation, if
1195            // we have published an effects signature, we must be sure not to
1196            // equivocate. TODO: read from cache instead of DB
1197            expected_effects_digest = epoch_store.get_signed_effects_digest(tx_digest)?;
1198        }
1199
1200        self.process_certificate(
1201            tx_guard,
1202            certificate,
1203            input_objects,
1204            expected_effects_digest,
1205            epoch_store,
1206        )
1207        .await
1208        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1209    }
1210
1211    pub fn read_objects_for_execution(
1212        &self,
1213        tx_lock: &CertLockGuard,
1214        certificate: &VerifiedExecutableTransaction,
1215        epoch_store: &Arc<AuthorityPerEpochStore>,
1216    ) -> IotaResult<InputObjects> {
1217        let _scope = monitored_scope("Execution::load_input_objects");
1218        let _metrics_guard = self
1219            .metrics
1220            .execution_load_input_objects_latency
1221            .start_timer();
1222        let input_objects = &certificate.data().transaction_data().input_objects()?;
1223        self.input_loader.read_objects_for_execution(
1224            epoch_store.as_ref(),
1225            &certificate.key(),
1226            tx_lock,
1227            input_objects,
1228            epoch_store.epoch(),
1229        )
1230    }
1231
1232    /// Test only wrapper for `try_execute_immediately()` above, useful for
1233    /// checking errors if the pre-conditions are not satisfied, and
1234    /// executing change epoch transactions.
1235    pub async fn try_execute_for_test(
1236        &self,
1237        certificate: &VerifiedCertificate,
1238    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1239        let epoch_store = self.epoch_store_for_testing();
1240        let (effects, execution_error_opt) = self
1241            .try_execute_immediately(
1242                &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1243                None,
1244                &epoch_store,
1245            )
1246            .await?;
1247        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1248        Ok((signed_effects, execution_error_opt))
1249    }
1250
1251    pub async fn notify_read_effects(
1252        &self,
1253        certificate: &VerifiedCertificate,
1254    ) -> IotaResult<TransactionEffects> {
1255        self.get_transaction_cache_reader()
1256            .notify_read_executed_effects(&[*certificate.digest()])
1257            .await
1258            .map(|mut r| r.pop().expect("must return correct number of effects"))
1259    }
1260
1261    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1262        self.get_object_cache_reader()
1263            .check_owned_objects_are_live(owned_object_refs)
1264    }
1265
1266    /// This function captures the required state to debug a forked transaction.
1267    /// The dump is written to a file in dir `path`, with name prefixed by the
1268    /// transaction digest. NOTE: Since this info escapes the validator
1269    /// context, make sure not to leak any private info here
1270    pub(crate) fn debug_dump_transaction_state(
1271        &self,
1272        tx_digest: &TransactionDigest,
1273        effects: &TransactionEffects,
1274        expected_effects_digest: TransactionEffectsDigest,
1275        inner_temporary_store: &InnerTemporaryStore,
1276        certificate: &VerifiedExecutableTransaction,
1277        debug_dump_config: &StateDebugDumpConfig,
1278    ) -> IotaResult<PathBuf> {
1279        let dump_dir = debug_dump_config
1280            .dump_file_directory
1281            .as_ref()
1282            .cloned()
1283            .unwrap_or(std::env::temp_dir());
1284        let epoch_store = self.load_epoch_store_one_call_per_task();
1285
1286        NodeStateDump::new(
1287            tx_digest,
1288            effects,
1289            expected_effects_digest,
1290            self.get_object_store().as_ref(),
1291            &epoch_store,
1292            inner_temporary_store,
1293            certificate,
1294        )?
1295        .write_to_file(&dump_dir)
1296        .map_err(|e| IotaError::FileIO(e.to_string()))
1297    }
1298
    /// Executes `certificate` against the already loaded `input_objects` and
    /// commits the result.
    ///
    /// Steps, in order:
    /// 1. Take the execution lock for the certificate and check that its epoch
    ///    matches `epoch_store`.
    /// 2. Prepare the certificate, producing the effects and the temporary
    ///    store.
    /// 3. If `expected_effects_digest` is given, compare it with the digest of
    ///    the produced effects.
    /// 4. Commit the certificate.
    /// 5. For authenticator state updates, apply the update to the epoch
    ///    store.
    /// 6. Record the ratio of computation cost to elapsed time.
    ///
    /// On every early error return before the commit, `tx_guard` is released
    /// first.
    ///
    /// # Errors
    /// - The error from acquiring the execution lock.
    /// - `IotaError::WrongEpoch` if the execution lock's epoch differs from
    ///   that of `epoch_store`.
    /// - Any error from preparing or committing the certificate.
    ///
    /// # Panics
    /// Panics if `expected_effects_digest` is given and does not match the
    /// digest of the produced effects (a fork). A debug dump of the node state
    /// is attempted first.
    #[instrument(level = "trace", skip_all)]
    pub(crate) async fn process_certificate(
        &self,
        tx_guard: CertTxGuard,
        certificate: &VerifiedExecutableTransaction,
        input_objects: InputObjects,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
        let process_certificate_start_time = tokio::time::Instant::now();
        let digest = *certificate.digest();

        // NOTE(review): presumably a simulator-only fail point that kills the
        // current node with a per-digest probability of 0.01 — confirm.
        fail_point_if!("correlated-crash-process-certificate", || {
            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
                iota_simulator::task::kill_current_node(None);
            }
        });

        let execution_guard = self
            .execution_lock_for_executable_transaction(certificate)
            .await;
        // Any caller that verifies the signatures on the certificate will have already
        // checked the epoch. But paths that don't verify sigs (e.g. execution
        // from checkpoint, reading from db) present the possibility of an epoch
        // mismatch. If this cert is not finalized in previous epoch, then it's
        // invalid.
        let execution_guard = match execution_guard {
            Ok(execution_guard) => execution_guard,
            Err(err) => {
                tx_guard.release();
                return Err(err);
            }
        };
        // Since we obtain a reference to the epoch store before taking the execution
        // lock, it's possible that reconfiguration has happened and they no
        // longer match.
        if *execution_guard != epoch_store.epoch() {
            tx_guard.release();
            info!("The epoch of the execution_guard doesn't match the epoch store");
            return Err(IotaError::WrongEpoch {
                expected_epoch: epoch_store.epoch(),
                actual_epoch: *execution_guard,
            });
        }

        // Errors originating from prepare_certificate may be transient (failure to read
        // locks) or non-transient (transaction input is invalid, move vm
        // errors). However, all errors from this function occur before we have
        // written anything to the db, so we release the tx guard and rely on the
        // client to retry the tx (if it was transient).
        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
            &execution_guard,
            certificate,
            input_objects,
            epoch_store,
        ) {
            Err(e) => {
                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
                tx_guard.release();
                return Err(e);
            }
            Ok(res) => res,
        };

        // Fork detection: the locally produced effects must match the digest
        // we were told to expect.
        if let Some(expected_effects_digest) = expected_effects_digest {
            if effects.digest() != expected_effects_digest {
                // We don't want a failure of the debug dump to mask the
                // original problem, so we only log the outcome and continue.
                match self.debug_dump_transaction_state(
                    &digest,
                    &effects,
                    expected_effects_digest,
                    &inner_temporary_store,
                    certificate,
                    &self.config.state_debug_dump_config,
                ) {
                    Ok(out_path) => {
                        info!(
                            "Dumped node state for transaction {} to {}",
                            digest,
                            out_path.as_path().display().to_string()
                        );
                    }
                    Err(e) => {
                        error!("Error dumping state for transaction {}: {e}", digest);
                    }
                }
                error!(
                    tx_digest = ?digest,
                    ?expected_effects_digest,
                    actual_effects = ?effects,
                    "fork detected!"
                );
                panic!(
                    "Transaction {} is expected to have effects digest {}, but got {}!",
                    digest,
                    expected_effects_digest,
                    effects.digest(),
                );
            }
        }

        fail_point_async!("crash");

        // Both guards are handed over to commit_certificate.
        self.commit_certificate(
            certificate,
            inner_temporary_store,
            &effects,
            tx_guard,
            execution_guard,
            epoch_store,
        )
        .await?;

        if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
            certificate.data().transaction_data().kind()
        {
            if let Some(err) = &execution_error_opt {
                error!("Authenticator state update failed: {err}");
                self.metrics.authenticator_state_update_failed.inc();
            }
            // In debug builds a failed update aborts here; in release builds
            // it is only logged and counted above.
            debug_assert!(execution_error_opt.is_none());
            epoch_store.update_authenticator_state(auth_state);

            // double check that the signature verifier always matches the authenticator
            // state
            if cfg!(debug_assertions) {
                let authenticator_state = get_authenticator_state(self.get_object_store())
                    .expect("Read cannot fail")
                    .expect("Authenticator state must exist");

                let mut sys_jwks: Vec<_> = authenticator_state
                    .active_jwks
                    .into_iter()
                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
                    .collect();
                let mut active_jwks: Vec<_> = epoch_store
                    .signature_verifier
                    .get_jwks()
                    .into_iter()
                    .collect();
                // Sort both sides so the comparison ignores ordering.
                sys_jwks.sort();
                active_jwks.sort();

                assert_eq!(sys_jwks, active_jwks);
            }
        }

        // Computation cost per elapsed microsecond. Skipped when the elapsed
        // time is zero to avoid dividing by zero.
        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics
                .execution_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
        };
        Ok((effects, execution_error_opt))
    }
1454
1455    #[instrument(level = "trace", skip_all)]
1456    async fn commit_certificate(
1457        &self,
1458        certificate: &VerifiedExecutableTransaction,
1459        inner_temporary_store: InnerTemporaryStore,
1460        effects: &TransactionEffects,
1461        tx_guard: CertTxGuard,
1462        _execution_guard: ExecutionLockReadGuard<'_>,
1463        epoch_store: &Arc<AuthorityPerEpochStore>,
1464    ) -> IotaResult {
1465        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1466            monitored_scope("Execution::commit_certificate");
1467        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1468
1469        let tx_key = certificate.key();
1470        let tx_digest = certificate.digest();
1471        let input_object_count = inner_temporary_store.input_objects.len();
1472        let shared_object_count = effects.input_shared_objects().len();
1473
1474        let output_keys = inner_temporary_store.get_output_keys(effects);
1475
1476        // index certificate
1477        let _ = self
1478            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1479            .await
1480            .tap_err(|e| {
1481                self.metrics.post_processing_total_failures.inc();
1482                error!(?tx_digest, "tx post processing failed: {e}");
1483            });
1484
1485        // The insertion to epoch_store is not atomic with the insertion to the
1486        // perpetual store. This is OK because we insert to the epoch store
1487        // first. And during lookups we always look up in the perpetual store first.
1488        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1489
1490        // Allow testing what happens if we crash here.
1491        fail_point_async!("crash");
1492
1493        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1494            certificate.clone().into_unsigned(),
1495            effects.clone(),
1496            inner_temporary_store,
1497        );
1498        self.get_cache_writer()
1499            .write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())
1500            .await?;
1501
1502        if certificate.transaction_data().is_end_of_epoch_tx() {
1503            // At the end of epoch, since system packages may have been upgraded, force
1504            // reload them in the cache.
1505            self.get_object_cache_reader()
1506                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1507        }
1508
1509        // commit_certificate finished, the tx is fully committed to the store.
1510        tx_guard.commit_tx();
1511
1512        // Notifies transaction manager about transaction and output objects committed.
1513        // This provides necessary information to transaction manager to start executing
1514        // additional ready transactions.
1515        self.transaction_manager
1516            .notify_commit(tx_digest, output_keys, epoch_store);
1517
1518        self.update_metrics(certificate, input_object_count, shared_object_count);
1519
1520        Ok(())
1521    }
1522
1523    fn update_metrics(
1524        &self,
1525        certificate: &VerifiedExecutableTransaction,
1526        input_object_count: usize,
1527        shared_object_count: usize,
1528    ) {
1529        // count signature by scheme, for zklogin and multisig
1530        if certificate.has_zklogin_sig() {
1531            self.metrics.zklogin_sig_count.inc();
1532        } else if certificate.has_upgraded_multisig() {
1533            self.metrics.multisig_sig_count.inc();
1534        }
1535
1536        self.metrics.total_effects.inc();
1537        self.metrics.total_certs.inc();
1538
1539        if shared_object_count > 0 {
1540            self.metrics.shared_obj_tx.inc();
1541        }
1542
1543        if certificate.is_sponsored_tx() {
1544            self.metrics.sponsored_tx.inc();
1545        }
1546
1547        self.metrics
1548            .num_input_objs
1549            .observe(input_object_count as f64);
1550        self.metrics
1551            .num_shared_objects
1552            .observe(shared_object_count as f64);
1553        self.metrics.batch_size.observe(
1554            certificate
1555                .data()
1556                .intent_message()
1557                .value
1558                .kind()
1559                .num_commands() as f64,
1560        );
1561    }
1562
1563    /// prepare_certificate validates the transaction input, and executes the
1564    /// certificate, returning effects, output objects, events, etc.
1565    ///
1566    /// It reads state from the db (both owned and shared locks), but it has no
1567    /// side effects.
1568    ///
1569    /// It can be generally understood that a failure of prepare_certificate
1570    /// indicates a non-transient error, e.g. the transaction input is
1571    /// somehow invalid, the correct locks are not held, etc. However, this
1572    /// is not entirely true, as a transient db read error may also cause
1573    /// this function to fail.
1574    #[instrument(level = "trace", skip_all)]
1575    fn prepare_certificate(
1576        &self,
1577        _execution_guard: &ExecutionLockReadGuard<'_>,
1578        certificate: &VerifiedExecutableTransaction,
1579        input_objects: InputObjects,
1580        epoch_store: &Arc<AuthorityPerEpochStore>,
1581    ) -> IotaResult<(
1582        InnerTemporaryStore,
1583        TransactionEffects,
1584        Option<ExecutionError>,
1585    )> {
1586        let _scope = monitored_scope("Execution::prepare_certificate");
1587        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1588        let prepare_certificate_start_time = tokio::time::Instant::now();
1589
1590        // TODO: We need to move this to a more appropriate place to avoid redundant
1591        // checks.
1592        let tx_data = certificate.data().transaction_data();
1593        tx_data.validity_check(epoch_store.protocol_config())?;
1594
1595        // The cost of partially re-auditing a transaction before execution is
1596        // tolerated.
1597        let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
1598            certificate,
1599            input_objects,
1600            epoch_store.protocol_config(),
1601            epoch_store.reference_gas_price(),
1602        )?;
1603
1604        let owned_object_refs = input_objects.inner().filter_owned_objects();
1605        self.check_owned_locks(&owned_object_refs)?;
1606        let tx_digest = *certificate.digest();
1607        let protocol_config = epoch_store.protocol_config();
1608        let transaction_data = &certificate.data().intent_message().value;
1609        let (kind, signer, gas) = transaction_data.execution_parts();
1610
1611        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1612        let (inner_temp_store, _, mut effects, execution_error_opt) =
1613            epoch_store.executor().execute_transaction_to_effects(
1614                self.get_backing_store().as_ref(),
1615                protocol_config,
1616                self.metrics.limits_metrics.clone(),
1617                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
1618                // cyclic dependency w/ iota-adapter
1619                self.config
1620                    .expensive_safety_check_config
1621                    .enable_deep_per_tx_iota_conservation_check(),
1622                self.config.certificate_deny_config.certificate_deny_set(),
1623                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1624                epoch_store
1625                    .epoch_start_config()
1626                    .epoch_data()
1627                    .epoch_start_timestamp(),
1628                input_objects,
1629                gas,
1630                gas_status,
1631                kind,
1632                signer,
1633                tx_digest,
1634                &mut None,
1635            );
1636
1637        fail_point_if!("cp_execution_nondeterminism", || {
1638            #[cfg(msim)]
1639            self.create_fail_state(certificate, epoch_store, &mut effects);
1640        });
1641
1642        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1643        if elapsed > 0.0 {
1644            self.metrics
1645                .prepare_cert_gas_latency_ratio
1646                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1647        }
1648
1649        Ok((inner_temp_store, effects, execution_error_opt.err()))
1650    }
1651
1652    pub fn prepare_certificate_for_benchmark(
1653        &self,
1654        certificate: &VerifiedExecutableTransaction,
1655        input_objects: InputObjects,
1656        epoch_store: &Arc<AuthorityPerEpochStore>,
1657    ) -> IotaResult<(
1658        InnerTemporaryStore,
1659        TransactionEffects,
1660        Option<ExecutionError>,
1661    )> {
1662        let lock: RwLock<EpochId> = RwLock::new(epoch_store.epoch());
1663        let execution_guard = lock.try_read().unwrap();
1664
1665        self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
1666    }
1667
1668    pub async fn dry_exec_transaction(
1669        &self,
1670        transaction: TransactionData,
1671        transaction_digest: TransactionDigest,
1672    ) -> IotaResult<(
1673        DryRunTransactionBlockResponse,
1674        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1675        TransactionEffects,
1676        Option<ObjectID>,
1677    )> {
1678        let epoch_store = self.load_epoch_store_one_call_per_task();
1679        if !self.is_fullnode(&epoch_store) {
1680            return Err(IotaError::UnsupportedFeature {
1681                error: "dry-exec is only supported on fullnodes".to_string(),
1682            });
1683        }
1684
1685        if transaction.kind().is_system_tx() {
1686            return Err(IotaError::UnsupportedFeature {
1687                error: "dry-exec does not support system transactions".to_string(),
1688            });
1689        }
1690
1691        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1692            .await
1693    }
1694
1695    pub async fn dry_exec_transaction_for_benchmark(
1696        &self,
1697        transaction: TransactionData,
1698        transaction_digest: TransactionDigest,
1699    ) -> IotaResult<(
1700        DryRunTransactionBlockResponse,
1701        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1702        TransactionEffects,
1703        Option<ObjectID>,
1704    )> {
1705        let epoch_store = self.load_epoch_store_one_call_per_task();
1706        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1707            .await
1708    }
1709
1710    async fn dry_exec_transaction_impl(
1711        &self,
1712        epoch_store: &AuthorityPerEpochStore,
1713        transaction: TransactionData,
1714        transaction_digest: TransactionDigest,
1715    ) -> IotaResult<(
1716        DryRunTransactionBlockResponse,
1717        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1718        TransactionEffects,
1719        Option<ObjectID>,
1720    )> {
1721        // Cheap validity checks for a transaction, including input size limits.
1722        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1723
1724        let input_object_kinds = transaction.input_objects()?;
1725        let receiving_object_refs = transaction.receiving_objects();
1726
1727        iota_transaction_checks::deny::check_transaction_for_signing(
1728            &transaction,
1729            &[],
1730            &input_object_kinds,
1731            &receiving_object_refs,
1732            &self.config.transaction_deny_config,
1733            self.get_backing_package_store().as_ref(),
1734        )?;
1735
1736        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1737            // We don't want to cache this transaction since it's a dry run.
1738            None,
1739            &input_object_kinds,
1740            &receiving_object_refs,
1741            epoch_store.epoch(),
1742        )?;
1743
1744        // make a gas object if one was not provided
1745        let mut gas_object_refs = transaction.gas().to_vec();
1746        let reference_gas_price = epoch_store.reference_gas_price();
1747        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1748            let sender = transaction.sender();
1749            // use a 1B iota coin
1750            const NANOS_TO_IOTA: u64 = 1_000_000_000;
1751            const DRY_RUN_IOTA: u64 = 1_000_000_000;
1752            let max_coin_value = NANOS_TO_IOTA * DRY_RUN_IOTA;
1753            let gas_object_id = ObjectID::random();
1754            let gas_object = Object::new_move(
1755                MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
1756                Owner::AddressOwner(sender),
1757                TransactionDigest::genesis_marker(),
1758            );
1759            let gas_object_ref = gas_object.compute_object_reference();
1760            gas_object_refs = vec![gas_object_ref];
1761            (
1762                iota_transaction_checks::check_transaction_input_with_given_gas(
1763                    epoch_store.protocol_config(),
1764                    reference_gas_price,
1765                    &transaction,
1766                    input_objects,
1767                    receiving_objects,
1768                    gas_object,
1769                    &self.metrics.bytecode_verifier_metrics,
1770                    &self.config.verifier_signing_config,
1771                )?,
1772                Some(gas_object_id),
1773            )
1774        } else {
1775            (
1776                iota_transaction_checks::check_transaction_input(
1777                    epoch_store.protocol_config(),
1778                    reference_gas_price,
1779                    &transaction,
1780                    input_objects,
1781                    &receiving_objects,
1782                    &self.metrics.bytecode_verifier_metrics,
1783                    &self.config.verifier_signing_config,
1784                )?,
1785                None,
1786            )
1787        };
1788
1789        let protocol_config = epoch_store.protocol_config();
1790        let (kind, signer, _) = transaction.execution_parts();
1791
1792        let silent = true;
1793        let executor = iota_execution::executor(protocol_config, silent, None)
1794            .expect("Creating an executor should not fail here");
1795
1796        let expensive_checks = false;
1797        let (inner_temp_store, _, effects, _execution_error) = executor
1798            .execute_transaction_to_effects(
1799                self.get_backing_store().as_ref(),
1800                protocol_config,
1801                self.metrics.limits_metrics.clone(),
1802                expensive_checks,
1803                self.config.certificate_deny_config.certificate_deny_set(),
1804                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1805                epoch_store
1806                    .epoch_start_config()
1807                    .epoch_data()
1808                    .epoch_start_timestamp(),
1809                checked_input_objects,
1810                gas_object_refs,
1811                gas_status,
1812                kind,
1813                signer,
1814                transaction_digest,
1815                &mut None,
1816            );
1817        let tx_digest = *effects.transaction_digest();
1818
1819        let module_cache =
1820            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1821
1822        let mut layout_resolver =
1823            epoch_store
1824                .executor()
1825                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1826                    &inner_temp_store,
1827                    self.get_backing_package_store(),
1828                )));
1829        // Returning empty vector here because we recalculate changes in the rpc layer.
1830        let object_changes = Vec::new();
1831
1832        // Returning empty vector here because we recalculate changes in the rpc layer.
1833        let balance_changes = Vec::new();
1834
1835        let written_with_kind = effects
1836            .created()
1837            .into_iter()
1838            .map(|(oref, _)| (oref, WriteKind::Create))
1839            .chain(
1840                effects
1841                    .unwrapped()
1842                    .into_iter()
1843                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1844            )
1845            .chain(
1846                effects
1847                    .mutated()
1848                    .into_iter()
1849                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
1850            )
1851            .map(|(oref, kind)| {
1852                let obj = inner_temp_store.written.get(&oref.0).unwrap();
1853                // TODO: Avoid clones.
1854                (oref.0, (oref, obj.clone(), kind))
1855            })
1856            .collect();
1857
1858        Ok((
1859            DryRunTransactionBlockResponse {
1860                input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
1861                    .map_err(|e| IotaError::TransactionSerialization {
1862                        error: format!(
1863                            "Failed to convert transaction to IotaTransactionBlockData: {e}",
1864                        ),
1865                    })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
1866                effects: effects.clone().try_into()?,
1867                events: IotaTransactionBlockEvents::try_from(
1868                    inner_temp_store.events.clone(),
1869                    tx_digest,
1870                    None,
1871                    layout_resolver.as_mut(),
1872                )?,
1873                object_changes,
1874                balance_changes,
1875            },
1876            written_with_kind,
1877            effects,
1878            mock_gas,
1879        ))
1880    }
1881
    /// Executes a transaction kind in dev-inspect mode and returns the
    /// results without committing anything through this function.
    ///
    /// The object ID for gas can be any object ID, even for an uncreated object
    ///
    /// Optional arguments fall back to these values when `None`:
    /// - `gas_price`: the epoch's reference gas price,
    /// - `gas_budget`: the protocol's `max_tx_gas`,
    /// - `gas_sponsor`: `sender`,
    /// - `gas_objects`: an empty payment, in which case a dummy gas object
    ///   is used,
    /// - `show_raw_txn_data_and_effects`: `false`,
    /// - `skip_checks`: `true`.
    ///
    /// Returns `IotaError::UnsupportedFeature` when this node is not a
    /// fullnode or when `transaction_kind` is a system transaction.
    pub async fn dev_inspect_transaction_block(
        &self,
        sender: IotaAddress,
        transaction_kind: TransactionKind,
        gas_price: Option<u64>,
        gas_budget: Option<u64>,
        gas_sponsor: Option<IotaAddress>,
        gas_objects: Option<Vec<ObjectRef>>,
        show_raw_txn_data_and_effects: Option<bool>,
        skip_checks: Option<bool>,
    ) -> IotaResult<DevInspectResults> {
        let epoch_store = self.load_epoch_store_one_call_per_task();

        if !self.is_fullnode(&epoch_store) {
            return Err(IotaError::UnsupportedFeature {
                error: "dev-inspect is only supported on fullnodes".to_string(),
            });
        }

        if transaction_kind.is_system_tx() {
            return Err(IotaError::UnsupportedFeature {
                error: "system transactions are not supported".to_string(),
            });
        }

        // Resolve the optional arguments to their effective values.
        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
        let skip_checks = skip_checks.unwrap_or(true);
        let reference_gas_price = epoch_store.reference_gas_price();
        let protocol_config = epoch_store.protocol_config();
        let max_tx_gas = protocol_config.max_tx_gas();

        let price = gas_price.unwrap_or(reference_gas_price);
        let budget = gas_budget.unwrap_or(max_tx_gas);
        let owner = gas_sponsor.unwrap_or(sender);
        // Payment might be empty here, but it's fine we'll have to deal with it later
        // after reading all the input objects.
        let payment = gas_objects.unwrap_or_default();
        let transaction = TransactionData::V1(TransactionDataV1 {
            kind: transaction_kind.clone(),
            sender,
            gas_data: GasData {
                payment,
                owner,
                price,
                budget,
            },
            expiration: TransactionExpiration::None,
        });

        // The raw transaction bytes are only produced on request; otherwise
        // an empty vector is returned in their place.
        let raw_txn_data = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        transaction.validity_check_no_gas_check(protocol_config)?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dev inspect.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // Create and use a dummy gas object if there is no gas object provided.
        // The object is constructed unconditionally and only consumed on the
        // paths below where the transaction has no gas payment.
        let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
            DEV_INSPECT_GAS_COIN_VALUE,
            transaction.gas_owner(),
        );

        let gas_objects = if transaction.gas().is_empty() {
            let gas_object_ref = dummy_gas_object.compute_object_reference();
            vec![gas_object_ref]
        } else {
            transaction.gas().to_vec()
        };

        let (gas_status, checked_input_objects) = if skip_checks {
            // If we are skipping checks, then we call the check_dev_inspect_input function
            // which will perform only lightweight checks on the transaction
            // input. And if the gas field is empty, that means we will
            // use the dummy gas object so we need to add it to the input objects vector.
            if transaction.gas().is_empty() {
                input_objects.push(ObjectReadResult::new(
                    InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
                    dummy_gas_object.into(),
                ));
            }
            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
                protocol_config,
                &transaction_kind,
                input_objects,
                receiving_objects,
            )?;
            // Note that the gas status on this path is created with
            // `max_tx_gas`, not with the budget resolved above.
            let gas_status = IotaGasStatus::new(
                max_tx_gas,
                transaction.gas_price(),
                reference_gas_price,
                protocol_config,
            )?;

            (gas_status, checked_input_objects)
        } else {
            // If we are not skipping checks, then we call the check_transaction_input
            // function and its dummy gas variant which will perform full
            // fledged checks just like a real transaction execution.
            if transaction.gas().is_empty() {
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    receiving_objects,
                    dummy_gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?
            } else {
                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?
            }
        };

        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
            .expect("Creating an executor should not fail here");
        // The intent message is built only to derive the transaction digest
        // from the hash of its value.
        let intent_msg = IntentMessage::new(
            Intent {
                version: IntentVersion::V0,
                scope: IntentScope::TransactionData,
                app_id: AppId::Iota,
            },
            transaction,
        );
        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            // expensive checks
            false,
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_objects,
            gas_status,
            transaction_kind,
            sender,
            transaction_digest,
            skip_checks,
        );

        // As with the raw transaction data, the raw effects bytes are only
        // produced on request.
        let raw_effects = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction effects during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        // Resolve type layouts against the packages in the temporary store,
        // with the backing package store as the fallback.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));

        DevInspectResults::new(
            effects,
            inner_temp_store.events.clone(),
            execution_result,
            raw_txn_data,
            raw_effects,
            layout_resolver.as_mut(),
        )
    }
2085
2086    // Only used for testing because of how epoch store is loaded.
2087    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2088        let epoch_store = self.epoch_store_for_testing();
2089        Ok(epoch_store.reference_gas_price())
2090    }
2091
2092    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2093        self.get_transaction_cache_reader()
2094            .is_tx_already_executed(digest)
2095    }
2096
2097    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2098    #[instrument(level = "debug", skip_all, err)]
2099    async fn index_tx(
2100        &self,
2101        indexes: &IndexStore,
2102        digest: &TransactionDigest,
2103        // TODO: index_tx really just need the transaction data here.
2104        cert: &VerifiedExecutableTransaction,
2105        effects: &TransactionEffects,
2106        events: &TransactionEvents,
2107        timestamp_ms: u64,
2108        tx_coins: Option<TxCoins>,
2109        written: &WrittenObjects,
2110        inner_temporary_store: &InnerTemporaryStore,
2111    ) -> IotaResult<u64> {
2112        let changes = self
2113            .process_object_index(effects, written, inner_temporary_store)
2114            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2115
2116        indexes
2117            .index_tx(
2118                cert.data().intent_message().value.sender(),
2119                cert.data()
2120                    .intent_message()
2121                    .value
2122                    .input_objects()?
2123                    .iter()
2124                    .map(|o| o.object_id()),
2125                effects
2126                    .all_changed_objects()
2127                    .into_iter()
2128                    .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2129                cert.data()
2130                    .intent_message()
2131                    .value
2132                    .move_calls()
2133                    .into_iter()
2134                    .map(|(package, module, function)| {
2135                        (*package, module.to_owned(), function.to_owned())
2136                    }),
2137                events,
2138                changes,
2139                digest,
2140                timestamp_ms,
2141                tx_coins,
2142            )
2143            .await
2144    }
2145
2146    #[cfg(msim)]
2147    fn create_fail_state(
2148        &self,
2149        certificate: &VerifiedExecutableTransaction,
2150        epoch_store: &Arc<AuthorityPerEpochStore>,
2151        effects: &mut TransactionEffects,
2152    ) {
2153        use std::cell::RefCell;
2154        thread_local! {
2155            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2156        }
2157        if !certificate.data().intent_message().value.is_system_tx() {
2158            let committee = epoch_store.committee();
2159            let cur_stake = (**committee).weight(&self.name);
2160            if cur_stake > 0 {
2161                FAIL_STATE.with_borrow_mut(|fail_state| {
2162                    // let (&mut failing_stake, &mut failing_validators) = fail_state;
2163                    if fail_state.0 < committee.validity_threshold() {
2164                        fail_state.0 += cur_stake;
2165                        fail_state.1.insert(self.name);
2166                    }
2167
2168                    if fail_state.1.contains(&self.name) {
2169                        info!("cp_exec failing tx");
2170                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2171                    }
2172                });
2173            }
2174        }
2175    }
2176
2177    fn process_object_index(
2178        &self,
2179        effects: &TransactionEffects,
2180        written: &WrittenObjects,
2181        inner_temporary_store: &InnerTemporaryStore,
2182    ) -> IotaResult<ObjectIndexChanges> {
2183        let epoch_store = self.load_epoch_store_one_call_per_task();
2184        let mut layout_resolver =
2185            epoch_store
2186                .executor()
2187                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2188                    inner_temporary_store,
2189                    self.get_backing_package_store(),
2190                )));
2191
2192        let modified_at_version = effects
2193            .modified_at_versions()
2194            .into_iter()
2195            .collect::<HashMap<_, _>>();
2196
2197        let tx_digest = effects.transaction_digest();
2198        let mut deleted_owners = vec![];
2199        let mut deleted_dynamic_fields = vec![];
2200        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2201            let old_version = modified_at_version.get(&id).unwrap();
2202            // When we process the index, the latest object hasn't been written yet so
2203            // the old object must be present.
2204            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2205                |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
2206            ) {
2207                Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2208                Owner::ObjectOwner(object_id) => {
2209                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2210                }
2211                _ => {}
2212            }
2213        }
2214
2215        let mut new_owners = vec![];
2216        let mut new_dynamic_fields = vec![];
2217
2218        for (oref, owner, kind) in effects.all_changed_objects() {
2219            let id = &oref.0;
2220            // For mutated objects, retrieve old owner and delete old index if there is a
2221            // owner change.
2222            if let WriteKind::Mutate = kind {
2223                let Some(old_version) = modified_at_version.get(id) else {
2224                    panic!(
2225                        "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
2226                        tx_digest
2227                    );
2228                };
2229                // When we process the index, the latest object hasn't been written yet so
2230                // the old object must be present.
2231                let Some(old_object) = self
2232                    .get_object_store()
2233                    .get_object_by_key(id, *old_version)?
2234                else {
2235                    panic!(
2236                        "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
2237                        tx_digest, id, old_version
2238                    );
2239                };
2240                if old_object.owner != owner {
2241                    match old_object.owner {
2242                        Owner::AddressOwner(addr) => {
2243                            deleted_owners.push((addr, *id));
2244                        }
2245                        Owner::ObjectOwner(object_id) => {
2246                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2247                        }
2248                        _ => {}
2249                    }
2250                }
2251            }
2252
2253            match owner {
2254                Owner::AddressOwner(addr) => {
2255                    // TODO: We can remove the object fetching after we added ObjectType to
2256                    // TransactionEffects
2257                    let new_object = written.get(id).unwrap_or_else(
2258                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
2259                    );
2260                    assert_eq!(
2261                        new_object.version(),
2262                        oref.1,
2263                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2264                        tx_digest,
2265                        id,
2266                        new_object.version(),
2267                        oref.1
2268                    );
2269
2270                    let type_ = new_object
2271                        .type_()
2272                        .map(|type_| ObjectType::Struct(type_.clone()))
2273                        .unwrap_or(ObjectType::Package);
2274
2275                    new_owners.push((
2276                        (addr, *id),
2277                        ObjectInfo {
2278                            object_id: *id,
2279                            version: oref.1,
2280                            digest: oref.2,
2281                            type_,
2282                            owner,
2283                            previous_transaction: *effects.transaction_digest(),
2284                        },
2285                    ));
2286                }
2287                Owner::ObjectOwner(owner) => {
2288                    let new_object = written.get(id).unwrap_or_else(
2289                        || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
2290                    );
2291                    assert_eq!(
2292                        new_object.version(),
2293                        oref.1,
2294                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2295                        tx_digest,
2296                        id,
2297                        new_object.version(),
2298                        oref.1
2299                    );
2300
2301                    let Some(df_info) = self
2302                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2303                        .unwrap_or_else(|e| {
2304                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2305                            None
2306                        }
2307                    )
2308                        else {
2309                            // Skip indexing for non dynamic field objects.
2310                            continue;
2311                        };
2312                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2313                }
2314                _ => {}
2315            }
2316        }
2317
2318        Ok(ObjectIndexChanges {
2319            deleted_owners,
2320            deleted_dynamic_fields,
2321            new_owners,
2322            new_dynamic_fields,
2323        })
2324    }
2325
2326    fn try_create_dynamic_field_info(
2327        &self,
2328        o: &Object,
2329        written: &WrittenObjects,
2330        resolver: &mut dyn LayoutResolver,
2331    ) -> IotaResult<Option<DynamicFieldInfo>> {
2332        // Skip if not a move object
2333        let Some(move_object) = o.data.try_as_move().cloned() else {
2334            return Ok(None);
2335        };
2336
2337        // We only index dynamic field objects
2338        if !move_object.type_().is_dynamic_field() {
2339            return Ok(None);
2340        }
2341
2342        let layout = resolver
2343            .get_annotated_layout(&move_object.type_().clone().into())?
2344            .into_layout();
2345
2346        let field =
2347            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2348                IotaError::ObjectDeserialization {
2349                    error: e.to_string(),
2350                }
2351            })?;
2352
2353        let type_ = field.kind;
2354        let name_type: TypeTag = field.name_layout.into();
2355        let bcs_name = field.name_bytes.to_owned();
2356
2357        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2358            .map_err(|e| {
2359                warn!("{e}");
2360                IotaError::ObjectDeserialization {
2361                    error: e.to_string(),
2362                }
2363            })?;
2364
2365        let name = DynamicFieldName {
2366            type_: name_type,
2367            value: IotaMoveValue::from(name_value).to_json_value(),
2368        };
2369
2370        let value_metadata = field.value_metadata().map_err(|e| {
2371            warn!("{e}");
2372            IotaError::ObjectDeserialization {
2373                error: e.to_string(),
2374            }
2375        })?;
2376
2377        Ok(Some(match value_metadata {
2378            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2379                name,
2380                bcs_name,
2381                type_,
2382                object_type: object_type.to_canonical_string(/* with_prefix */ true),
2383                object_id: o.id(),
2384                version: o.version(),
2385                digest: o.digest(),
2386            },
2387
2388            DFV::ValueMetadata::DynamicObjectField(object_id) => {
2389                // Find the actual object from storage using the object id obtained from the
2390                // wrapper.
2391
2392                // Try to find the object in the written objects first.
2393                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2394                    let version = object.version();
2395                    let digest = object.digest();
2396                    let object_type = object.data.type_().unwrap().clone();
2397                    (version, digest, object_type)
2398                } else {
2399                    // If not found, try to find it in the database.
2400                    let object = self
2401                        .get_object_store()
2402                        .get_object_by_key(&object_id, o.version())?
2403                        .ok_or_else(|| UserInputError::ObjectNotFound {
2404                            object_id,
2405                            version: Some(o.version()),
2406                        })?;
2407                    let version = object.version();
2408                    let digest = object.digest();
2409                    let object_type = object.data.type_().unwrap().clone();
2410                    (version, digest, object_type)
2411                };
2412
2413                DynamicFieldInfo {
2414                    name,
2415                    bcs_name,
2416                    type_,
2417                    object_type: object_type.to_string(),
2418                    object_id,
2419                    version,
2420                    digest,
2421                }
2422            }
2423        }))
2424    }
2425
2426    #[instrument(level = "trace", skip_all, err)]
2427    async fn post_process_one_tx(
2428        &self,
2429        certificate: &VerifiedExecutableTransaction,
2430        effects: &TransactionEffects,
2431        inner_temporary_store: &InnerTemporaryStore,
2432        epoch_store: &Arc<AuthorityPerEpochStore>,
2433    ) -> IotaResult {
2434        if self.indexes.is_none() {
2435            return Ok(());
2436        }
2437
2438        let tx_digest = certificate.digest();
2439        let timestamp_ms = Self::unixtime_now_ms();
2440        let events = &inner_temporary_store.events;
2441        let written = &inner_temporary_store.written;
2442        let tx_coins =
2443            self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2444
2445        // Index tx
2446        if let Some(indexes) = &self.indexes {
2447            let _ = self
2448                .index_tx(
2449                    indexes.as_ref(),
2450                    tx_digest,
2451                    certificate,
2452                    effects,
2453                    events,
2454                    timestamp_ms,
2455                    tx_coins,
2456                    written,
2457                    inner_temporary_store,
2458                )
2459                .await
2460                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2461                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2462                .expect("Indexing tx should not fail");
2463
2464            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2465            let events = self.make_transaction_block_events(
2466                events.clone(),
2467                *tx_digest,
2468                timestamp_ms,
2469                epoch_store,
2470                inner_temporary_store,
2471            )?;
2472            // Emit events
2473            self.subscription_handler
2474                .process_tx(certificate.data().transaction_data(), &effects, &events)
2475                .await
2476                .tap_ok(|_| {
2477                    self.metrics
2478                        .post_processing_total_tx_had_event_processed
2479                        .inc()
2480                })
2481                .tap_err(|e| {
2482                    warn!(
2483                        ?tx_digest,
2484                        "Post processing - Couldn't process events for tx: {}", e
2485                    )
2486                })?;
2487
2488            self.metrics
2489                .post_processing_total_events_emitted
2490                .inc_by(events.data.len() as u64);
2491        };
2492        Ok(())
2493    }
2494
2495    fn make_transaction_block_events(
2496        &self,
2497        transaction_events: TransactionEvents,
2498        digest: TransactionDigest,
2499        timestamp_ms: u64,
2500        epoch_store: &Arc<AuthorityPerEpochStore>,
2501        inner_temporary_store: &InnerTemporaryStore,
2502    ) -> IotaResult<IotaTransactionBlockEvents> {
2503        let mut layout_resolver =
2504            epoch_store
2505                .executor()
2506                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2507                    inner_temporary_store,
2508                    self.get_backing_package_store(),
2509                )));
2510        IotaTransactionBlockEvents::try_from(
2511            transaction_events,
2512            digest,
2513            Some(timestamp_ms),
2514            layout_resolver.as_mut(),
2515        )
2516    }
2517
2518    pub fn unixtime_now_ms() -> u64 {
2519        let ts_ms = Utc::now().timestamp_millis();
2520        u64::try_from(ts_ms).expect("Travelling in time machine")
2521    }
2522
2523    #[instrument(level = "trace", skip_all)]
2524    pub async fn handle_transaction_info_request(
2525        &self,
2526        request: TransactionInfoRequest,
2527    ) -> IotaResult<TransactionInfoResponse> {
2528        let epoch_store = self.load_epoch_store_one_call_per_task();
2529        let (transaction, status) = self
2530            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2531            .ok_or(IotaError::TransactionNotFound {
2532                digest: request.transaction_digest,
2533            })?;
2534        Ok(TransactionInfoResponse {
2535            transaction,
2536            status,
2537        })
2538    }
2539
2540    #[instrument(level = "trace", skip_all)]
2541    pub async fn handle_object_info_request(
2542        &self,
2543        request: ObjectInfoRequest,
2544    ) -> IotaResult<ObjectInfoResponse> {
2545        let epoch_store = self.load_epoch_store_one_call_per_task();
2546
2547        let requested_object_seq = match request.request_kind {
2548            ObjectInfoRequestKind::LatestObjectInfo => {
2549                let (_, seq, _) = self
2550                    .get_object_or_tombstone(request.object_id)
2551                    .await?
2552                    .ok_or_else(|| {
2553                        IotaError::from(UserInputError::ObjectNotFound {
2554                            object_id: request.object_id,
2555                            version: None,
2556                        })
2557                    })?;
2558                seq
2559            }
2560            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2561        };
2562
2563        let object = self
2564            .get_object_store()
2565            .get_object_by_key(&request.object_id, requested_object_seq)?
2566            .ok_or_else(|| {
2567                IotaError::from(UserInputError::ObjectNotFound {
2568                    object_id: request.object_id,
2569                    version: Some(requested_object_seq),
2570                })
2571            })?;
2572
2573        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2574            (request.generate_layout, object.data.try_as_move())
2575        {
2576            Some(into_struct_layout(
2577                self.load_epoch_store_one_call_per_task()
2578                    .executor()
2579                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2580                    .get_annotated_layout(&move_obj.type_().clone().into())?,
2581            )?)
2582        } else {
2583            None
2584        };
2585
2586        let lock = if !object.is_address_owned() {
2587            // Only address owned objects have locks.
2588            None
2589        } else {
2590            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2591                .await?
2592                .map(|s| s.into_inner())
2593        };
2594
2595        Ok(ObjectInfoResponse {
2596            object,
2597            layout,
2598            lock_for_debugging: lock,
2599        })
2600    }
2601
2602    #[instrument(level = "trace", skip_all)]
2603    pub fn handle_checkpoint_request(
2604        &self,
2605        request: &CheckpointRequest,
2606    ) -> IotaResult<CheckpointResponse> {
2607        let summary = if request.certified {
2608            let summary = match request.sequence_number {
2609                Some(seq) => self
2610                    .checkpoint_store
2611                    .get_checkpoint_by_sequence_number(seq)?,
2612                None => self.checkpoint_store.get_latest_certified_checkpoint(),
2613            }
2614            .map(|v| v.into_inner());
2615            summary.map(CheckpointSummaryResponse::Certified)
2616        } else {
2617            let summary = match request.sequence_number {
2618                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2619                None => self
2620                    .checkpoint_store
2621                    .get_latest_locally_computed_checkpoint(),
2622            };
2623            summary.map(CheckpointSummaryResponse::Pending)
2624        };
2625        let contents = match &summary {
2626            Some(s) => self
2627                .checkpoint_store
2628                .get_checkpoint_contents(&s.content_digest())?,
2629            None => None,
2630        };
2631        Ok(CheckpointResponse {
2632            checkpoint: summary,
2633            contents,
2634        })
2635    }
2636
2637    fn check_protocol_version(
2638        supported_protocol_versions: SupportedProtocolVersions,
2639        current_version: ProtocolVersion,
2640    ) {
2641        info!("current protocol version is now {:?}", current_version);
2642        info!("supported versions are: {:?}", supported_protocol_versions);
2643        if !supported_protocol_versions.is_version_supported(current_version) {
2644            let msg = format!(
2645                "Unsupported protocol version. The network is at {:?}, but this IotaNode only supports: {:?}. Shutting down.",
2646                current_version, supported_protocol_versions,
2647            );
2648
2649            error!("{}", msg);
2650            eprintln!("{}", msg);
2651
2652            #[cfg(not(msim))]
2653            std::process::exit(1);
2654
2655            #[cfg(msim)]
2656            iota_simulator::task::shutdown_current_node();
2657        }
2658    }
2659
2660    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
2661    pub async fn new(
2662        name: AuthorityName,
2663        secret: StableSyncAuthoritySigner,
2664        supported_protocol_versions: SupportedProtocolVersions,
2665        store: Arc<AuthorityStore>,
2666        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
2667        epoch_store: Arc<AuthorityPerEpochStore>,
2668        committee_store: Arc<CommitteeStore>,
2669        indexes: Option<Arc<IndexStore>>,
2670        rest_index: Option<Arc<RestIndexStore>>,
2671        checkpoint_store: Arc<CheckpointStore>,
2672        prometheus_registry: &Registry,
2673        genesis_objects: &[Object],
2674        db_checkpoint_config: &DBCheckpointConfig,
2675        config: NodeConfig,
2676        indirect_objects_threshold: usize,
2677        archive_readers: ArchiveReaderBalancer,
2678        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
2679    ) -> Arc<Self> {
2680        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
2681
2682        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
2683        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
2684        let transaction_manager = Arc::new(TransactionManager::new(
2685            execution_cache_trait_pointers.object_cache_reader.clone(),
2686            execution_cache_trait_pointers
2687                .transaction_cache_reader
2688                .clone(),
2689            &epoch_store,
2690            tx_ready_certificates,
2691            metrics.clone(),
2692        ));
2693        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
2694
2695        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
2696            epoch_store.get_parent_path(),
2697            &config.authority_store_pruning_config,
2698        );
2699        let _pruner = AuthorityStorePruner::new(
2700            store.perpetual_tables.clone(),
2701            checkpoint_store.clone(),
2702            rest_index.clone(),
2703            store.objects_lock_table.clone(),
2704            config.authority_store_pruning_config.clone(),
2705            epoch_store.committee().authority_exists(&name),
2706            epoch_store.epoch_start_state().epoch_duration_ms(),
2707            prometheus_registry,
2708            indirect_objects_threshold,
2709            archive_readers,
2710        );
2711        let input_loader =
2712            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
2713        let epoch = epoch_store.epoch();
2714        let state = Arc::new(AuthorityState {
2715            name,
2716            secret,
2717            execution_lock: RwLock::new(epoch),
2718            epoch_store: ArcSwap::new(epoch_store.clone()),
2719            input_loader,
2720            execution_cache_trait_pointers,
2721            indexes,
2722            rest_index,
2723            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
2724            checkpoint_store,
2725            committee_store,
2726            transaction_manager,
2727            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
2728            metrics,
2729            _pruner,
2730            _authority_per_epoch_pruner,
2731            db_checkpoint_config: db_checkpoint_config.clone(),
2732            config,
2733            overload_info: AuthorityOverloadInfo::default(),
2734            validator_tx_finalizer,
2735        });
2736
2737        // Start a task to execute ready certificates.
2738        let authority_state = Arc::downgrade(&state);
2739        spawn_monitored_task!(execution_process(
2740            authority_state,
2741            rx_ready_certificates,
2742            rx_execution_shutdown,
2743        ));
2744
2745        // TODO: This doesn't belong to the constructor of AuthorityState.
2746        state
2747            .create_owner_index_if_empty(genesis_objects, &epoch_store)
2748            .expect("Error indexing genesis objects.");
2749
2750        state
2751    }
2752
2753    // TODO: Consolidate our traits to reduce the number of methods here.
2754    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
2755        &self.execution_cache_trait_pointers.object_cache_reader
2756    }
2757
2758    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
2759        &self.execution_cache_trait_pointers.transaction_cache_reader
2760    }
2761
2762    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
2763        &self.execution_cache_trait_pointers.cache_writer
2764    }
2765
2766    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
2767        &self.execution_cache_trait_pointers.backing_store
2768    }
2769
2770    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
2771        &self.execution_cache_trait_pointers.backing_package_store
2772    }
2773
2774    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
2775        &self.execution_cache_trait_pointers.object_store
2776    }
2777
2778    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
2779        &self.execution_cache_trait_pointers.reconfig_api
2780    }
2781
2782    pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
2783        &self.execution_cache_trait_pointers.accumulator_store
2784    }
2785
2786    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
2787        &self.execution_cache_trait_pointers.checkpoint_cache
2788    }
2789
2790    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
2791        &self.execution_cache_trait_pointers.state_sync_store
2792    }
2793
2794    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
2795        &self.execution_cache_trait_pointers.cache_commit
2796    }
2797
2798    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
2799        self.execution_cache_trait_pointers
2800            .testing_api
2801            .database_for_testing()
2802    }
2803
2804    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
2805        &self,
2806        config: NodeConfig,
2807        metrics: Arc<AuthorityStorePruningMetrics>,
2808    ) -> anyhow::Result<()> {
2809        let archive_readers =
2810            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
2811        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
2812            &self.database_for_testing().perpetual_tables,
2813            &self.checkpoint_store,
2814            self.rest_index.as_deref(),
2815            &self.database_for_testing().objects_lock_table,
2816            config.authority_store_pruning_config,
2817            metrics,
2818            config.indirect_objects_threshold,
2819            archive_readers,
2820            EPOCH_DURATION_MS_FOR_TESTING,
2821        )
2822        .await
2823    }
2824
    /// Returns the transaction manager owned by this authority state.
    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
        &self.transaction_manager
    }
2828
2829    /// Adds certificates to transaction manager for ordered execution.
2830    pub fn enqueue_certificates_for_execution(
2831        &self,
2832        certs: Vec<VerifiedCertificate>,
2833        epoch_store: &Arc<AuthorityPerEpochStore>,
2834    ) {
2835        self.transaction_manager
2836            .enqueue_certificates(certs, epoch_store)
2837    }
2838
2839    pub(crate) fn enqueue_with_expected_effects_digest(
2840        &self,
2841        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
2842        epoch_store: &AuthorityPerEpochStore,
2843    ) {
2844        self.transaction_manager
2845            .enqueue_with_expected_effects_digest(certs, epoch_store)
2846    }
2847
2848    fn create_owner_index_if_empty(
2849        &self,
2850        genesis_objects: &[Object],
2851        epoch_store: &Arc<AuthorityPerEpochStore>,
2852    ) -> IotaResult {
2853        let Some(index_store) = &self.indexes else {
2854            return Ok(());
2855        };
2856        if !index_store.is_empty() {
2857            return Ok(());
2858        }
2859
2860        let mut new_owners = vec![];
2861        let mut new_dynamic_fields = vec![];
2862        let mut layout_resolver = epoch_store
2863            .executor()
2864            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
2865        for o in genesis_objects.iter() {
2866            match o.owner {
2867                Owner::AddressOwner(addr) => new_owners.push((
2868                    (addr, o.id()),
2869                    ObjectInfo::new(&o.compute_object_reference(), o),
2870                )),
2871                Owner::ObjectOwner(object_id) => {
2872                    let id = o.id();
2873                    let Some(info) = self.try_create_dynamic_field_info(
2874                        o,
2875                        &BTreeMap::new(),
2876                        layout_resolver.as_mut(),
2877                    )?
2878                    else {
2879                        continue;
2880                    };
2881                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
2882                }
2883                _ => {}
2884            }
2885        }
2886
2887        index_store.insert_genesis_objects(ObjectIndexChanges {
2888            deleted_owners: vec![],
2889            deleted_dynamic_fields: vec![],
2890            new_owners,
2891            new_dynamic_fields,
2892        })
2893    }
2894
2895    /// Attempts to acquire execution lock for an executable transaction.
2896    /// Returns the lock if the transaction is matching current executed epoch
2897    /// Returns None otherwise
2898    pub async fn execution_lock_for_executable_transaction(
2899        &self,
2900        transaction: &VerifiedExecutableTransaction,
2901    ) -> IotaResult<ExecutionLockReadGuard> {
2902        let lock = self.execution_lock.read().await;
2903        if *lock == transaction.auth_sig().epoch() {
2904            Ok(lock)
2905        } else {
2906            Err(IotaError::WrongEpoch {
2907                expected_epoch: *lock,
2908                actual_epoch: transaction.auth_sig().epoch(),
2909            })
2910        }
2911    }
2912
    /// Acquires the execution lock for the duration of a transaction signing
    /// request. This prevents reconfiguration from starting until we are
    /// finished handling the signing request. Otherwise, in-memory lock
    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
    /// while we are attempting to acquire locks for the transaction.
    ///
    /// Returns a read guard on the execution lock; unlike
    /// `execution_lock_for_executable_transaction`, no epoch check is made.
    pub async fn execution_lock_for_signing(&self) -> ExecutionLockReadGuard {
        self.execution_lock.read().await
    }
2921
    /// Acquires the execution lock for writing.
    ///
    /// `reconfigure` awaits this guard to wait until no transactions are
    /// being executed before it changes epoch state.
    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard {
        self.execution_lock.write().await
    }
2925
2926    #[instrument(level = "error", skip_all)]
2927    pub async fn reconfigure(
2928        &self,
2929        cur_epoch_store: &AuthorityPerEpochStore,
2930        supported_protocol_versions: SupportedProtocolVersions,
2931        new_committee: Committee,
2932        epoch_start_configuration: EpochStartConfiguration,
2933        accumulator: Arc<StateAccumulator>,
2934        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
2935        epoch_supply_change: i64,
2936    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
2937        Self::check_protocol_version(
2938            supported_protocol_versions,
2939            epoch_start_configuration
2940                .epoch_start_state()
2941                .protocol_version(),
2942        );
2943        self.metrics.reset_on_reconfigure();
2944        self.committee_store.insert_new_committee(&new_committee)?;
2945
2946        // Wait until no transactions are being executed.
2947        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
2948
2949        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
2950        cur_epoch_store.epoch_terminated().await;
2951
2952        // Safe to reconfigure now. No transactions are being executed,
2953        // and no epoch-specific tasks are running.
2954
2955        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
2956        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
2957        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
2958            .await?;
2959        self.get_reconfig_api()
2960            .clear_state_end_of_epoch(&execution_lock);
2961        self.check_system_consistency(
2962            cur_epoch_store,
2963            accumulator,
2964            expensive_safety_check_config,
2965            epoch_supply_change,
2966        )?;
2967        self.get_reconfig_api()
2968            .set_epoch_start_configuration(&epoch_start_configuration)?;
2969        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
2970            if self
2971                .db_checkpoint_config
2972                .perform_db_checkpoints_at_epoch_end
2973            {
2974                let checkpoint_indexes = self
2975                    .db_checkpoint_config
2976                    .perform_index_db_checkpoints_at_epoch_end
2977                    .unwrap_or(false);
2978                let current_epoch = cur_epoch_store.epoch();
2979                let epoch_checkpoint_path =
2980                    checkpoint_path.join(format!("epoch_{}", current_epoch));
2981                self.checkpoint_all_dbs(
2982                    &epoch_checkpoint_path,
2983                    cur_epoch_store,
2984                    checkpoint_indexes,
2985                )?;
2986            }
2987        }
2988
2989        self.get_reconfig_api()
2990            .reconfigure_cache(&epoch_start_configuration)
2991            .await;
2992
2993        let new_epoch = new_committee.epoch;
2994        let new_epoch_store = self
2995            .reopen_epoch_db(
2996                cur_epoch_store,
2997                new_committee,
2998                epoch_start_configuration,
2999                expensive_safety_check_config,
3000            )
3001            .await?;
3002        assert_eq!(new_epoch_store.epoch(), new_epoch);
3003        self.transaction_manager.reconfigure(new_epoch);
3004        *execution_lock = new_epoch;
3005        // drop execution_lock after epoch store was updated
3006        // see also assert in AuthorityState::process_certificate
3007        // on the epoch store and execution lock epoch match
3008        Ok(new_epoch_store)
3009    }
3010
3011    /// Advance the epoch store to the next epoch for testing only.
3012    /// This only manually sets all the places where we have the epoch number.
3013    /// It doesn't properly reconfigure the node, hence should be only used for
3014    /// testing.
3015    pub async fn reconfigure_for_testing(&self) {
3016        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3017        let epoch_store = self.epoch_store_for_testing().clone();
3018        let protocol_config = epoch_store.protocol_config().clone();
3019        // The current protocol config used in the epoch store may have been overridden
3020        // and diverged from the protocol config definitions. That override may
3021        // have now been dropped when the initial guard was dropped. We reapply
3022        // the override before creating the new epoch store, to make sure that
3023        // the new epoch store has the same protocol config as the current one.
3024        // Since this is for testing only, we mostly like to keep the protocol config
3025        // the same across epochs.
3026        let _guard =
3027            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3028        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3029            self.get_backing_package_store().clone(),
3030            self.get_object_store().clone(),
3031            &self.config.expensive_safety_check_config,
3032        );
3033        let new_epoch = new_epoch_store.epoch();
3034        self.transaction_manager.reconfigure(new_epoch);
3035        self.epoch_store.store(new_epoch_store);
3036        epoch_store.epoch_terminated().await;
3037        *execution_lock = new_epoch;
3038    }
3039
3040    #[instrument(level = "error", skip_all)]
3041    fn check_system_consistency(
3042        &self,
3043        cur_epoch_store: &AuthorityPerEpochStore,
3044        accumulator: Arc<StateAccumulator>,
3045        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3046        epoch_supply_change: i64,
3047    ) -> IotaResult<()> {
3048        info!(
3049            "Performing iota conservation consistency check for epoch {}",
3050            cur_epoch_store.epoch()
3051        );
3052
3053        if cfg!(debug_assertions) {
3054            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3055        }
3056
3057        self.get_reconfig_api()
3058            .expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3059
3060        // check for root state hash consistency with live object set
3061        if expensive_safety_check_config.enable_state_consistency_check() {
3062            info!(
3063                "Performing state consistency check for epoch {}",
3064                cur_epoch_store.epoch()
3065            );
3066            self.expensive_check_is_consistent_state(
3067                accumulator,
3068                cur_epoch_store,
3069                cfg!(debug_assertions), // panic in debug mode only
3070            );
3071        }
3072
3073        if expensive_safety_check_config.enable_secondary_index_checks() {
3074            if let Some(indexes) = self.indexes.clone() {
3075                verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3076                    .expect("secondary indexes are inconsistent");
3077            }
3078        }
3079
3080        Ok(())
3081    }
3082
3083    fn expensive_check_is_consistent_state(
3084        &self,
3085        accumulator: Arc<StateAccumulator>,
3086        cur_epoch_store: &AuthorityPerEpochStore,
3087        panic: bool,
3088    ) {
3089        let live_object_set_hash = accumulator.digest_live_object_set();
3090
3091        let root_state_hash: ECMHLiveObjectSetDigest = self
3092            .get_accumulator_store()
3093            .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3094            .expect("Retrieving root state hash cannot fail")
3095            .expect("Root state hash for epoch must exist")
3096            .1
3097            .digest()
3098            .into();
3099
3100        let is_inconsistent = root_state_hash != live_object_set_hash;
3101        if is_inconsistent {
3102            if panic {
3103                panic!(
3104                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3105                    root_state_hash, live_object_set_hash
3106                );
3107            } else {
3108                error!(
3109                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3110                    root_state_hash, live_object_set_hash
3111                );
3112            }
3113        } else {
3114            info!("State consistency check passed");
3115        }
3116
3117        if !panic {
3118            accumulator.set_inconsistent_state(is_inconsistent);
3119        }
3120    }
3121
3122    pub fn current_epoch_for_testing(&self) -> EpochId {
3123        self.epoch_store_for_testing().epoch()
3124    }
3125
3126    #[instrument(level = "error", skip_all)]
3127    pub fn checkpoint_all_dbs(
3128        &self,
3129        checkpoint_path: &Path,
3130        cur_epoch_store: &AuthorityPerEpochStore,
3131        checkpoint_indexes: bool,
3132    ) -> IotaResult {
3133        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3134        let current_epoch = cur_epoch_store.epoch();
3135
3136        if checkpoint_path.exists() {
3137            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3138            return Ok(());
3139        }
3140
3141        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3142        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3143
3144        if checkpoint_path_tmp.exists() {
3145            fs::remove_dir_all(&checkpoint_path_tmp)
3146                .map_err(|e| IotaError::FileIO(e.to_string()))?;
3147        }
3148
3149        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3150        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3151
3152        // NOTE: Do not change the order of invoking these checkpoint calls
3153        // We want to snapshot checkpoint db first to not race with state sync
3154        self.checkpoint_store
3155            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3156
3157        self.get_reconfig_api()
3158            .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3159
3160        self.committee_store
3161            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3162
3163        if checkpoint_indexes {
3164            if let Some(indexes) = self.indexes.as_ref() {
3165                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3166            }
3167        }
3168
3169        fs::rename(checkpoint_path_tmp, checkpoint_path)
3170            .map_err(|e| IotaError::FileIO(e.to_string()))?;
3171        Ok(())
3172    }
3173
3174    /// Load the current epoch store. This can change during reconfiguration. To
3175    /// ensure that we never end up accessing different epoch stores in a
3176    /// single task, we need to make sure that this is called once per task.
3177    /// Each call needs to be carefully audited to ensure it is
3178    /// the case. This also means we should minimize the number of call-sites.
3179    /// Only call it when there is no way to obtain it from somewhere else.
3180    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3181        self.epoch_store.load()
3182    }
3183
3184    // Load the epoch store, should be used in tests only.
3185    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3186        self.load_epoch_store_one_call_per_task()
3187    }
3188
3189    pub fn clone_committee_for_testing(&self) -> Committee {
3190        Committee::clone(self.epoch_store_for_testing().committee())
3191    }
3192
3193    #[instrument(level = "trace", skip_all)]
3194    pub async fn get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3195        self.get_object_store()
3196            .get_object(object_id)
3197            .map_err(Into::into)
3198    }
3199
3200    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3201        Ok(self
3202            .get_object(&IOTA_SYSTEM_ADDRESS.into())
3203            .await?
3204            .expect("framework object should always exist")
3205            .compute_object_reference())
3206    }
3207
3208    // This function is only used for testing.
3209    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3210        self.get_object_cache_reader()
3211            .get_iota_system_state_object_unsafe()
3212    }
3213
3214    #[instrument(level = "trace", skip_all)]
3215    pub fn get_checkpoint_by_sequence_number(
3216        &self,
3217        sequence_number: CheckpointSequenceNumber,
3218    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3219        Ok(self
3220            .checkpoint_store
3221            .get_checkpoint_by_sequence_number(sequence_number)?)
3222    }
3223
3224    #[instrument(level = "trace", skip_all)]
3225    pub fn get_transaction_checkpoint_for_tests(
3226        &self,
3227        digest: &TransactionDigest,
3228        epoch_store: &AuthorityPerEpochStore,
3229    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3230        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3231        let Some(checkpoint) = checkpoint else {
3232            return Ok(None);
3233        };
3234        let checkpoint = self
3235            .checkpoint_store
3236            .get_checkpoint_by_sequence_number(checkpoint)?;
3237        Ok(checkpoint)
3238    }
3239
3240    #[instrument(level = "trace", skip_all)]
3241    pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3242        Ok(
3243            match self
3244                .get_object_cache_reader()
3245                .get_latest_object_or_tombstone(*object_id)?
3246            {
3247                Some((_, ObjectOrTombstone::Object(object))) => {
3248                    let layout = self.get_object_layout(&object)?;
3249                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
3250                }
3251                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3252                None => ObjectRead::NotExists(*object_id),
3253            },
3254        )
3255    }
3256
3257    /// Chain Identifier is the digest of the genesis checkpoint.
3258    pub fn get_chain_identifier(&self) -> Option<ChainIdentifier> {
3259        if let Some(digest) = CHAIN_IDENTIFIER.get() {
3260            return Some(*digest);
3261        }
3262
3263        let checkpoint = self
3264            .get_checkpoint_by_sequence_number(0)
3265            .tap_err(|e| error!("Failed to get genesis checkpoint: {:?}", e))
3266            .ok()?
3267            .tap_none(|| error!("Genesis checkpoint is missing from DB"))?;
3268        // It's ok if the value is already set due to data races.
3269        let _ = CHAIN_IDENTIFIER.set(ChainIdentifier::from(*checkpoint.digest()));
3270        Some(ChainIdentifier::from(*checkpoint.digest()))
3271    }
3272
3273    #[instrument(level = "trace", skip_all)]
3274    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3275    where
3276        T: DeserializeOwned,
3277    {
3278        let o = self.get_object_read(object_id)?.into_object()?;
3279        if let Some(move_object) = o.data.try_as_move() {
3280            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3281                IotaError::ObjectDeserialization {
3282                    error: format!("{e}"),
3283                }
3284            })?)
3285        } else {
3286            Err(IotaError::ObjectDeserialization {
3287                error: format!("Provided object : [{object_id}] is not a Move object."),
3288            })
3289        }
3290    }
3291
3292    /// This function aims to serve rpc reads on past objects and
3293    /// we don't expect it to be called for other purposes.
3294    /// Depending on the object pruning policies that will be enforced in the
3295    /// future there is no software-level guarantee/SLA to retrieve an object
3296    /// with an old version even if it exists/existed.
3297    #[instrument(level = "trace", skip_all)]
3298    pub fn get_past_object_read(
3299        &self,
3300        object_id: &ObjectID,
3301        version: SequenceNumber,
3302    ) -> IotaResult<PastObjectRead> {
3303        // Firstly we see if the object ever existed by getting its latest data
3304        let Some(obj_ref) = self
3305            .get_object_cache_reader()
3306            .get_latest_object_ref_or_tombstone(*object_id)?
3307        else {
3308            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3309        };
3310
3311        if version > obj_ref.1 {
3312            return Ok(PastObjectRead::VersionTooHigh {
3313                object_id: *object_id,
3314                asked_version: version,
3315                latest_version: obj_ref.1,
3316            });
3317        }
3318
3319        if version < obj_ref.1 {
3320            // Read past objects
3321            return Ok(match self.read_object_at_version(object_id, version)? {
3322                Some((object, layout)) => {
3323                    let obj_ref = object.compute_object_reference();
3324                    PastObjectRead::VersionFound(obj_ref, object, layout)
3325                }
3326
3327                None => PastObjectRead::VersionNotFound(*object_id, version),
3328            });
3329        }
3330
3331        if !obj_ref.2.is_alive() {
3332            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3333        }
3334
3335        match self.read_object_at_version(object_id, obj_ref.1)? {
3336            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3337            None => {
3338                error!(
3339                    "Object with in parent_entry is missing from object store, datastore is \
3340                     inconsistent",
3341                );
3342                Err(UserInputError::ObjectNotFound {
3343                    object_id: *object_id,
3344                    version: Some(obj_ref.1),
3345                }
3346                .into())
3347            }
3348        }
3349    }
3350
3351    #[instrument(level = "trace", skip_all)]
3352    fn read_object_at_version(
3353        &self,
3354        object_id: &ObjectID,
3355        version: SequenceNumber,
3356    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3357        let Some(object) = self
3358            .get_object_cache_reader()
3359            .get_object_by_key(object_id, version)?
3360        else {
3361            return Ok(None);
3362        };
3363
3364        let layout = self.get_object_layout(&object)?;
3365        Ok(Some((object, layout)))
3366    }
3367
3368    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3369        let layout = object
3370            .data
3371            .try_as_move()
3372            .map(|object| {
3373                into_struct_layout(
3374                    self.load_epoch_store_one_call_per_task()
3375                        .executor()
3376                        // TODO(cache) - must read through cache
3377                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3378                        .get_annotated_layout(&object.type_().clone().into())?,
3379                )
3380            })
3381            .transpose()?;
3382        Ok(layout)
3383    }
3384
3385    fn get_owner_at_version(
3386        &self,
3387        object_id: &ObjectID,
3388        version: SequenceNumber,
3389    ) -> IotaResult<Owner> {
3390        self.get_object_store()
3391            .get_object_by_key(object_id, version)?
3392            .ok_or_else(|| {
3393                IotaError::from(UserInputError::ObjectNotFound {
3394                    object_id: *object_id,
3395                    version: Some(version),
3396                })
3397            })
3398            .map(|o| o.owner)
3399    }
3400
3401    #[instrument(level = "trace", skip_all)]
3402    pub fn get_owner_objects(
3403        &self,
3404        owner: IotaAddress,
3405        // If `Some`, the query will start from the next item after the specified cursor
3406        cursor: Option<ObjectID>,
3407        limit: usize,
3408        filter: Option<IotaObjectDataFilter>,
3409    ) -> IotaResult<Vec<ObjectInfo>> {
3410        if let Some(indexes) = &self.indexes {
3411            indexes.get_owner_objects(owner, cursor, limit, filter)
3412        } else {
3413            Err(IotaError::IndexStoreNotAvailable)
3414        }
3415    }
3416
3417    #[instrument(level = "trace", skip_all)]
3418    pub fn get_owned_coins_iterator_with_cursor(
3419        &self,
3420        owner: IotaAddress,
3421        // If `Some`, the query will start from the next item after the specified cursor
3422        cursor: (String, ObjectID),
3423        limit: usize,
3424        one_coin_type_only: bool,
3425    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3426        if let Some(indexes) = &self.indexes {
3427            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3428        } else {
3429            Err(IotaError::IndexStoreNotAvailable)
3430        }
3431    }
3432
3433    #[instrument(level = "trace", skip_all)]
3434    pub fn get_owner_objects_iterator(
3435        &self,
3436        owner: IotaAddress,
3437        // If `Some`, the query will start from the next item after the specified cursor
3438        cursor: Option<ObjectID>,
3439        filter: Option<IotaObjectDataFilter>,
3440    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3441        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3442        if let Some(indexes) = &self.indexes {
3443            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3444        } else {
3445            Err(IotaError::IndexStoreNotAvailable)
3446        }
3447    }
3448
3449    #[instrument(level = "trace", skip_all)]
3450    pub async fn get_move_objects<T>(
3451        &self,
3452        owner: IotaAddress,
3453        type_: MoveObjectType,
3454    ) -> IotaResult<Vec<T>>
3455    where
3456        T: DeserializeOwned,
3457    {
3458        let object_ids = self
3459            .get_owner_objects_iterator(owner, None, None)?
3460            .filter(|o| match &o.type_ {
3461                ObjectType::Struct(s) => &type_ == s,
3462                ObjectType::Package => false,
3463            })
3464            .map(|info| ObjectKey(info.object_id, info.version))
3465            .collect::<Vec<_>>();
3466        let mut move_objects = vec![];
3467
3468        let objects = self
3469            .get_object_store()
3470            .multi_get_objects_by_key(&object_ids)?;
3471
3472        for (o, id) in objects.into_iter().zip(object_ids) {
3473            let object = o.ok_or_else(|| {
3474                IotaError::from(UserInputError::ObjectNotFound {
3475                    object_id: id.0,
3476                    version: Some(id.1),
3477                })
3478            })?;
3479            let move_object = object.data.try_as_move().ok_or_else(|| {
3480                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3481            })?;
3482            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3483                IotaError::ObjectDeserialization {
3484                    error: format!("{e}"),
3485                }
3486            })?);
3487        }
3488        Ok(move_objects)
3489    }
3490
3491    #[instrument(level = "trace", skip_all)]
3492    pub fn get_dynamic_fields(
3493        &self,
3494        owner: ObjectID,
3495        // If `Some`, the query will start from the next item after the specified cursor
3496        cursor: Option<ObjectID>,
3497        limit: usize,
3498    ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3499        Ok(self
3500            .get_dynamic_fields_iterator(owner, cursor)?
3501            .take(limit)
3502            .collect::<Result<Vec<_>, _>>()?)
3503    }
3504
3505    fn get_dynamic_fields_iterator(
3506        &self,
3507        owner: ObjectID,
3508        // If `Some`, the query will start from the next item after the specified cursor
3509        cursor: Option<ObjectID>,
3510    ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3511    {
3512        if let Some(indexes) = &self.indexes {
3513            indexes.get_dynamic_fields_iterator(owner, cursor)
3514        } else {
3515            Err(IotaError::IndexStoreNotAvailable)
3516        }
3517    }
3518
3519    #[instrument(level = "trace", skip_all)]
3520    pub fn get_dynamic_field_object_id(
3521        &self,
3522        owner: ObjectID,
3523        name_type: TypeTag,
3524        name_bcs_bytes: &[u8],
3525    ) -> IotaResult<Option<ObjectID>> {
3526        if let Some(indexes) = &self.indexes {
3527            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3528        } else {
3529            Err(IotaError::IndexStoreNotAvailable)
3530        }
3531    }
3532
3533    #[instrument(level = "trace", skip_all)]
3534    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3535        Ok(self.get_indexes()?.next_sequence_number())
3536    }
3537
3538    #[instrument(level = "trace", skip_all)]
3539    pub async fn get_executed_transaction_and_effects(
3540        &self,
3541        digest: TransactionDigest,
3542        kv_store: Arc<TransactionKeyValueStore>,
3543    ) -> IotaResult<(Transaction, TransactionEffects)> {
3544        let transaction = kv_store.get_tx(digest).await?;
3545        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3546        Ok((transaction, effects))
3547    }
3548
3549    #[instrument(level = "trace", skip_all)]
3550    pub fn multi_get_checkpoint_by_sequence_number(
3551        &self,
3552        sequence_numbers: &[CheckpointSequenceNumber],
3553    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3554        Ok(self
3555            .checkpoint_store
3556            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3557    }
3558
3559    #[instrument(level = "trace", skip_all)]
3560    pub fn get_transaction_events(
3561        &self,
3562        digest: &TransactionEventsDigest,
3563    ) -> IotaResult<TransactionEvents> {
3564        self.get_transaction_cache_reader()
3565            .get_events(digest)?
3566            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3567    }
3568
3569    pub fn get_transaction_input_objects(
3570        &self,
3571        effects: &TransactionEffects,
3572    ) -> anyhow::Result<Vec<Object>> {
3573        let input_object_keys = effects
3574            .modified_at_versions()
3575            .into_iter()
3576            .map(|(object_id, version)| ObjectKey(object_id, version))
3577            .collect::<Vec<_>>();
3578
3579        let input_objects = self
3580            .get_object_store()
3581            .multi_get_objects_by_key(&input_object_keys)?
3582            .into_iter()
3583            .enumerate()
3584            .map(|(idx, maybe_object)| {
3585                maybe_object.ok_or_else(|| {
3586                    anyhow::anyhow!(
3587                        "missing input object key {:?} from tx {}",
3588                        input_object_keys[idx],
3589                        effects.transaction_digest()
3590                    )
3591                })
3592            })
3593            .collect::<anyhow::Result<Vec<_>>>()?;
3594        Ok(input_objects)
3595    }
3596
3597    pub fn get_transaction_output_objects(
3598        &self,
3599        effects: &TransactionEffects,
3600    ) -> anyhow::Result<Vec<Object>> {
3601        let output_object_keys = effects
3602            .all_changed_objects()
3603            .into_iter()
3604            .map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
3605            .collect::<Vec<_>>();
3606
3607        let output_objects = self
3608            .get_object_store()
3609            .multi_get_objects_by_key(&output_object_keys)?
3610            .into_iter()
3611            .enumerate()
3612            .map(|(idx, maybe_object)| {
3613                maybe_object.ok_or_else(|| {
3614                    anyhow::anyhow!(
3615                        "missing output object key {:?} from tx {}",
3616                        output_object_keys[idx],
3617                        effects.transaction_digest()
3618                    )
3619                })
3620            })
3621            .collect::<anyhow::Result<Vec<_>>>()?;
3622        Ok(output_objects)
3623    }
3624
3625    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3626        match &self.indexes {
3627            Some(i) => Ok(i.clone()),
3628            None => Err(IotaError::UnsupportedFeature {
3629                error: "extended object indexing is not enabled on this server".into(),
3630            }),
3631        }
3632    }
3633
3634    pub async fn get_transactions_for_tests(
3635        self: &Arc<Self>,
3636        filter: Option<TransactionFilter>,
3637        cursor: Option<TransactionDigest>,
3638        limit: Option<usize>,
3639        reverse: bool,
3640    ) -> IotaResult<Vec<TransactionDigest>> {
3641        let metrics = KeyValueStoreMetrics::new_for_tests();
3642        let kv_store = Arc::new(TransactionKeyValueStore::new(
3643            "rocksdb",
3644            metrics,
3645            self.clone(),
3646        ));
3647        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3648            .await
3649    }
3650
3651    #[instrument(level = "trace", skip_all)]
3652    pub async fn get_transactions(
3653        &self,
3654        kv_store: &Arc<TransactionKeyValueStore>,
3655        filter: Option<TransactionFilter>,
3656        // If `Some`, the query will start from the next item after the specified cursor
3657        cursor: Option<TransactionDigest>,
3658        limit: Option<usize>,
3659        reverse: bool,
3660    ) -> IotaResult<Vec<TransactionDigest>> {
3661        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3662            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3663            let iter = checkpoint_contents.iter().map(|c| c.transaction);
3664            if reverse {
3665                let iter = iter
3666                    .rev()
3667                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3668                    .skip(usize::from(cursor.is_some()));
3669                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3670            } else {
3671                let iter = iter
3672                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3673                    .skip(usize::from(cursor.is_some()));
3674                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3675            }
3676        }
3677        self.get_indexes()?
3678            .get_transactions(filter, cursor, limit, reverse)
3679    }
3680
3681    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
3682        &self.checkpoint_store
3683    }
3684
3685    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
3686        self.get_checkpoint_store()
3687            .get_highest_executed_checkpoint_seq_number()?
3688            .ok_or(IotaError::UserInput {
3689                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
3690            })
3691    }
3692
/// Reads the highest pruned checkpoint from the perpetual tables of the
/// testing database. Only compiled under `cfg(msim)`.
#[cfg(msim)]
pub fn get_highest_pruned_checkpoint_for_testing(
    &self,
) -> IotaResult<CheckpointSequenceNumber> {
    let database = self.database_for_testing();
    database.perpetual_tables.get_highest_pruned_checkpoint()
}
3701
3702    #[instrument(level = "trace", skip_all)]
3703    pub fn get_checkpoint_summary_by_sequence_number(
3704        &self,
3705        sequence_number: CheckpointSequenceNumber,
3706    ) -> IotaResult<CheckpointSummary> {
3707        let verified_checkpoint = self
3708            .get_checkpoint_store()
3709            .get_checkpoint_by_sequence_number(sequence_number)?;
3710        match verified_checkpoint {
3711            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3712            None => Err(IotaError::UserInput {
3713                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3714            }),
3715        }
3716    }
3717
3718    #[instrument(level = "trace", skip_all)]
3719    pub fn get_checkpoint_summary_by_digest(
3720        &self,
3721        digest: CheckpointDigest,
3722    ) -> IotaResult<CheckpointSummary> {
3723        let verified_checkpoint = self
3724            .get_checkpoint_store()
3725            .get_checkpoint_by_digest(&digest)?;
3726        match verified_checkpoint {
3727            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3728            None => Err(IotaError::UserInput {
3729                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3730            }),
3731        }
3732    }
3733
3734    #[instrument(level = "trace", skip_all)]
3735    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
3736        if is_system_package(package_id) {
3737            return self.find_genesis_txn_digest();
3738        }
3739        Ok(self
3740            .get_object_read(&package_id)?
3741            .into_object()?
3742            .previous_transaction)
3743    }
3744
3745    #[instrument(level = "trace", skip_all)]
3746    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
3747        let summary = self
3748            .get_verified_checkpoint_by_sequence_number(0)?
3749            .into_message();
3750        let content = self.get_checkpoint_contents(summary.content_digest)?;
3751        let genesis_transaction = content.enumerate_transactions(&summary).next();
3752        Ok(genesis_transaction
3753            .ok_or(IotaError::UserInput {
3754                error: UserInputError::GenesisTransactionNotFound,
3755            })?
3756            .1
3757            .transaction)
3758    }
3759
3760    #[instrument(level = "trace", skip_all)]
3761    pub fn get_verified_checkpoint_by_sequence_number(
3762        &self,
3763        sequence_number: CheckpointSequenceNumber,
3764    ) -> IotaResult<VerifiedCheckpoint> {
3765        let verified_checkpoint = self
3766            .get_checkpoint_store()
3767            .get_checkpoint_by_sequence_number(sequence_number)?;
3768        match verified_checkpoint {
3769            Some(verified_checkpoint) => Ok(verified_checkpoint),
3770            None => Err(IotaError::UserInput {
3771                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3772            }),
3773        }
3774    }
3775
3776    #[instrument(level = "trace", skip_all)]
3777    pub fn get_verified_checkpoint_summary_by_digest(
3778        &self,
3779        digest: CheckpointDigest,
3780    ) -> IotaResult<VerifiedCheckpoint> {
3781        let verified_checkpoint = self
3782            .get_checkpoint_store()
3783            .get_checkpoint_by_digest(&digest)?;
3784        match verified_checkpoint {
3785            Some(verified_checkpoint) => Ok(verified_checkpoint),
3786            None => Err(IotaError::UserInput {
3787                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3788            }),
3789        }
3790    }
3791
3792    #[instrument(level = "trace", skip_all)]
3793    pub fn get_checkpoint_contents(
3794        &self,
3795        digest: CheckpointContentsDigest,
3796    ) -> IotaResult<CheckpointContents> {
3797        self.get_checkpoint_store()
3798            .get_checkpoint_contents(&digest)?
3799            .ok_or(IotaError::UserInput {
3800                error: UserInputError::CheckpointContentsNotFound(digest),
3801            })
3802    }
3803
3804    #[instrument(level = "trace", skip_all)]
3805    pub fn get_checkpoint_contents_by_sequence_number(
3806        &self,
3807        sequence_number: CheckpointSequenceNumber,
3808    ) -> IotaResult<CheckpointContents> {
3809        let verified_checkpoint = self
3810            .get_checkpoint_store()
3811            .get_checkpoint_by_sequence_number(sequence_number)?;
3812        match verified_checkpoint {
3813            Some(verified_checkpoint) => {
3814                let content_digest = verified_checkpoint.into_inner().content_digest;
3815                self.get_checkpoint_contents(content_digest)
3816            }
3817            None => Err(IotaError::UserInput {
3818                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3819            }),
3820        }
3821    }
3822
3823    #[instrument(level = "trace", skip_all)]
3824    pub async fn query_events(
3825        &self,
3826        kv_store: &Arc<TransactionKeyValueStore>,
3827        query: EventFilter,
3828        // If `Some`, the query will start from the next item after the specified cursor
3829        cursor: Option<EventID>,
3830        limit: usize,
3831        descending: bool,
3832    ) -> IotaResult<Vec<IotaEvent>> {
3833        let index_store = self.get_indexes()?;
3834
3835        // Get the tx_num from tx_digest
3836        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
3837            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
3838                IotaError::TransactionNotFound {
3839                    digest: cursor.tx_digest,
3840                },
3841            )?;
3842            (tx_seq, cursor.event_seq as usize)
3843        } else if descending {
3844            (u64::MAX, usize::MAX)
3845        } else {
3846            (0, 0)
3847        };
3848
3849        let limit = limit + 1;
3850        let mut event_keys = match query {
3851            EventFilter::All(filters) => {
3852                if filters.is_empty() {
3853                    index_store.all_events(tx_num, event_num, limit, descending)?
3854                } else {
3855                    return Err(IotaError::UserInput {
3856                        error: UserInputError::Unsupported(
3857                            "This query type does not currently support filter combinations"
3858                                .to_string(),
3859                        ),
3860                    });
3861                }
3862            }
3863            EventFilter::Transaction(digest) => {
3864                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
3865            }
3866            EventFilter::MoveModule { package, module } => {
3867                let module_id = ModuleId::new(package.into(), module);
3868                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
3869            }
3870            EventFilter::MoveEventType(struct_name) => index_store
3871                .events_by_move_event_struct_name(
3872                    &struct_name,
3873                    tx_num,
3874                    event_num,
3875                    limit,
3876                    descending,
3877                )?,
3878            EventFilter::Sender(sender) => {
3879                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
3880            }
3881            EventFilter::TimeRange {
3882                start_time,
3883                end_time,
3884            } => index_store
3885                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
3886            EventFilter::MoveEventModule { package, module } => index_store
3887                .events_by_move_event_module(
3888                    &ModuleId::new(package.into(), module),
3889                    tx_num,
3890                    event_num,
3891                    limit,
3892                    descending,
3893                )?,
3894            // not using "_ =>" because we want to make sure we remember to add new variants here
3895            EventFilter::Package(_)
3896            | EventFilter::MoveEventField { .. }
3897            | EventFilter::Any(_)
3898            | EventFilter::And(_, _)
3899            | EventFilter::Or(_, _) => {
3900                return Err(IotaError::UserInput {
3901                    error: UserInputError::Unsupported(
3902                        "This query type is not supported by the full node.".to_string(),
3903                    ),
3904                });
3905            }
3906        };
3907
3908        // skip one event if exclusive cursor is provided,
3909        // otherwise truncate to the original limit.
3910        if cursor.is_some() {
3911            if !event_keys.is_empty() {
3912                event_keys.remove(0);
3913            }
3914        } else {
3915            event_keys.truncate(limit - 1);
3916        }
3917
3918        // get the unique set of digests from the event_keys
3919        let transaction_digests = event_keys
3920            .iter()
3921            .map(|(_, digest, _, _)| *digest)
3922            .collect::<HashSet<_>>()
3923            .into_iter()
3924            .collect::<Vec<_>>();
3925
3926        let events = kv_store
3927            .multi_get_events_by_tx_digests(&transaction_digests)
3928            .await?;
3929
3930        let events_map: HashMap<_, _> =
3931            transaction_digests.iter().zip(events.into_iter()).collect();
3932
3933        let stored_events = event_keys
3934            .into_iter()
3935            .map(|k| {
3936                (
3937                    k,
3938                    events_map
3939                        .get(&k.1)
3940                        .expect("fetched digest is missing")
3941                        .clone()
3942                        .and_then(|e| e.data.get(k.2).cloned()),
3943                )
3944            })
3945            .map(|((digest, tx_digest, event_seq, timestamp), event)| {
3946                event
3947                    .map(|e| (e, tx_digest, event_seq, timestamp))
3948                    .ok_or(IotaError::TransactionEventsNotFound { digest })
3949            })
3950            .collect::<Result<Vec<_>, _>>()?;
3951
3952        let epoch_store = self.load_epoch_store_one_call_per_task();
3953        let backing_store = self.get_backing_package_store().as_ref();
3954        let mut layout_resolver = epoch_store
3955            .executor()
3956            .type_layout_resolver(Box::new(backing_store));
3957        let mut events = vec![];
3958        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
3959            events.push(IotaEvent::try_from(
3960                e.clone(),
3961                tx_digest,
3962                event_seq as u64,
3963                Some(timestamp),
3964                layout_resolver.get_annotated_layout(&e.type_)?,
3965            )?)
3966        }
3967        Ok(events)
3968    }
3969
3970    pub async fn insert_genesis_object(&self, object: Object) {
3971        self.get_reconfig_api()
3972            .insert_genesis_object(object)
3973            .expect("Cannot insert genesis object")
3974    }
3975
3976    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
3977        futures::future::join_all(
3978            objects
3979                .iter()
3980                .map(|o| self.insert_genesis_object(o.clone())),
3981        )
3982        .await;
3983    }
3984
3985    /// Make a status response for a transaction
3986    #[instrument(level = "trace", skip_all)]
3987    pub fn get_transaction_status(
3988        &self,
3989        transaction_digest: &TransactionDigest,
3990        epoch_store: &Arc<AuthorityPerEpochStore>,
3991    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
3992        // TODO: In the case of read path, we should not have to re-sign the effects.
3993        if let Some(effects) =
3994            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
3995        {
3996            if let Some(transaction) = self
3997                .get_transaction_cache_reader()
3998                .get_transaction_block(transaction_digest)?
3999            {
4000                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4001                let events = if let Some(digest) = effects.events_digest() {
4002                    self.get_transaction_events(digest)?
4003                } else {
4004                    TransactionEvents::default()
4005                };
4006                return Ok(Some((
4007                    (*transaction).clone().into_message(),
4008                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4009                )));
4010            } else {
4011                // The read of effects and read of transaction are not atomic. It's possible
4012                // that we reverted the transaction (during epoch change) in
4013                // between the above two reads, and we end up having effects but
4014                // not transaction. In this case, we just fall through.
4015                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4016            }
4017        }
4018        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4019            self.metrics.tx_already_processed.inc();
4020            let (transaction, sig) = signed.into_inner().into_data_and_sig();
4021            Ok(Some((transaction, TransactionStatus::Signed(sig))))
4022        } else {
4023            Ok(None)
4024        }
4025    }
4026
4027    /// Get the signed effects of the given transaction. If the effects was
4028    /// signed in a previous epoch, re-sign it so that the caller is able to
4029    /// form a cert of the effects in the current epoch.
4030    #[instrument(level = "trace", skip_all)]
4031    pub fn get_signed_effects_and_maybe_resign(
4032        &self,
4033        transaction_digest: &TransactionDigest,
4034        epoch_store: &Arc<AuthorityPerEpochStore>,
4035    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4036        let effects = self
4037            .get_transaction_cache_reader()
4038            .get_executed_effects(transaction_digest)?;
4039        match effects {
4040            Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4041            None => Ok(None),
4042        }
4043    }
4044
4045    #[instrument(level = "trace", skip_all)]
4046    pub(crate) fn sign_effects(
4047        &self,
4048        effects: TransactionEffects,
4049        epoch_store: &Arc<AuthorityPerEpochStore>,
4050    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4051        let tx_digest = *effects.transaction_digest();
4052        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4053            Some(sig) if sig.epoch == epoch_store.epoch() => {
4054                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4055            }
4056            _ => {
4057                // If the transaction was executed in previous epochs, the validator will
4058                // re-sign the effects with new current epoch so that a client is always able to
4059                // obtain an effects certificate at the current epoch.
4060                //
4061                // Why is this necessary? Consider the following case:
4062                // - assume there are 4 validators
4063                // - Quorum driver gets 2 signed effects before reconfig halt
4064                // - The tx makes it into final checkpoint.
4065                // - 2 validators go away and are replaced in the new epoch.
4066                // - The new epoch begins.
4067                // - The quorum driver cannot complete the partial effects cert from the
4068                //   previous epoch, because it may not be able to reach either of the 2 former
4069                //   validators.
4070                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4071                //   the new epoch, the QD can make a new effects cert and return it to the
4072                //   client.
4073                //
4074                // This is a considered a short-term workaround. Eventually, Quorum Driver
4075                // should be able to return either an effects certificate, -or-
4076                // a proof of inclusion in a checkpoint. In the case above, the
4077                // Quorum Driver would return a proof of inclusion in the final
4078                // checkpoint, and this code would no longer be necessary.
4079                debug!(
4080                    ?tx_digest,
4081                    epoch=?epoch_store.epoch(),
4082                    "Re-signing the effects with the current epoch"
4083                );
4084
4085                let sig = AuthoritySignInfo::new(
4086                    epoch_store.epoch(),
4087                    &effects,
4088                    Intent::iota_app(IntentScope::TransactionEffects),
4089                    self.name,
4090                    &*self.secret,
4091                );
4092
4093                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4094
4095                epoch_store.insert_effects_digest_and_signature(
4096                    &tx_digest,
4097                    effects.digest(),
4098                    &sig,
4099                )?;
4100
4101                effects
4102            }
4103        };
4104
4105        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4106            signed_effects,
4107        ))
4108    }
4109
4110    // Returns coin objects for indexing for fullnode if indexing is enabled.
4111    #[instrument(level = "trace", skip_all)]
4112    fn fullnode_only_get_tx_coins_for_indexing(
4113        &self,
4114        inner_temporary_store: &InnerTemporaryStore,
4115        epoch_store: &Arc<AuthorityPerEpochStore>,
4116    ) -> Option<TxCoins> {
4117        if self.indexes.is_none() || self.is_validator(epoch_store) {
4118            return None;
4119        }
4120        let written_coin_objects = inner_temporary_store
4121            .written
4122            .iter()
4123            .filter_map(|(k, v)| {
4124                if v.is_coin() {
4125                    Some((*k, v.clone()))
4126                } else {
4127                    None
4128                }
4129            })
4130            .collect();
4131        let input_coin_objects = inner_temporary_store
4132            .input_objects
4133            .iter()
4134            .filter_map(|(k, v)| {
4135                if v.is_coin() {
4136                    Some((*k, v.clone()))
4137                } else {
4138                    None
4139                }
4140            })
4141            .collect::<ObjectMap>();
4142        Some((input_coin_objects, written_coin_objects))
4143    }
4144
4145    /// Get the TransactionEnvelope that currently locks the given object, if
4146    /// any. Since object locks are only valid for one epoch, we also need
4147    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4148    /// no lock records for the given object can be found.
4149    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4150    /// object record is at a different version.
4151    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4152    /// certain transaction. Returns None if the a lock record is
4153    /// initialized for the given ObjectRef but not yet locked by any
4154    /// transaction,     or cannot find the transaction in transaction
4155    /// table, because of data race etc.
4156    #[instrument(level = "trace", skip_all)]
4157    pub async fn get_transaction_lock(
4158        &self,
4159        object_ref: &ObjectRef,
4160        epoch_store: &AuthorityPerEpochStore,
4161    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4162        let lock_info = self
4163            .get_object_cache_reader()
4164            .get_lock(*object_ref, epoch_store)?;
4165        let lock_info = match lock_info {
4166            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4167                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4168                    provided_obj_ref: *object_ref,
4169                    current_version: locked_ref.1,
4170                }
4171                .into());
4172            }
4173            ObjectLockStatus::Initialized => {
4174                return Ok(None);
4175            }
4176            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4177        };
4178
4179        epoch_store.get_signed_transaction(&lock_info)
4180    }
4181
4182    pub async fn get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4183        self.get_object_cache_reader().get_objects(objects)
4184    }
4185
4186    pub async fn get_object_or_tombstone(
4187        &self,
4188        object_id: ObjectID,
4189    ) -> IotaResult<Option<ObjectRef>> {
4190        self.get_object_cache_reader()
4191            .get_latest_object_ref_or_tombstone(object_id)
4192    }
4193
4194    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4195    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4196    /// upgrade.
4197    ///
4198    /// This method can be used to dynamic adjust the amount of buffer. If set
4199    /// to 0, the upgrade will go through with only 2f+1 votes.
4200    ///
4201    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4202    /// should have the same value), or you risk halting the chain.
4203    pub fn set_override_protocol_upgrade_buffer_stake(
4204        &self,
4205        expected_epoch: EpochId,
4206        buffer_stake_bps: u64,
4207    ) -> IotaResult {
4208        let epoch_store = self.load_epoch_store_one_call_per_task();
4209        let actual_epoch = epoch_store.epoch();
4210        if actual_epoch != expected_epoch {
4211            return Err(IotaError::WrongEpoch {
4212                expected_epoch,
4213                actual_epoch,
4214            });
4215        }
4216
4217        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4218    }
4219
4220    pub fn clear_override_protocol_upgrade_buffer_stake(
4221        &self,
4222        expected_epoch: EpochId,
4223    ) -> IotaResult {
4224        let epoch_store = self.load_epoch_store_one_call_per_task();
4225        let actual_epoch = epoch_store.epoch();
4226        if actual_epoch != expected_epoch {
4227            return Err(IotaError::WrongEpoch {
4228                expected_epoch,
4229                actual_epoch,
4230            });
4231        }
4232
4233        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4234    }
4235
4236    /// Get the set of system packages that are compiled in to this build, if
4237    /// those packages are compatible with the current versions of those
4238    /// packages on-chain.
4239    pub async fn get_available_system_packages(
4240        &self,
4241        binary_config: &BinaryConfig,
4242    ) -> Vec<ObjectRef> {
4243        let mut results = vec![];
4244
4245        let system_packages = BuiltInFramework::iter_system_packages();
4246
4247        // Add extra framework packages during simtest
4248        #[cfg(msim)]
4249        let extra_packages = framework_injection::get_extra_packages(self.name);
4250        #[cfg(msim)]
4251        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4252
4253        for system_package in system_packages {
4254            let modules = system_package.modules().to_vec();
4255            // In simtests, we could override the current built-in framework packages.
4256            #[cfg(msim)]
4257            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4258                .unwrap_or(modules);
4259
4260            let Some(obj_ref) = iota_framework::compare_system_package(
4261                &self.get_object_store(),
4262                &system_package.id,
4263                &modules,
4264                system_package.dependencies.to_vec(),
4265                binary_config,
4266            )
4267            .await
4268            else {
4269                return vec![];
4270            };
4271            results.push(obj_ref);
4272        }
4273
4274        results
4275    }
4276
4277    /// Return the new versions, module bytes, and dependencies for the packages
4278    /// that have been committed to for a framework upgrade, in
4279    /// `system_packages`.  Loads the module contents from the binary, and
4280    /// performs the following checks:
4281    ///
4282    /// - Whether its contents matches what is on-chain already, in which case
4283    ///   no upgrade is required, and its contents are omitted from the output.
4284    /// - Whether the contents in the binary can form a package whose digest
4285    ///   matches the input, meaning the framework will be upgraded, and this
4286    ///   authority can satisfy that upgrade, in which case the contents are
4287    ///   included in the output.
4288    ///
4289    /// If the current version of the framework can't be loaded, the binary does
4290    /// not contain the bytes for that framework ID, or the resulting
4291    /// package fails the digest check, `None` is returned indicating that
4292    /// this authority cannot run the upgrade that the network voted on.
4293    async fn get_system_package_bytes(
4294        &self,
4295        system_packages: Vec<ObjectRef>,
4296        binary_config: &BinaryConfig,
4297    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4298        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4299        let objects = self.get_objects(&ids).await.expect("read cannot fail");
4300
4301        let mut res = Vec::with_capacity(system_packages.len());
4302        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4303            let prev_transaction = match object {
4304                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4305                    // Skip this one because it doesn't need to be upgraded.
4306                    info!("Framework {} does not need updating", system_package_ref.0);
4307                    continue;
4308                }
4309
4310                Some(cur_object) => cur_object.previous_transaction,
4311                None => TransactionDigest::genesis_marker(),
4312            };
4313
4314            #[cfg(msim)]
4315            let SystemPackage {
4316                id: _,
4317                bytes,
4318                dependencies,
4319            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4320                .unwrap_or_else(|| {
4321                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4322                });
4323
4324            #[cfg(not(msim))]
4325            let SystemPackage {
4326                id: _,
4327                bytes,
4328                dependencies,
4329            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4330
4331            let modules: Vec<_> = bytes
4332                .iter()
4333                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4334                .collect();
4335
4336            let new_object = Object::new_system_package(
4337                &modules,
4338                system_package_ref.1,
4339                dependencies.clone(),
4340                prev_transaction,
4341            );
4342
4343            let new_ref = new_object.compute_object_reference();
4344            if new_ref != system_package_ref {
4345                error!(
4346                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
4347                );
4348                return None;
4349            }
4350
4351            res.push((system_package_ref.1, bytes, dependencies));
4352        }
4353
4354        Some(res)
4355    }
4356
4357    /// Returns the new protocol version and system packages that the network
4358    /// has voted to upgrade to. If the proposed protocol version is not
4359    /// supported, None is returned.
4360    fn is_protocol_version_supported_v1(
4361        proposed_protocol_version: ProtocolVersion,
4362        committee: &Committee,
4363        capabilities: Vec<AuthorityCapabilitiesV1>,
4364        mut buffer_stake_bps: u64,
4365    ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
4366        if buffer_stake_bps > 10000 {
4367            warn!("clamping buffer_stake_bps to 10000");
4368            buffer_stake_bps = 10000;
4369        }
4370
4371        // For each validator, gather the protocol version and system packages that it
4372        // would like to upgrade to in the next epoch.
4373        let mut desired_upgrades: Vec<_> = capabilities
4374            .into_iter()
4375            .filter_map(|mut cap| {
4376                // A validator that lists no packages is voting against any change at all.
4377                if cap.available_system_packages.is_empty() {
4378                    return None;
4379                }
4380
4381                cap.available_system_packages.sort();
4382
4383                info!(
4384                    "validator {:?} supports {:?} with system packages: {:?}",
4385                    cap.authority.concise(),
4386                    cap.supported_protocol_versions,
4387                    cap.available_system_packages,
4388                );
4389
4390                // A validator that only supports the current protocol version is also voting
4391                // against any change, because framework upgrades always require a protocol
4392                // version bump.
4393                cap.supported_protocol_versions
4394                    .get_version_digest(proposed_protocol_version)
4395                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
4396            })
4397            .collect();
4398
4399        // There can only be one set of votes that have a majority, find one if it
4400        // exists.
4401        desired_upgrades.sort();
4402        desired_upgrades
4403            .into_iter()
4404            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4405            .into_iter()
4406            .find_map(|((digest, packages), group)| {
4407                // should have been filtered out earlier.
4408                assert!(!packages.is_empty());
4409
4410                let mut stake_aggregator: StakeAggregator<(), true> =
4411                    StakeAggregator::new(Arc::new(committee.clone()));
4412
4413                for (_, _, authority) in group {
4414                    stake_aggregator.insert_generic(authority, ());
4415                }
4416
4417                let total_votes = stake_aggregator.total_votes();
4418                let quorum_threshold = committee.quorum_threshold();
4419                let f = committee.total_votes() - committee.quorum_threshold();
4420
4421                // multiple by buffer_stake_bps / 10000, rounded up.
4422                let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
4423                let effective_threshold = quorum_threshold + buffer_stake;
4424
4425                info!(
4426                    protocol_config_digest = ?digest,
4427                    ?total_votes,
4428                    ?quorum_threshold,
4429                    ?buffer_stake_bps,
4430                    ?effective_threshold,
4431                    ?proposed_protocol_version,
4432                    ?packages,
4433                    "support for upgrade"
4434                );
4435
4436                let has_support = total_votes >= effective_threshold;
4437                has_support.then_some((proposed_protocol_version, packages))
4438            })
4439    }
4440
4441    /// Selects the highest supported protocol version and system packages that
4442    /// the network has voted to upgrade to. If no upgrade is supported,
4443    /// returns the current protocol version and system packages.
4444    fn choose_protocol_version_and_system_packages_v1(
4445        current_protocol_version: ProtocolVersion,
4446        committee: &Committee,
4447        capabilities: Vec<AuthorityCapabilitiesV1>,
4448        buffer_stake_bps: u64,
4449    ) -> (ProtocolVersion, Vec<ObjectRef>) {
4450        let mut next_protocol_version = current_protocol_version;
4451        let mut system_packages = vec![];
4452
4453        // Finds the highest supported protocol version and system packages by
4454        // incrementing the proposed protocol version by one until no further
4455        // upgrades are supported.
4456        while let Some((version, packages)) = Self::is_protocol_version_supported_v1(
4457            next_protocol_version + 1,
4458            committee,
4459            capabilities.clone(),
4460            buffer_stake_bps,
4461        ) {
4462            next_protocol_version = version;
4463            system_packages = packages;
4464        }
4465
4466        (next_protocol_version, system_packages)
4467    }
4468
4469    #[instrument(level = "debug", skip_all)]
4470    fn create_authenticator_state_tx(
4471        &self,
4472        epoch_store: &Arc<AuthorityPerEpochStore>,
4473    ) -> Option<EndOfEpochTransactionKind> {
4474        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4475            info!("authenticator state transactions not enabled");
4476            return None;
4477        }
4478
4479        let authenticator_state_exists = epoch_store.authenticator_state_exists();
4480        let tx = if authenticator_state_exists {
4481            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4482            let min_epoch =
4483                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4484            let authenticator_obj_initial_shared_version = epoch_store
4485                .epoch_start_config()
4486                .authenticator_obj_initial_shared_version()
4487                .expect("initial version must exist");
4488
4489            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4490                min_epoch,
4491                authenticator_obj_initial_shared_version,
4492            );
4493
4494            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4495
4496            tx
4497        } else {
4498            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4499            info!("Creating AuthenticatorStateCreate tx");
4500            tx
4501        };
4502        Some(tx)
4503    }
4504
4505    #[instrument(level = "debug", skip_all)]
4506    fn create_bridge_tx(
4507        &self,
4508        epoch_store: &Arc<AuthorityPerEpochStore>,
4509    ) -> Option<EndOfEpochTransactionKind> {
4510        if !epoch_store.protocol_config().enable_bridge() {
4511            info!("bridge not enabled");
4512            return None;
4513        }
4514        if epoch_store.bridge_exists() {
4515            return None;
4516        }
4517        let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
4518        info!("Creating Bridge Create tx");
4519        Some(tx)
4520    }
4521
4522    #[instrument(level = "debug", skip_all)]
4523    fn init_bridge_committee_tx(
4524        &self,
4525        epoch_store: &Arc<AuthorityPerEpochStore>,
4526    ) -> Option<EndOfEpochTransactionKind> {
4527        if !epoch_store.protocol_config().enable_bridge() {
4528            info!("bridge not enabled");
4529            return None;
4530        }
4531        if !epoch_store
4532            .protocol_config()
4533            .should_try_to_finalize_bridge_committee()
4534        {
4535            info!("should not try to finalize bridge committee yet");
4536            return None;
4537        }
4538        // Only create this transaction if bridge exists
4539        if !epoch_store.bridge_exists() {
4540            return None;
4541        }
4542
4543        if epoch_store.bridge_committee_initiated() {
4544            return None;
4545        }
4546
4547        let bridge_initial_shared_version = epoch_store
4548            .epoch_start_config()
4549            .bridge_obj_initial_shared_version()
4550            .expect("initial version must exist");
4551        let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
4552        info!("Init Bridge committee tx");
4553        Some(tx)
4554    }
4555
4556    /// Creates and execute the advance epoch transaction to effects without
4557    /// committing it to the database. The effects of the change epoch tx
4558    /// are only written to the database after a certified checkpoint has been
4559    /// formed and executed by CheckpointExecutor.
4560    ///
4561    /// When a framework upgraded has been decided on, but the validator does
4562    /// not have the new versions of the packages locally, the validator
4563    /// cannot form the ChangeEpochTx. In this case it returns Err,
4564    /// indicating that the checkpoint builder should give up trying to make the
4565    /// final checkpoint. As long as the network is able to create a certified
4566    /// checkpoint (which should be ensured by the capabilities vote), it
4567    /// will arrive via state sync and be executed by CheckpointExecutor.
4568    #[instrument(level = "error", skip_all)]
4569    pub async fn create_and_execute_advance_epoch_tx(
4570        &self,
4571        epoch_store: &Arc<AuthorityPerEpochStore>,
4572        gas_cost_summary: &GasCostSummary,
4573        checkpoint: CheckpointSequenceNumber,
4574        epoch_start_timestamp_ms: CheckpointTimestamp,
4575    ) -> anyhow::Result<(
4576        IotaSystemState,
4577        Option<SystemEpochInfoEvent>,
4578        TransactionEffects,
4579    )> {
4580        let mut txns = Vec::new();
4581
4582        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
4583            txns.push(tx);
4584        }
4585        if let Some(tx) = self.create_bridge_tx(epoch_store) {
4586            txns.push(tx);
4587        }
4588        if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
4589            txns.push(tx);
4590        }
4591
4592        let next_epoch = epoch_store.epoch() + 1;
4593
4594        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
4595
4596        let (next_epoch_protocol_version, next_epoch_system_packages) =
4597            Self::choose_protocol_version_and_system_packages_v1(
4598                epoch_store.protocol_version(),
4599                epoch_store.committee(),
4600                epoch_store
4601                    .get_capabilities_v1()
4602                    .expect("read capabilities from db cannot fail"),
4603                buffer_stake_bps,
4604            );
4605
4606        // since system packages are created during the current epoch, they should abide
4607        // by the rules of the current epoch, including the current epoch's max
4608        // Move binary format version
4609        let config = epoch_store.protocol_config();
4610        let binary_config = to_binary_config(config);
4611        let Some(next_epoch_system_package_bytes) = self
4612            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
4613            .await
4614        else {
4615            error!(
4616                "upgraded system packages {:?} are not locally available, cannot create \
4617                ChangeEpochTx. validator binary must be upgraded to the correct version!",
4618                next_epoch_system_packages
4619            );
4620            // the checkpoint builder will keep retrying forever when it hits this error.
4621            // Eventually, one of two things will happen:
4622            // - The operator will upgrade this binary to one that has the new packages
4623            //   locally, and this function will succeed.
4624            // - The final checkpoint will be certified by other validators, we will receive
4625            //   it via state sync, and execute it. This will upgrade the framework
4626            //   packages, reconfigure, and most likely shut down in the new epoch (this
4627            //   validator likely doesn't support the new protocol version, or else it
4628            //   should have had the packages.)
4629            return Err(anyhow!(
4630                "missing system packages: cannot form ChangeEpochTx"
4631            ));
4632        };
4633
4634        // ChangeEpochV2 requires that both options are set - ProtocolDefinedBaseFee and
4635        // MaxCommitteeMembersCount.
4636        if config.protocol_defined_base_fee()
4637            && config.max_committee_members_count_as_option().is_some()
4638        {
4639            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
4640                next_epoch,
4641                next_epoch_protocol_version,
4642                gas_cost_summary.storage_cost,
4643                gas_cost_summary.computation_cost,
4644                gas_cost_summary.computation_cost_burned,
4645                gas_cost_summary.storage_rebate,
4646                gas_cost_summary.non_refundable_storage_fee,
4647                epoch_start_timestamp_ms,
4648                next_epoch_system_package_bytes,
4649            ));
4650        } else {
4651            txns.push(EndOfEpochTransactionKind::new_change_epoch(
4652                next_epoch,
4653                next_epoch_protocol_version,
4654                gas_cost_summary.storage_cost,
4655                gas_cost_summary.computation_cost,
4656                gas_cost_summary.storage_rebate,
4657                gas_cost_summary.non_refundable_storage_fee,
4658                epoch_start_timestamp_ms,
4659                next_epoch_system_package_bytes,
4660            ));
4661        }
4662
4663        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
4664
4665        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
4666            tx.clone(),
4667            epoch_store.epoch(),
4668            checkpoint,
4669        );
4670
4671        let tx_digest = executable_tx.digest();
4672
4673        info!(
4674            ?next_epoch,
4675            ?next_epoch_protocol_version,
4676            ?next_epoch_system_packages,
4677            computation_cost=?gas_cost_summary.computation_cost,
4678            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
4679            storage_cost=?gas_cost_summary.storage_cost,
4680            storage_rebate=?gas_cost_summary.storage_rebate,
4681            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
4682            ?tx_digest,
4683            "Creating advance epoch transaction"
4684        );
4685
4686        fail_point_async!("change_epoch_tx_delay");
4687        let tx_lock = epoch_store.acquire_tx_lock(tx_digest).await;
4688
4689        // The tx could have been executed by state sync already - if so simply return
4690        // an error. The checkpoint builder will shortly be terminated by
4691        // reconfiguration anyway.
4692        if self
4693            .get_transaction_cache_reader()
4694            .is_tx_already_executed(tx_digest)
4695            .expect("read cannot fail")
4696        {
4697            warn!("change epoch tx has already been executed via state sync");
4698            return Err(anyhow::anyhow!(
4699                "change epoch tx has already been executed via state sync",
4700            ));
4701        }
4702
4703        let execution_guard = self
4704            .execution_lock_for_executable_transaction(&executable_tx)
4705            .await?;
4706
4707        // We must manually assign the shared object versions to the transaction before
4708        // executing it. This is because we do not sequence end-of-epoch
4709        // transactions through consensus.
4710        epoch_store
4711            .assign_shared_object_versions_idempotent(
4712                self.get_object_cache_reader().as_ref(),
4713                &[executable_tx.clone()],
4714            )
4715            .await?;
4716
4717        let input_objects =
4718            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
4719
4720        let (temporary_store, effects, _execution_error_opt) =
4721            self.prepare_certificate(&execution_guard, &executable_tx, input_objects, epoch_store)?;
4722        let system_obj = get_iota_system_state(&temporary_store.written)
4723            .expect("change epoch tx must write to system object");
4724        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
4725        let system_epoch_info_event = temporary_store
4726            .events
4727            .data
4728            .into_iter()
4729            .find(|event| event.is_system_epoch_info_event())
4730            .map(SystemEpochInfoEvent::from);
4731        // The system epoch info event can be `None` in case if the `advance_epoch`
4732        // Move function call failed and was executed in the safe mode.
4733        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
4734
4735        // We must write tx and effects to the state sync tables so that state sync is
4736        // able to deliver to the transaction to CheckpointExecutor after it is
4737        // included in a certified checkpoint.
4738        self.get_state_sync_store()
4739            .insert_transaction_and_effects(&tx, &effects)
4740            .map_err(|err| {
4741                let err: anyhow::Error = err.into();
4742                err
4743            })?;
4744
4745        info!(
4746            "Effects summary of the change epoch transaction: {:?}",
4747            effects.summary_for_debug()
4748        );
4749        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
4750        // The change epoch transaction cannot fail to execute.
4751        assert!(effects.status().is_ok());
4752        Ok((system_obj, system_epoch_info_event, effects))
4753    }
4754
4755    /// This function is called at the very end of the epoch.
4756    /// This step is required before updating new epoch in the db and calling
4757    /// reopen_epoch_db.
4758    #[instrument(level = "error", skip_all)]
4759    async fn revert_uncommitted_epoch_transactions(
4760        &self,
4761        epoch_store: &AuthorityPerEpochStore,
4762    ) -> IotaResult {
4763        {
4764            let state = epoch_store.get_reconfig_state_write_lock_guard();
4765            if state.should_accept_user_certs() {
4766                // Need to change this so that consensus adapter do not accept certificates from
4767                // user. This can happen if our local validator did not initiate
4768                // epoch change locally, but 2f+1 nodes already concluded the
4769                // epoch.
4770                //
4771                // This lock is essentially a barrier for
4772                // `epoch_store.pending_consensus_certificates` table we are reading on the line
4773                // after this block
4774                epoch_store.close_user_certs(state);
4775            }
4776            // lock is dropped here
4777        }
4778        let pending_certificates = epoch_store.pending_consensus_certificates();
4779        info!(
4780            "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
4781            pending_certificates.len(),
4782            pending_certificates,
4783        );
4784        for digest in pending_certificates {
4785            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
4786                info!(
4787                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
4788                    digest
4789                );
4790                continue;
4791            }
4792            info!("Reverting {:?} at the end of epoch", digest);
4793            epoch_store.revert_executed_transaction(&digest)?;
4794            self.get_reconfig_api().revert_state_update(&digest)?;
4795        }
4796        info!("All uncommitted local transactions reverted");
4797        Ok(())
4798    }
4799
4800    #[instrument(level = "error", skip_all)]
4801    async fn reopen_epoch_db(
4802        &self,
4803        cur_epoch_store: &AuthorityPerEpochStore,
4804        new_committee: Committee,
4805        epoch_start_configuration: EpochStartConfiguration,
4806        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4807    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
4808        let new_epoch = new_committee.epoch;
4809        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
4810        assert_eq!(
4811            epoch_start_configuration.epoch_start_state().epoch(),
4812            new_committee.epoch
4813        );
4814        fail_point!("before-open-new-epoch-store");
4815        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
4816            self.name,
4817            new_committee,
4818            epoch_start_configuration,
4819            self.get_backing_package_store().clone(),
4820            self.get_object_store().clone(),
4821            expensive_safety_check_config,
4822            cur_epoch_store.get_chain_identifier(),
4823        );
4824        self.epoch_store.store(new_epoch_store.clone());
4825        Ok(new_epoch_store)
4826    }
4827
4828    #[cfg(test)]
4829    pub(crate) fn iter_live_object_set_for_testing(
4830        &self,
4831    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
4832        self.get_accumulator_store().iter_live_object_set()
4833    }
4834
4835    #[cfg(test)]
4836    pub(crate) fn shutdown_execution_for_test(&self) {
4837        self.tx_execution_shutdown
4838            .lock()
4839            .take()
4840            .unwrap()
4841            .send(())
4842            .unwrap();
4843    }
4844
4845    /// NOTE: this function is only to be used for fuzzing and testing. Never
4846    /// use in prod
4847    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> IotaResult {
4848        self.get_reconfig_api()
4849            .bulk_insert_genesis_objects(objects)?;
4850        self.get_object_cache_reader()
4851            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
4852        Ok(())
4853    }
4854}
4855
4856pub struct RandomnessRoundReceiver {
4857    authority_state: Arc<AuthorityState>,
4858    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
4859}
4860
4861impl RandomnessRoundReceiver {
4862    pub fn spawn(
4863        authority_state: Arc<AuthorityState>,
4864        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
4865    ) -> JoinHandle<()> {
4866        let rrr = RandomnessRoundReceiver {
4867            authority_state,
4868            randomness_rx,
4869        };
4870        spawn_monitored_task!(rrr.run())
4871    }
4872
4873    async fn run(mut self) {
4874        info!("RandomnessRoundReceiver event loop started");
4875
4876        loop {
4877            tokio::select! {
4878                maybe_recv = self.randomness_rx.recv() => {
4879                    if let Some((epoch, round, bytes)) = maybe_recv {
4880                        self.handle_new_randomness(epoch, round, bytes);
4881                    } else {
4882                        break;
4883                    }
4884                },
4885            }
4886        }
4887
4888        info!("RandomnessRoundReceiver event loop ended");
4889    }
4890
4891    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
4892    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
4893        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
4894        if epoch_store.epoch() != epoch {
4895            warn!(
4896                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
4897                epoch_store.epoch()
4898            );
4899            return;
4900        }
4901        let transaction = VerifiedTransaction::new_randomness_state_update(
4902            epoch,
4903            round,
4904            bytes,
4905            epoch_store
4906                .epoch_start_config()
4907                .randomness_obj_initial_shared_version(),
4908        );
4909        debug!(
4910            "created randomness state update transaction with digest: {:?}",
4911            transaction.digest()
4912        );
4913        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
4914        let digest = *transaction.digest();
4915
4916        // Send transaction to TransactionManager for execution.
4917        self.authority_state
4918            .transaction_manager()
4919            .enqueue(vec![transaction], &epoch_store);
4920
4921        let authority_state = self.authority_state.clone();
4922        spawn_monitored_task!(async move {
4923            // Wait for transaction execution in a separate task, to avoid deadlock in case
4924            // of out-of-order randomness generation. (Each
4925            // RandomnessStateUpdate depends on the output of the
4926            // RandomnessStateUpdate from the previous round.)
4927            //
4928            // We set a very long timeout so that in case this gets stuck for some reason,
4929            // the validator will eventually crash rather than continuing in a
4930            // zombie mode.
4931            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
4932            let result = tokio::time::timeout(
4933                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
4934                authority_state
4935                    .get_transaction_cache_reader()
4936                    .notify_read_executed_effects(&[digest]),
4937            )
4938            .await;
4939            let result = match result {
4940                Ok(result) => result,
4941                Err(_) => {
4942                    if cfg!(debug_assertions) {
4943                        // Crash on randomness update execution timeout in debug builds.
4944                        panic!(
4945                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
4946                        );
4947                    }
4948                    warn!(
4949                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
4950                    );
4951                    // Continue waiting as long as necessary in non-debug builds.
4952                    authority_state
4953                        .get_transaction_cache_reader()
4954                        .notify_read_executed_effects(&[digest])
4955                        .await
4956                }
4957            };
4958
4959            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
4960            let effects = effects.pop().expect("should return effects");
4961            if *effects.status() != ExecutionStatus::Success {
4962                panic!(
4963                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
4964                );
4965            }
4966            debug!(
4967                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
4968            );
4969        });
4970    }
4971}
4972
4973#[async_trait]
4974impl TransactionKeyValueStoreTrait for AuthorityState {
4975    async fn multi_get(
4976        &self,
4977        transaction_keys: &[TransactionDigest],
4978        effects_keys: &[TransactionDigest],
4979    ) -> IotaResult<KVStoreTransactionData> {
4980        let txns = if !transaction_keys.is_empty() {
4981            self.get_transaction_cache_reader()
4982                .multi_get_transaction_blocks(transaction_keys)?
4983                .into_iter()
4984                .map(|t| t.map(|t| (*t).clone().into_inner()))
4985                .collect()
4986        } else {
4987            vec![]
4988        };
4989
4990        let fx = if !effects_keys.is_empty() {
4991            self.get_transaction_cache_reader()
4992                .multi_get_executed_effects(effects_keys)?
4993        } else {
4994            vec![]
4995        };
4996
4997        Ok((txns, fx))
4998    }
4999
5000    async fn multi_get_checkpoints(
5001        &self,
5002        checkpoint_summaries: &[CheckpointSequenceNumber],
5003        checkpoint_contents: &[CheckpointSequenceNumber],
5004        checkpoint_summaries_by_digest: &[CheckpointDigest],
5005    ) -> IotaResult<(
5006        Vec<Option<CertifiedCheckpointSummary>>,
5007        Vec<Option<CheckpointContents>>,
5008        Vec<Option<CertifiedCheckpointSummary>>,
5009    )> {
5010        // TODO: use multi-get methods if it ever becomes important (unlikely)
5011        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5012        let store = self.get_checkpoint_store();
5013        for seq in checkpoint_summaries {
5014            let checkpoint = store
5015                .get_checkpoint_by_sequence_number(*seq)?
5016                .map(|c| c.into_inner());
5017
5018            summaries.push(checkpoint);
5019        }
5020
5021        let mut contents = Vec::with_capacity(checkpoint_contents.len());
5022        for seq in checkpoint_contents {
5023            let checkpoint = store
5024                .get_checkpoint_by_sequence_number(*seq)?
5025                .and_then(|summary| {
5026                    store
5027                        .get_checkpoint_contents(&summary.content_digest)
5028                        .expect("db read cannot fail")
5029                });
5030            contents.push(checkpoint);
5031        }
5032
5033        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5034        for digest in checkpoint_summaries_by_digest {
5035            let checkpoint = store
5036                .get_checkpoint_by_digest(digest)?
5037                .map(|c| c.into_inner());
5038            summaries_by_digest.push(checkpoint);
5039        }
5040
5041        Ok((summaries, contents, summaries_by_digest))
5042    }
5043
5044    async fn get_transaction_perpetual_checkpoint(
5045        &self,
5046        digest: TransactionDigest,
5047    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5048        self.get_checkpoint_cache()
5049            .get_transaction_perpetual_checkpoint(&digest)
5050            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5051    }
5052
5053    async fn get_object(
5054        &self,
5055        object_id: ObjectID,
5056        version: VersionNumber,
5057    ) -> IotaResult<Option<Object>> {
5058        self.get_object_cache_reader()
5059            .get_object_by_key(&object_id, version)
5060    }
5061
5062    async fn multi_get_transactions_perpetual_checkpoints(
5063        &self,
5064        digests: &[TransactionDigest],
5065    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5066        let res = self
5067            .get_checkpoint_cache()
5068            .multi_get_transactions_perpetual_checkpoints(digests)?;
5069
5070        Ok(res
5071            .into_iter()
5072            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5073            .collect())
5074    }
5075
5076    #[instrument(skip(self))]
5077    async fn multi_get_events_by_tx_digests(
5078        &self,
5079        digests: &[TransactionDigest],
5080    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5081        if digests.is_empty() {
5082            return Ok(vec![]);
5083        }
5084        let events_digests: Vec<_> = self
5085            .get_transaction_cache_reader()
5086            .multi_get_executed_effects(digests)?
5087            .into_iter()
5088            .map(|t| t.and_then(|t| t.events_digest().cloned()))
5089            .collect();
5090        let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5091        let mut events = self
5092            .get_transaction_cache_reader()
5093            .multi_get_events(&non_empty_events)?
5094            .into_iter();
5095        Ok(events_digests
5096            .into_iter()
5097            .map(|ev| ev.and_then(|_| events.next()?))
5098            .collect())
5099    }
5100}
5101
/// Simulation-only (`msim`) hooks for replacing or adding system packages,
/// either for every validator or per validator via a callback.
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_types::{
        base_types::{AuthorityName, ObjectID},
        is_system_package,
    };
    use move_binary_format::CompiledModule;

    /// Registered overrides, keyed by package id.
    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    /// The modules that make up one package.
    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per validator, which modules (if any) override a
    /// package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a package is overridden.
    enum PackageOverrideConfig {
        /// The same modules for every validator.
        Global(Framework),
        /// Modules chosen by a callback that receives the validator name.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes every module with its own binary format version.
    ///
    /// Panics if serializing a module fails.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Registers `modules` as the override of `package_id` for all
    /// validators, replacing any earlier override of that package.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with_borrow_mut(|overrides| {
            overrides.insert(package_id, PackageOverrideConfig::Global(modules));
        });
    }

    /// Registers a per-validator callback as the override of `package_id`,
    /// replacing any earlier override of that package.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with_borrow_mut(|overrides| {
            overrides.insert(package_id, PackageOverrideConfig::PerValidator(func));
        });
    }

    /// Returns the serialized override modules of `package_id` for validator
    /// `name`, or `None` when no override is registered or the callback
    /// declines for this validator.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with_borrow(|overrides| match overrides.get(package_id)? {
            PackageOverrideConfig::Global(framework) => Some(compiled_modules_to_bytes(framework)),
            PackageOverrideConfig::PerValidator(callback) => {
                callback(name).map(|framework| compiled_modules_to_bytes(&framework))
            }
        })
    }

    /// Same lookup as `get_override_bytes`, but returns the compiled modules
    /// themselves.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with_borrow(|overrides| match overrides.get(package_id)? {
            PackageOverrideConfig::Global(framework) => Some(framework.clone()),
            PackageOverrideConfig::PerValidator(callback) => callback(name),
        })
    }

    /// Builds a `SystemPackage` from the override of `package_id` for
    /// validator `name`, if there is one.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;

        let id = *package_id;
        let dependencies = if is_system_package(id) {
            // Known system package: keep the dependencies of the built-in one.
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        } else {
            // Assume that entirely new injected packages depend on all existing system
            // packages.
            BuiltInFramework::all_package_ids()
        };

        Some(SystemPackage {
            id,
            bytes,
            dependencies,
        })
    }

    /// Returns every overridden package whose id is not one of the built-in
    /// framework packages, each depending on all built-in packages.
    ///
    /// Panics if an override yields no bytes for validator `name`.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> = BuiltInFramework::all_package_ids().into_iter().collect();

        // Collect the ids first so the registry is no longer borrowed when
        // the bytes are fetched below.
        let extra_ids: Vec<ObjectID> = OVERRIDE.with_borrow(|overrides| {
            overrides
                .keys()
                .filter(|id| !built_in.contains(*id))
                .copied()
                .collect()
        });

        let mut extra_packages = Vec::with_capacity(extra_ids.len());
        for id in extra_ids {
            extra_packages.push(SystemPackage {
                id,
                bytes: get_override_bytes(&id, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        extra_packages
    }
}
5223
5224#[derive(Debug, Serialize, Deserialize, Clone)]
5225pub struct ObjDumpFormat {
5226    pub id: ObjectID,
5227    pub version: VersionNumber,
5228    pub digest: ObjectDigest,
5229    pub object: Object,
5230}
5231
5232impl ObjDumpFormat {
5233    fn new(object: Object) -> Self {
5234        let oref = object.compute_object_reference();
5235        Self {
5236            id: oref.0,
5237            version: oref.1,
5238            digest: oref.2,
5239            object,
5240        }
5241    }
5242}
5243
5244#[derive(Debug, Serialize, Deserialize, Clone)]
5245pub struct NodeStateDump {
5246    pub tx_digest: TransactionDigest,
5247    pub sender_signed_data: SenderSignedData,
5248    pub executed_epoch: u64,
5249    pub reference_gas_price: u64,
5250    pub protocol_version: u64,
5251    pub epoch_start_timestamp_ms: u64,
5252    pub computed_effects: TransactionEffects,
5253    pub expected_effects_digest: TransactionEffectsDigest,
5254    pub relevant_system_packages: Vec<ObjDumpFormat>,
5255    pub shared_objects: Vec<ObjDumpFormat>,
5256    pub loaded_child_objects: Vec<ObjDumpFormat>,
5257    pub modified_at_versions: Vec<ObjDumpFormat>,
5258    pub runtime_reads: Vec<ObjDumpFormat>,
5259    pub input_objects: Vec<ObjDumpFormat>,
5260}
5261
5262impl NodeStateDump {
5263    pub fn new(
5264        tx_digest: &TransactionDigest,
5265        effects: &TransactionEffects,
5266        expected_effects_digest: TransactionEffectsDigest,
5267        object_store: &dyn ObjectStore,
5268        epoch_store: &Arc<AuthorityPerEpochStore>,
5269        inner_temporary_store: &InnerTemporaryStore,
5270        certificate: &VerifiedExecutableTransaction,
5271    ) -> IotaResult<Self> {
5272        // Epoch info
5273        let executed_epoch = epoch_store.epoch();
5274        let reference_gas_price = epoch_store.reference_gas_price();
5275        let epoch_start_config = epoch_store.epoch_start_config();
5276        let protocol_version = epoch_store.protocol_version().as_u64();
5277        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5278
5279        // Record all system packages at this version
5280        let mut relevant_system_packages = Vec::new();
5281        for sys_package_id in BuiltInFramework::all_package_ids() {
5282            if let Some(w) = object_store.get_object(&sys_package_id)? {
5283                relevant_system_packages.push(ObjDumpFormat::new(w))
5284            }
5285        }
5286
5287        // Record all the shared objects
5288        let mut shared_objects = Vec::new();
5289        for kind in effects.input_shared_objects() {
5290            match kind {
5291                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5292                    if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1)? {
5293                        shared_objects.push(ObjDumpFormat::new(w))
5294                    }
5295                }
5296                InputSharedObject::ReadDeleted(..)
5297                | InputSharedObject::MutateDeleted(..)
5298                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
5299                                                           * objects. */
5300            }
5301        }
5302
5303        // Record all loaded child objects
5304        // Child objects which are read but not mutated are not tracked anywhere else
5305        let mut loaded_child_objects = Vec::new();
5306        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5307            if let Some(w) = object_store.get_object_by_key(id, meta.version)? {
5308                loaded_child_objects.push(ObjDumpFormat::new(w))
5309            }
5310        }
5311
5312        // Record all modified objects
5313        let mut modified_at_versions = Vec::new();
5314        for (id, ver) in effects.modified_at_versions() {
5315            if let Some(w) = object_store.get_object_by_key(&id, ver)? {
5316                modified_at_versions.push(ObjDumpFormat::new(w))
5317            }
5318        }
5319
5320        // Packages read at runtime, which were not previously loaded into the temoorary
5321        // store Some packages may be fetched at runtime and wont show up in
5322        // input objects
5323        let mut runtime_reads = Vec::new();
5324        for obj in inner_temporary_store
5325            .runtime_packages_loaded_from_db
5326            .values()
5327        {
5328            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
5329        }
5330
5331        // All other input objects should already be in `inner_temporary_store.objects`
5332
5333        Ok(Self {
5334            tx_digest: *tx_digest,
5335            executed_epoch,
5336            reference_gas_price,
5337            epoch_start_timestamp_ms,
5338            protocol_version,
5339            relevant_system_packages,
5340            shared_objects,
5341            loaded_child_objects,
5342            modified_at_versions,
5343            runtime_reads,
5344            sender_signed_data: certificate.clone().into_message(),
5345            input_objects: inner_temporary_store
5346                .input_objects
5347                .values()
5348                .map(|o| ObjDumpFormat::new(o.clone()))
5349                .collect(),
5350            computed_effects: effects.clone(),
5351            expected_effects_digest,
5352        })
5353    }
5354
5355    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
5356        let mut objects = Vec::new();
5357        objects.extend(self.relevant_system_packages.clone());
5358        objects.extend(self.shared_objects.clone());
5359        objects.extend(self.loaded_child_objects.clone());
5360        objects.extend(self.modified_at_versions.clone());
5361        objects.extend(self.runtime_reads.clone());
5362        objects.extend(self.input_objects.clone());
5363        objects
5364    }
5365
5366    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
5367        let file_name = format!(
5368            "{}_{}_NODE_DUMP.json",
5369            self.tx_digest,
5370            AuthorityState::unixtime_now_ms()
5371        );
5372        let mut path = path.to_path_buf();
5373        path.push(&file_name);
5374        let mut file = File::create(path.clone())?;
5375        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
5376        Ok(path)
5377    }
5378
5379    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
5380        let file = File::open(path)?;
5381        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
5382    }
5383}