iota_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_storage::{
48    key_value_store::{
49        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
50    },
51    key_value_store_metrics::KeyValueStoreMetrics,
52};
53#[cfg(msim)]
54use iota_types::committee::CommitteeTrait;
55use iota_types::{
56    IOTA_SYSTEM_ADDRESS, TypeTag,
57    authenticator_state::get_authenticator_state,
58    base_types::*,
59    committee::{Committee, EpochId, ProtocolVersion},
60    crypto::{AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer, default_hash},
61    deny_list_v1::check_coin_deny_list_v1_during_signing,
62    digests::{ChainIdentifier, TransactionEventsDigest},
63    dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
64    effects::{
65        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
66        TransactionEvents, VerifiedCertifiedTransactionEffects, VerifiedSignedTransactionEffects,
67    },
68    error::{ExecutionError, IotaError, IotaResult, UserInputError},
69    event::{Event, EventID, SystemEpochInfoEvent},
70    executable_transaction::VerifiedExecutableTransaction,
71    execution_config_utils::to_binary_config,
72    execution_status::ExecutionStatus,
73    fp_ensure,
74    gas::{GasCostSummary, IotaGasStatus},
75    gas_coin::NANOS_PER_IOTA,
76    inner_temporary_store::{
77        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
78        WrittenObjects,
79    },
80    iota_system_state::{
81        IotaSystemState, IotaSystemStateTrait,
82        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
83    },
84    is_system_package,
85    layout_resolver::{LayoutResolver, into_struct_layout},
86    message_envelope::Message,
87    messages_checkpoint::{
88        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
89        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
90        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
91        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
92    },
93    messages_consensus::AuthorityCapabilitiesV1,
94    messages_grpc::{
95        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
96        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
97        TransactionStatus,
98    },
99    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
100    object::{
101        MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
102        bounded_visitor::BoundedVisitor,
103    },
104    storage::{
105        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
106    },
107    supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions},
108    transaction::*,
109    transaction_executor::SimulateTransactionResult,
110};
111use itertools::Itertools;
112use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
113use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
114use parking_lot::Mutex;
115use prometheus::{
116    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
117    register_histogram_vec_with_registry, register_histogram_with_registry,
118    register_int_counter_vec_with_registry, register_int_counter_with_registry,
119    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
120};
121use serde::{Deserialize, Serialize, de::DeserializeOwned};
122use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
123use tap::TapFallible;
124use tokio::{
125    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
126    task::JoinHandle,
127};
128use tracing::{Instrument, debug, error, info, instrument, trace, warn};
129use typed_store::TypedStoreError;
130
131use self::{
132    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
133};
134#[cfg(msim)]
135pub use crate::checkpoints::checkpoint_executor::utils::{
136    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
137};
138use crate::{
139    authority::{
140        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
141        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
142        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
143        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
144        authority_store_tables::AuthorityPrunerTables,
145        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
146    },
147    authority_client::NetworkAuthorityClient,
148    checkpoints::CheckpointStore,
149    congestion_tracker::CongestionTracker,
150    consensus_adapter::ConsensusAdapter,
151    epoch::committee_store::CommitteeStore,
152    execution_cache::{
153        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
154        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
155        TransactionCacheRead,
156    },
157    execution_driver::execution_process,
158    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
159    metrics::{LatencyObserver, RateTracker},
160    module_cache_metrics::ResolverMetrics,
161    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
162    rest_index::RestIndexStore,
163    stake_aggregator::StakeAggregator,
164    state_accumulator::{AccumulatorStore, StateAccumulator},
165    subscription_handler::SubscriptionHandler,
166    transaction_input_loader::TransactionInputLoader,
167    transaction_manager::TransactionManager,
168    transaction_outputs::TransactionOutputs,
169    validator_tx_finalizer::ValidatorTxFinalizer,
170    verify_indexes::verify_indexes,
171};
172
173#[cfg(test)]
174#[path = "unit_tests/authority_tests.rs"]
175pub mod authority_tests;
176
177#[cfg(test)]
178#[path = "unit_tests/transaction_tests.rs"]
179pub mod transaction_tests;
180
181#[cfg(test)]
182#[path = "unit_tests/batch_transaction_tests.rs"]
183mod batch_transaction_tests;
184
185#[cfg(test)]
186#[path = "unit_tests/move_integration_tests.rs"]
187pub mod move_integration_tests;
188
189#[cfg(test)]
190#[path = "unit_tests/gas_tests.rs"]
191mod gas_tests;
192
193#[cfg(test)]
194#[path = "unit_tests/batch_verification_tests.rs"]
195mod batch_verification_tests;
196
197#[cfg(test)]
198#[path = "unit_tests/coin_deny_list_tests.rs"]
199mod coin_deny_list_tests;
200
201#[cfg(test)]
202#[path = "unit_tests/auth_unit_test_utils.rs"]
203pub mod auth_unit_test_utils;
204
205pub mod authority_test_utils;
206
207pub mod authority_per_epoch_store;
208pub mod authority_per_epoch_store_pruner;
209
210pub mod authority_store_pruner;
211pub mod authority_store_tables;
212pub mod authority_store_types;
213pub mod epoch_start_configuration;
214pub mod shared_object_congestion_tracker;
215pub mod shared_object_version_manager;
216pub mod suggested_gas_price_calculator;
217pub mod test_authority_builder;
218pub mod transaction_deferral;
219
220pub(crate) mod authority_store;
221pub mod backpressure;
222
223/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
224pub struct AuthorityMetrics {
225    tx_orders: IntCounter,
226    total_certs: IntCounter,
227    total_cert_attempts: IntCounter,
228    total_effects: IntCounter,
229    pub shared_obj_tx: IntCounter,
230    sponsored_tx: IntCounter,
231    tx_already_processed: IntCounter,
232    num_input_objs: Histogram,
233    num_shared_objects: Histogram,
234    batch_size: Histogram,
235
236    authority_state_handle_transaction_latency: Histogram,
237
238    execute_certificate_latency_single_writer: Histogram,
239    execute_certificate_latency_shared_object: Histogram,
240
241    execute_certificate_with_effects_latency: Histogram,
242    internal_execution_latency: Histogram,
243    execution_load_input_objects_latency: Histogram,
244    prepare_certificate_latency: Histogram,
245    commit_certificate_latency: Histogram,
246    db_checkpoint_latency: Histogram,
247
248    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
249    pub(crate) transaction_manager_num_missing_objects: IntGauge,
250    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
251    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
252    pub(crate) transaction_manager_num_ready: IntGauge,
253    pub(crate) transaction_manager_object_cache_size: IntGauge,
254    pub(crate) transaction_manager_object_cache_hits: IntCounter,
255    pub(crate) transaction_manager_object_cache_misses: IntCounter,
256    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
257    pub(crate) transaction_manager_package_cache_size: IntGauge,
258    pub(crate) transaction_manager_package_cache_hits: IntCounter,
259    pub(crate) transaction_manager_package_cache_misses: IntCounter,
260    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
261    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
262
263    pub(crate) execution_driver_executed_transactions: IntCounter,
264    pub(crate) execution_driver_dispatch_queue: IntGauge,
265    pub(crate) execution_queueing_delay_s: Histogram,
266    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
267    pub(crate) execution_gas_latency_ratio: Histogram,
268
269    pub(crate) skipped_consensus_txns: IntCounter,
270    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
271
272    pub(crate) authority_overload_status: IntGauge,
273    pub(crate) authority_load_shedding_percentage: IntGauge,
274
275    pub(crate) transaction_overload_sources: IntCounterVec,
276
277    /// Post processing metrics
278    post_processing_total_events_emitted: IntCounter,
279    post_processing_total_tx_indexed: IntCounter,
280    post_processing_total_tx_had_event_processed: IntCounter,
281    post_processing_total_failures: IntCounter,
282
283    /// Consensus handler metrics
284    pub consensus_handler_processed: IntCounterVec,
285    pub consensus_handler_transaction_sizes: HistogramVec,
286    pub consensus_handler_num_low_scoring_authorities: IntGauge,
287    pub consensus_handler_scores: IntGaugeVec,
288    pub consensus_handler_deferred_transactions: IntCounter,
289    pub consensus_handler_congested_transactions: IntCounter,
290    pub consensus_handler_cancelled_transactions: IntCounter,
291    pub consensus_handler_max_object_costs: IntGaugeVec,
292    pub consensus_committed_subdags: IntCounterVec,
293    pub consensus_committed_messages: IntGaugeVec,
294    pub consensus_committed_user_transactions: IntGaugeVec,
295    pub consensus_calculated_throughput: IntGauge,
296    pub consensus_calculated_throughput_profile: IntGauge,
297
298    pub limits_metrics: Arc<LimitsMetrics>,
299
300    /// bytecode verifier metrics for tracking timeouts
301    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
302
303    /// Count of zklogin signatures
304    pub zklogin_sig_count: IntCounter,
305    /// Count of multisig signatures
306    pub multisig_sig_count: IntCounter,
307
308    // Tracks recent average txn queueing delay between when it is ready for execution
309    // until it starts executing.
310    pub execution_queueing_latency: LatencyObserver,
311
312    // Tracks the rate of transactions become ready for execution in transaction manager.
313    // The need for the Mutex is that the tracker is updated in transaction manager and read
314    // in the overload_monitor. There should be low mutex contention because
315    // transaction manager is single threaded and the read rate in overload_monitor is
316    // low. In the case where transaction manager becomes multi-threaded, we can
317    // create one rate tracker per thread.
318    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
319
320    // Tracks the rate of transactions starts execution in execution driver.
321    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
322    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
323}
324
/// Histogram buckets that replace the Prometheus defaults for positive counts
/// in the 0-10M range. Each decade contributes the steps 1, 2, 5 and 7.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, 7.0, 10.0, 20.0, 50.0, 70.0, 100.0, 200.0, 500.0, 700.0, 1_000.0, 2_000.0,
    5_000.0, 7_000.0, 10_000.0, 20_000.0, 50_000.0, 70_000.0, 100_000.0, 200_000.0, 500_000.0,
    700_000.0, 1_000_000.0, 2_000_000.0, 5_000_000.0, 7_000_000.0, 10_000_000.0,
];
331
/// General-purpose latency histogram buckets, in seconds, spanning 0.5ms to
/// 90s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
    8.0, 9.0, 10.0, 20.0, 30.0, 60.0, 90.0,
];
336
/// Histogram buckets, in seconds, for low latency samples. The first bucket
/// is 10us and the last is 100s, in 1-2-5 steps per decade.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    1e-5, 2e-5, 5e-5, 1e-4, 2e-4, 5e-4, 1e-3, 2e-3, 5e-3, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0,
    2.0, 5.0, 10.0, 20.0, 50.0, 100.0,
];
342
/// Buckets shared by the gas-to-latency ratio histograms
/// (`prepare_cert_gas_latency_ratio` and `execution_gas_latency_ratio`).
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1_000.0, 2_000.0,
    3_000.0, 4_000.0, 5_000.0, 6_000.0, 7_000.0, 8_000.0, 9_000.0, 10_000.0, 50_000.0, 100_000.0,
    1_000_000.0,
];
347
348/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided.
349pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
350
351impl AuthorityMetrics {
352    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
353        let execute_certificate_latency = register_histogram_vec_with_registry!(
354            "authority_state_execute_certificate_latency",
355            "Latency of executing certificates, including waiting for inputs",
356            &["tx_type"],
357            LATENCY_SEC_BUCKETS.to_vec(),
358            registry,
359        )
360        .unwrap();
361
362        let execute_certificate_latency_single_writer =
363            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
364        let execute_certificate_latency_shared_object =
365            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
366
367        Self {
368            tx_orders: register_int_counter_with_registry!(
369                "total_transaction_orders",
370                "Total number of transaction orders",
371                registry,
372            )
373            .unwrap(),
374            total_certs: register_int_counter_with_registry!(
375                "total_transaction_certificates",
376                "Total number of transaction certificates handled",
377                registry,
378            )
379            .unwrap(),
380            total_cert_attempts: register_int_counter_with_registry!(
381                "total_handle_certificate_attempts",
382                "Number of calls to handle_certificate",
383                registry,
384            )
385            .unwrap(),
386            // total_effects == total transactions finished
387            total_effects: register_int_counter_with_registry!(
388                "total_transaction_effects",
389                "Total number of transaction effects produced",
390                registry,
391            )
392            .unwrap(),
393
394            shared_obj_tx: register_int_counter_with_registry!(
395                "num_shared_obj_tx",
396                "Number of transactions involving shared objects",
397                registry,
398            )
399            .unwrap(),
400
401            sponsored_tx: register_int_counter_with_registry!(
402                "num_sponsored_tx",
403                "Number of sponsored transactions",
404                registry,
405            )
406            .unwrap(),
407
408            tx_already_processed: register_int_counter_with_registry!(
409                "num_tx_already_processed",
410                "Number of transaction orders already processed previously",
411                registry,
412            )
413            .unwrap(),
414            num_input_objs: register_histogram_with_registry!(
415                "num_input_objects",
416                "Distribution of number of input TX objects per TX",
417                POSITIVE_INT_BUCKETS.to_vec(),
418                registry,
419            )
420            .unwrap(),
421            num_shared_objects: register_histogram_with_registry!(
422                "num_shared_objects",
423                "Number of shared input objects per TX",
424                POSITIVE_INT_BUCKETS.to_vec(),
425                registry,
426            )
427            .unwrap(),
428            batch_size: register_histogram_with_registry!(
429                "batch_size",
430                "Distribution of size of transaction batch",
431                POSITIVE_INT_BUCKETS.to_vec(),
432                registry,
433            )
434            .unwrap(),
435            authority_state_handle_transaction_latency: register_histogram_with_registry!(
436                "authority_state_handle_transaction_latency",
437                "Latency of handling transactions",
438                LATENCY_SEC_BUCKETS.to_vec(),
439                registry,
440            )
441            .unwrap(),
442            execute_certificate_latency_single_writer,
443            execute_certificate_latency_shared_object,
444            execute_certificate_with_effects_latency: register_histogram_with_registry!(
445                "authority_state_execute_certificate_with_effects_latency",
446                "Latency of executing certificates with effects, including waiting for inputs",
447                LATENCY_SEC_BUCKETS.to_vec(),
448                registry,
449            )
450            .unwrap(),
451            internal_execution_latency: register_histogram_with_registry!(
452                "authority_state_internal_execution_latency",
453                "Latency of actual certificate executions",
454                LATENCY_SEC_BUCKETS.to_vec(),
455                registry,
456            )
457            .unwrap(),
458            execution_load_input_objects_latency: register_histogram_with_registry!(
459                "authority_state_execution_load_input_objects_latency",
460                "Latency of loading input objects for execution",
461                LOW_LATENCY_SEC_BUCKETS.to_vec(),
462                registry,
463            )
464            .unwrap(),
465            prepare_certificate_latency: register_histogram_with_registry!(
466                "authority_state_prepare_certificate_latency",
467                "Latency of executing certificates, before committing the results",
468                LATENCY_SEC_BUCKETS.to_vec(),
469                registry,
470            )
471            .unwrap(),
472            commit_certificate_latency: register_histogram_with_registry!(
473                "authority_state_commit_certificate_latency",
474                "Latency of committing certificate execution results",
475                LATENCY_SEC_BUCKETS.to_vec(),
476                registry,
477            )
478            .unwrap(),
479            db_checkpoint_latency: register_histogram_with_registry!(
480                "db_checkpoint_latency",
481                "Latency of checkpointing dbs",
482                LATENCY_SEC_BUCKETS.to_vec(),
483                registry,
484            ).unwrap(),
485            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
486                "transaction_manager_num_enqueued_certificates",
487                "Current number of certificates enqueued to TransactionManager",
488                &["result"],
489                registry,
490            )
491            .unwrap(),
492            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
493                "transaction_manager_num_missing_objects",
494                "Current number of missing objects in TransactionManager",
495                registry,
496            )
497            .unwrap(),
498            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
499                "transaction_manager_num_pending_certificates",
500                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
501                registry,
502            )
503            .unwrap(),
504            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
505                "transaction_manager_num_executing_certificates",
506                "Number of executing certificates, including queued and actually running certificates",
507                registry,
508            )
509            .unwrap(),
510            transaction_manager_num_ready: register_int_gauge_with_registry!(
511                "transaction_manager_num_ready",
512                "Number of ready transactions in TransactionManager",
513                registry,
514            )
515            .unwrap(),
516            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
517                "transaction_manager_object_cache_size",
518                "Current size of object-availability cache in TransactionManager",
519                registry,
520            )
521            .unwrap(),
522            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
523                "transaction_manager_object_cache_hits",
524                "Number of object-availability cache hits in TransactionManager",
525                registry,
526            )
527            .unwrap(),
528            authority_overload_status: register_int_gauge_with_registry!(
529                "authority_overload_status",
530                "Whether authority is current experiencing overload and enters load shedding mode.",
531                registry)
532            .unwrap(),
533            authority_load_shedding_percentage: register_int_gauge_with_registry!(
534                "authority_load_shedding_percentage",
535                "The percentage of transactions is shed when the authority is in load shedding mode.",
536                registry)
537            .unwrap(),
538            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
539                "transaction_manager_object_cache_misses",
540                "Number of object-availability cache misses in TransactionManager",
541                registry,
542            )
543            .unwrap(),
544            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
545                "transaction_manager_object_cache_evictions",
546                "Number of object-availability cache evictions in TransactionManager",
547                registry,
548            )
549            .unwrap(),
550            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
551                "transaction_manager_package_cache_size",
552                "Current size of package-availability cache in TransactionManager",
553                registry,
554            )
555            .unwrap(),
556            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
557                "transaction_manager_package_cache_hits",
558                "Number of package-availability cache hits in TransactionManager",
559                registry,
560            )
561            .unwrap(),
562            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
563                "transaction_manager_package_cache_misses",
564                "Number of package-availability cache misses in TransactionManager",
565                registry,
566            )
567            .unwrap(),
568            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
569                "transaction_manager_package_cache_evictions",
570                "Number of package-availability cache evictions in TransactionManager",
571                registry,
572            )
573            .unwrap(),
574            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
575                "transaction_manager_transaction_queue_age_s",
576                "Time spent in waiting for transaction in the queue",
577                LATENCY_SEC_BUCKETS.to_vec(),
578                registry,
579            )
580            .unwrap(),
581            transaction_overload_sources: register_int_counter_vec_with_registry!(
582                "transaction_overload_sources",
583                "Number of times each source indicates transaction overload.",
584                &["source"],
585                registry)
586            .unwrap(),
587            execution_driver_executed_transactions: register_int_counter_with_registry!(
588                "execution_driver_executed_transactions",
589                "Cumulative number of transaction executed by execution driver",
590                registry,
591            )
592            .unwrap(),
593            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
594                "execution_driver_dispatch_queue",
595                "Number of transaction pending in execution driver dispatch queue",
596                registry,
597            )
598            .unwrap(),
599            execution_queueing_delay_s: register_histogram_with_registry!(
600                "execution_queueing_delay_s",
601                "Queueing delay between a transaction is ready for execution until it starts executing.",
602                LATENCY_SEC_BUCKETS.to_vec(),
603                registry
604            )
605            .unwrap(),
606            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
607                "prepare_cert_gas_latency_ratio",
608                "The ratio of computation gas divided by VM execution latency.",
609                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
610                registry
611            )
612            .unwrap(),
613            execution_gas_latency_ratio: register_histogram_with_registry!(
614                "execution_gas_latency_ratio",
615                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
616                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
617                registry
618            )
619            .unwrap(),
620            skipped_consensus_txns: register_int_counter_with_registry!(
621                "skipped_consensus_txns",
622                "Total number of consensus transactions skipped",
623                registry,
624            )
625            .unwrap(),
626            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
627                "skipped_consensus_txns_cache_hit",
628                "Total number of consensus transactions skipped because of local cache hit",
629                registry,
630            )
631            .unwrap(),
632            post_processing_total_events_emitted: register_int_counter_with_registry!(
633                "post_processing_total_events_emitted",
634                "Total number of events emitted in post processing",
635                registry,
636            )
637            .unwrap(),
638            post_processing_total_tx_indexed: register_int_counter_with_registry!(
639                "post_processing_total_tx_indexed",
640                "Total number of txes indexed in post processing",
641                registry,
642            )
643            .unwrap(),
644            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
645                "post_processing_total_tx_had_event_processed",
646                "Total number of txes finished event processing in post processing",
647                registry,
648            )
649            .unwrap(),
650            post_processing_total_failures: register_int_counter_with_registry!(
651                "post_processing_total_failures",
652                "Total number of failure in post processing",
653                registry,
654            )
655            .unwrap(),
656            consensus_handler_processed: register_int_counter_vec_with_registry!(
657                "consensus_handler_processed",
658                "Number of transactions processed by consensus handler",
659                &["class"],
660                registry
661            ).unwrap(),
662            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
663                "consensus_handler_transaction_sizes",
664                "Sizes of each type of transactions processed by consensus handler",
665                &["class"],
666                POSITIVE_INT_BUCKETS.to_vec(),
667                registry
668            ).unwrap(),
669            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
670                "consensus_handler_num_low_scoring_authorities",
671                "Number of low scoring authorities based on reputation scores from consensus",
672                registry
673            ).unwrap(),
674            consensus_handler_scores: register_int_gauge_vec_with_registry!(
675                "consensus_handler_scores",
676                "scores from consensus for each authority",
677                &["authority"],
678                registry,
679            ).unwrap(),
680            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
681                "consensus_handler_deferred_transactions",
682                "Number of transactions deferred by consensus handler",
683                registry,
684            ).unwrap(),
685            consensus_handler_congested_transactions: register_int_counter_with_registry!(
686                "consensus_handler_congested_transactions",
687                "Number of transactions deferred by consensus handler due to congestion",
688                registry,
689            ).unwrap(),
690            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
691                "consensus_handler_cancelled_transactions",
692                "Number of transactions cancelled by consensus handler",
693                registry,
694            ).unwrap(),
695            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
696                "consensus_handler_max_congestion_control_object_costs",
697                "Max object costs for congestion control in the current consensus commit",
698                &["commit_type"],
699                registry,
700            ).unwrap(),
701            consensus_committed_subdags: register_int_counter_vec_with_registry!(
702                "consensus_committed_subdags",
703                "Number of committed subdags, sliced by author",
704                &["authority"],
705                registry,
706            ).unwrap(),
707            consensus_committed_messages: register_int_gauge_vec_with_registry!(
708                "consensus_committed_messages",
709                "Total number of committed consensus messages, sliced by author",
710                &["authority"],
711                registry,
712            ).unwrap(),
713            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
714                "consensus_committed_user_transactions",
715                "Number of committed user transactions, sliced by submitter",
716                &["authority"],
717                registry,
718            ).unwrap(),
719            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
720            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
721            zklogin_sig_count: register_int_counter_with_registry!(
722                "zklogin_sig_count",
723                "Count of zkLogin signatures",
724                registry,
725            )
726            .unwrap(),
727            multisig_sig_count: register_int_counter_with_registry!(
728                "multisig_sig_count",
729                "Count of zkLogin signatures",
730                registry,
731            )
732            .unwrap(),
733            consensus_calculated_throughput: register_int_gauge_with_registry!(
734                "consensus_calculated_throughput",
735                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
736                registry,
737            ).unwrap(),
738            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
739                "consensus_calculated_throughput_profile",
740                "The current active calculated throughput profile",
741                registry
742            ).unwrap(),
743            execution_queueing_latency: LatencyObserver::new(),
744            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
745            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
746        }
747    }
748
749    /// Reset metrics that contain `hostname` as one of the labels. This is
750    /// needed to avoid retaining metrics for long-gone committee members and
751    /// only exposing metrics for the committee in the current epoch.
752    pub fn reset_on_reconfigure(&self) {
753        self.consensus_committed_messages.reset();
754        self.consensus_handler_scores.reset();
755        self.consensus_committed_user_transactions.reset();
756    }
757}
758
/// A trait object for `Signer` that is:
/// - Pin, i.e. confined to one place in memory (we don't want to copy private
///   keys).
/// - Send + Sync, i.e. can be safely moved to and shared between threads.
///
/// Typically instantiated with `Arc::pin(keypair)` where keypair is a
/// `KeyPair`. The alias wraps an `Arc`, so `Box::pin` does not produce this
/// type.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
766
/// State held by a single authority node (validator or fullnode).
///
/// Field order is significant: Rust drops struct fields in declaration order.
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads the input and receiving objects of a transaction, both for
    /// signing and for execution.
    input_loader: TransactionInputLoader,
    /// Bundle of execution-cache trait objects.
    /// NOTE(review): presumably backs the cache reader/writer accessors used
    /// by the methods of this type — confirm against its definition.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// The per-epoch store currently in use, held behind an `ArcSwap`.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes current 'execution epoch'.
    /// Execution acquires read lock, checks certificate epoch and holds it
    /// until all writes are complete. Reconfiguration acquires write lock,
    /// changes the epoch and reverts all transactions from previous epoch
    /// that are executed but did not make it into a checkpoint.
    execution_lock: RwLock<EpochId>,

    /// Optional index store.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional REST index store.
    pub rest_index: Option<Arc<RestIndexStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint store; also serves the epoch state commitments.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Committee store, exposed through `committee_store()` and
    /// `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    /// Manages pending certificates and their missing input objects.
    transaction_manager: Arc<TransactionManager>,

    /// Shuts down the execution task. Used only in testing.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    pub metrics: Arc<AuthorityMetrics>,
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Take db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    /// The node configuration this authority was started with.
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// When set and this node is a validator, freshly signed transactions are
    /// handed to this finalizer for tracking.
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// The chain identifier is derived from the digest of the genesis
    /// checkpoint.
    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,
}
822
823/// The authority state encapsulates all state, drives execution, and ensures
824/// safety.
825///
826/// Note the authority operations can be accessed through a read ref (&) and do
827/// not require &mut. Internally a database is synchronized through a mutex
828/// lock.
829///
830/// Repeating valid commands should produce no changes and return no error.
831impl AuthorityState {
832    pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
833        epoch_store.committee().authority_exists(&self.name)
834    }
835
836    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
837        !self.is_validator(epoch_store)
838    }
839
840    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
841        &self.committee_store
842    }
843
844    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
845        self.committee_store.clone()
846    }
847
848    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
849        &self.config.authority_overload_config
850    }
851
852    pub fn get_epoch_state_commitments(
853        &self,
854        epoch: EpochId,
855    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
856        self.checkpoint_store.get_epoch_state_commitments(epoch)
857    }
858
859    /// This is a private method and should be kept that way. It doesn't check
860    /// whether the provided transaction is a system transaction, and hence
861    /// can only be called internally.
862    #[instrument(level = "trace", skip_all)]
863    async fn handle_transaction_impl(
864        &self,
865        transaction: VerifiedTransaction,
866        epoch_store: &Arc<AuthorityPerEpochStore>,
867    ) -> IotaResult<VerifiedSignedTransaction> {
868        // Ensure that validator cannot reconfigure while we are signing the tx
869        let _execution_lock = self.execution_lock_for_signing();
870
871        let tx_digest = transaction.digest();
872        let tx_data = transaction.data().transaction_data();
873
874        let input_object_kinds = tx_data.input_objects()?;
875        let receiving_objects_refs = tx_data.receiving_objects();
876
877        // Note: the deny checks may do redundant package loads but:
878        // - they only load packages when there is an active package deny map
879        // - the loads are cached anyway
880        iota_transaction_checks::deny::check_transaction_for_signing(
881            tx_data,
882            transaction.tx_signatures(),
883            &input_object_kinds,
884            &receiving_objects_refs,
885            &self.config.transaction_deny_config,
886            self.get_backing_package_store().as_ref(),
887        )?;
888
889        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
890            Some(tx_digest),
891            &input_object_kinds,
892            &receiving_objects_refs,
893            epoch_store.epoch(),
894        )?;
895
896        let (_gas_status, checked_input_objects) =
897            iota_transaction_checks::check_transaction_input(
898                epoch_store.protocol_config(),
899                epoch_store.reference_gas_price(),
900                tx_data,
901                input_objects,
902                &receiving_objects,
903                &self.metrics.bytecode_verifier_metrics,
904                &self.config.verifier_signing_config,
905            )?;
906
907        check_coin_deny_list_v1_during_signing(
908            tx_data.sender(),
909            &checked_input_objects,
910            &receiving_objects,
911            &self.get_object_store(),
912        )?;
913
914        let owned_objects = checked_input_objects.inner().filter_owned_objects();
915
916        let signed_transaction = VerifiedSignedTransaction::new(
917            epoch_store.epoch(),
918            transaction,
919            self.name,
920            &*self.secret,
921        );
922
923        // Check and write locks, to signed transaction, into the database
924        // The call to self.set_transaction_lock checks the lock is not conflicting,
925        // and returns ConflictingTransaction error in case there is a lock on a
926        // different existing transaction.
927        self.get_cache_writer().try_acquire_transaction_locks(
928            epoch_store,
929            &owned_objects,
930            signed_transaction.clone(),
931        )?;
932
933        Ok(signed_transaction)
934    }
935
936    /// Initiate a new transaction.
937    #[instrument(level = "trace", skip_all)]
938    pub async fn handle_transaction(
939        &self,
940        epoch_store: &Arc<AuthorityPerEpochStore>,
941        transaction: VerifiedTransaction,
942    ) -> IotaResult<HandleTransactionResponse> {
943        let tx_digest = *transaction.digest();
944        debug!("handle_transaction");
945
946        // Ensure an idempotent answer.
947        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
948            return Ok(HandleTransactionResponse { status });
949        }
950
951        let _metrics_guard = self
952            .metrics
953            .authority_state_handle_transaction_latency
954            .start_timer();
955        self.metrics.tx_orders.inc();
956
957        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
958        match signed {
959            Ok(s) => {
960                if self.is_validator(epoch_store) {
961                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
962                        let tx = s.clone();
963                        let validator_tx_finalizer = validator_tx_finalizer.clone();
964                        let cache_reader = self.get_transaction_cache_reader().clone();
965                        let epoch_store = epoch_store.clone();
966                        spawn_monitored_task!(epoch_store.within_alive_epoch(
967                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
968                        ));
969                    }
970                }
971                Ok(HandleTransactionResponse {
972                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
973                })
974            }
975            // It happens frequently that while we are checking the validity of the transaction, it
976            // has just been executed.
977            // In that case, we could still return Ok to avoid showing confusing errors.
978            Err(err) => Ok(HandleTransactionResponse {
979                status: self
980                    .get_transaction_status(&tx_digest, epoch_store)?
981                    .ok_or(err)?
982                    .1,
983            }),
984        }
985    }
986
987    pub fn check_system_overload_at_signing(&self) -> bool {
988        self.config
989            .authority_overload_config
990            .check_system_overload_at_signing
991    }
992
993    pub fn check_system_overload_at_execution(&self) -> bool {
994        self.config
995            .authority_overload_config
996            .check_system_overload_at_execution
997    }
998
999    pub(crate) fn check_system_overload(
1000        &self,
1001        consensus_adapter: &Arc<ConsensusAdapter>,
1002        tx_data: &SenderSignedData,
1003        do_authority_overload_check: bool,
1004    ) -> IotaResult {
1005        if do_authority_overload_check {
1006            self.check_authority_overload(tx_data).tap_err(|_| {
1007                self.update_overload_metrics("execution_queue");
1008            })?;
1009        }
1010        self.transaction_manager
1011            .check_execution_overload(self.overload_config(), tx_data)
1012            .tap_err(|_| {
1013                self.update_overload_metrics("execution_pending");
1014            })?;
1015        consensus_adapter.check_consensus_overload().tap_err(|_| {
1016            self.update_overload_metrics("consensus");
1017        })?;
1018
1019        let pending_tx_count = self
1020            .get_cache_commit()
1021            .approximate_pending_transaction_count();
1022        if pending_tx_count
1023            > self
1024                .config
1025                .execution_cache_config
1026                .writeback_cache
1027                .backpressure_threshold_for_rpc()
1028        {
1029            return Err(IotaError::ValidatorOverloadedRetryAfter {
1030                retry_after_secs: 10,
1031            });
1032        }
1033
1034        Ok(())
1035    }
1036
1037    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1038        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1039            return Ok(());
1040        }
1041
1042        let load_shedding_percentage = self
1043            .overload_info
1044            .load_shedding_percentage
1045            .load(Ordering::Relaxed);
1046        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1047    }
1048
1049    fn update_overload_metrics(&self, source: &str) {
1050        self.metrics
1051            .transaction_overload_sources
1052            .with_label_values(&[source])
1053            .inc();
1054    }
1055
1056    /// Executes a transaction that's known to have correct effects.
1057    /// For such transaction, we don't have to wait for consensus to set shared
1058    /// object locks because we already know the shared object versions
1059    /// based on the effects. This function can be called by a fullnode
1060    /// only.
1061    #[instrument(level = "trace", skip_all)]
1062    pub async fn fullnode_execute_certificate_with_effects(
1063        &self,
1064        transaction: &VerifiedExecutableTransaction,
1065        // NOTE: the caller of this must promise to wait until it
1066        // knows for sure this tx is finalized, namely, it has seen a
1067        // CertifiedTransactionEffects or at least f+1 identifical effects
1068        // digests matching this TransactionEffectsEnvelope, before calling
1069        // this function, in order to prevent a byzantine validator from
1070        // giving us incorrect effects.
1071        effects: &VerifiedCertifiedTransactionEffects,
1072        epoch_store: &Arc<AuthorityPerEpochStore>,
1073    ) -> IotaResult {
1074        assert!(self.is_fullnode(epoch_store));
1075        // NOTE: the fullnode can change epoch during local execution. It should not
1076        // cause data inconsistency, but can be problematic for certain tests.
1077        // The check below mitigates the issue, but it is not a fundamental solution to
1078        // avoid race between local execution and reconfiguration.
1079        if self.epoch_store.load().epoch() != epoch_store.epoch() {
1080            return Err(IotaError::EpochEnded(epoch_store.epoch()));
1081        }
1082        let _metrics_guard = self
1083            .metrics
1084            .execute_certificate_with_effects_latency
1085            .start_timer();
1086        let digest = *transaction.digest();
1087        debug!("execute_certificate_with_effects");
1088        fp_ensure!(
1089            *effects.data().transaction_digest() == digest,
1090            IotaError::ErrorWhileProcessingCertificate {
1091                err: "effects/tx digest mismatch".to_string()
1092            }
1093        );
1094
1095        if transaction.contains_shared_object() {
1096            epoch_store.acquire_shared_version_assignments_from_effects(
1097                transaction,
1098                effects.data(),
1099                self.get_object_cache_reader().as_ref(),
1100            )?;
1101        }
1102
1103        let expected_effects_digest = effects.digest();
1104
1105        self.transaction_manager
1106            .enqueue(vec![transaction.clone()], epoch_store);
1107
1108        let observed_effects = self
1109            .get_transaction_cache_reader()
1110            .try_notify_read_executed_effects(&[digest])
1111            .instrument(tracing::debug_span!(
1112                "notify_read_effects_in_execute_certificate_with_effects"
1113            ))
1114            .await?
1115            .pop()
1116            .expect("notify_read_effects should return exactly 1 element");
1117
1118        let observed_effects_digest = observed_effects.digest();
1119        if &observed_effects_digest != expected_effects_digest {
1120            panic!(
1121                "Locally executed effects do not match canonical effects! expected_effects_digest={:?} observed_effects_digest={:?} expected_effects={:?} observed_effects={:?} input_objects={:?}",
1122                expected_effects_digest,
1123                observed_effects_digest,
1124                effects.data(),
1125                observed_effects,
1126                transaction.data().transaction_data().input_objects()
1127            );
1128        }
1129        Ok(())
1130    }
1131
1132    /// Executes a certificate for its effects.
1133    #[instrument(level = "trace", skip_all)]
1134    pub async fn execute_certificate(
1135        &self,
1136        certificate: &VerifiedCertificate,
1137        epoch_store: &Arc<AuthorityPerEpochStore>,
1138    ) -> IotaResult<TransactionEffects> {
1139        let _metrics_guard = if certificate.contains_shared_object() {
1140            self.metrics
1141                .execute_certificate_latency_shared_object
1142                .start_timer()
1143        } else {
1144            self.metrics
1145                .execute_certificate_latency_single_writer
1146                .start_timer()
1147        };
1148        trace!("execute_certificate");
1149
1150        self.metrics.total_cert_attempts.inc();
1151
1152        if !certificate.contains_shared_object() {
1153            // Shared object transactions need to be sequenced by the consensus before
1154            // enqueueing for execution, done in
1155            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
1156            // object transactions, they can be enqueued for execution immediately.
1157            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1158        }
1159
1160        // tx could be reverted when epoch ends, so we must be careful not to return a
1161        // result here after the epoch ends.
1162        epoch_store
1163            .within_alive_epoch(self.notify_read_effects(certificate))
1164            .await
1165            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1166            .and_then(|r| r)
1167    }
1168
1169    /// Internal logic to execute a certificate.
1170    ///
1171    /// Guarantees that
1172    /// - If input objects are available, return no permanent failure.
1173    /// - Execution and output commit are atomic. i.e. outputs are only written
1174    ///   to storage,
1175    /// on successful execution; crashed execution has no observable effect and
1176    /// can be retried.
1177    ///
1178    /// It is caller's responsibility to ensure input objects are available and
1179    /// locks are set. If this cannot be satisfied by the caller,
1180    /// execute_certificate() should be called instead.
1181    ///
1182    /// Should only be called within iota-core.
1183    #[instrument(level = "trace", skip_all)]
1184    pub fn try_execute_immediately(
1185        &self,
1186        certificate: &VerifiedExecutableTransaction,
1187        expected_effects_digest: Option<TransactionEffectsDigest>,
1188        epoch_store: &Arc<AuthorityPerEpochStore>,
1189    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1190        let _scope = monitored_scope("Execution::try_execute_immediately");
1191        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1192
1193        let tx_digest = certificate.digest();
1194
1195        // Acquire a lock to prevent concurrent executions of the same transaction.
1196        let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1197
1198        // The cert could have been processed by a concurrent attempt of the same cert,
1199        // so check if the effects have already been written.
1200        if let Some(effects) = self
1201            .get_transaction_cache_reader()
1202            .try_get_executed_effects(tx_digest)?
1203        {
1204            if let Some(expected_effects_digest_inner) = expected_effects_digest {
1205                assert_eq!(
1206                    effects.digest(),
1207                    expected_effects_digest_inner,
1208                    "Unexpected effects digest for transaction {tx_digest:?}"
1209                );
1210            }
1211            tx_guard.release();
1212            return Ok((effects, None));
1213        }
1214        let input_objects =
1215            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1216
1217        // If no expected_effects_digest was provided, try to get it from storage.
1218        // We could be re-executing a previously executed but uncommitted transaction,
1219        // perhaps after restarting with a new binary. In this situation, if
1220        // we have published an effects signature, we must be sure not to
1221        // equivocate. TODO: read from cache instead of DB
1222        let expected_effects_digest =
1223            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1224
1225        self.process_certificate(
1226            tx_guard,
1227            certificate,
1228            input_objects,
1229            expected_effects_digest,
1230            epoch_store,
1231        )
1232        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1233        .tap_ok(
1234            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1235        )
1236    }
1237
1238    pub fn read_objects_for_execution(
1239        &self,
1240        tx_lock: &CertLockGuard,
1241        certificate: &VerifiedExecutableTransaction,
1242        epoch_store: &Arc<AuthorityPerEpochStore>,
1243    ) -> IotaResult<InputObjects> {
1244        let _scope = monitored_scope("Execution::load_input_objects");
1245        let _metrics_guard = self
1246            .metrics
1247            .execution_load_input_objects_latency
1248            .start_timer();
1249        let input_objects = &certificate.data().transaction_data().input_objects()?;
1250        self.input_loader.read_objects_for_execution(
1251            epoch_store,
1252            &certificate.key(),
1253            tx_lock,
1254            input_objects,
1255            epoch_store.epoch(),
1256        )
1257    }
1258
1259    /// Test only wrapper for `try_execute_immediately()` above, useful for
1260    /// checking errors if the pre-conditions are not satisfied, and
1261    /// executing change epoch transactions.
1262    pub fn try_execute_for_test(
1263        &self,
1264        certificate: &VerifiedCertificate,
1265    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1266        let epoch_store = self.epoch_store_for_testing();
1267        let (effects, execution_error_opt) = self.try_execute_immediately(
1268            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1269            None,
1270            &epoch_store,
1271        )?;
1272        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1273        Ok((signed_effects, execution_error_opt))
1274    }
1275
1276    /// Non-fallible version of `try_execute_for_test()`.
1277    pub fn execute_for_test(
1278        &self,
1279        certificate: &VerifiedCertificate,
1280    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1281        self.try_execute_for_test(certificate)
1282            .expect("try_execute_for_test should not fail")
1283    }
1284
1285    pub async fn notify_read_effects(
1286        &self,
1287        certificate: &VerifiedCertificate,
1288    ) -> IotaResult<TransactionEffects> {
1289        self.get_transaction_cache_reader()
1290            .try_notify_read_executed_effects(&[*certificate.digest()])
1291            .await
1292            .map(|mut r| r.pop().expect("must return correct number of effects"))
1293    }
1294
1295    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1296        self.get_object_cache_reader()
1297            .try_check_owned_objects_are_live(owned_object_refs)
1298    }
1299
1300    /// This function captures the required state to debug a forked transaction.
1301    /// The dump is written to a file in dir `path`, with name prefixed by the
1302    /// transaction digest. NOTE: Since this info escapes the validator
1303    /// context, make sure not to leak any private info here
1304    pub(crate) fn debug_dump_transaction_state(
1305        &self,
1306        tx_digest: &TransactionDigest,
1307        effects: &TransactionEffects,
1308        expected_effects_digest: TransactionEffectsDigest,
1309        inner_temporary_store: &InnerTemporaryStore,
1310        certificate: &VerifiedExecutableTransaction,
1311        debug_dump_config: &StateDebugDumpConfig,
1312    ) -> IotaResult<PathBuf> {
1313        let dump_dir = debug_dump_config
1314            .dump_file_directory
1315            .as_ref()
1316            .cloned()
1317            .unwrap_or(std::env::temp_dir());
1318        let epoch_store = self.load_epoch_store_one_call_per_task();
1319
1320        NodeStateDump::new(
1321            tx_digest,
1322            effects,
1323            expected_effects_digest,
1324            self.get_object_store().as_ref(),
1325            &epoch_store,
1326            inner_temporary_store,
1327            certificate,
1328        )?
1329        .write_to_file(&dump_dir)
1330        .map_err(|e| IotaError::FileIO(e.to_string()))
1331    }
1332
1333    #[instrument(level = "trace", skip_all)]
1334    pub(crate) fn process_certificate(
1335        &self,
1336        tx_guard: CertTxGuard,
1337        certificate: &VerifiedExecutableTransaction,
1338        input_objects: InputObjects,
1339        expected_effects_digest: Option<TransactionEffectsDigest>,
1340        epoch_store: &Arc<AuthorityPerEpochStore>,
1341    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1342        let process_certificate_start_time = tokio::time::Instant::now();
1343        let digest = *certificate.digest();
1344
1345        fail_point_if!("correlated-crash-process-certificate", || {
1346            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1347                iota_simulator::task::kill_current_node(None);
1348            }
1349        });
1350
1351        let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1352        // Any caller that verifies the signatures on the certificate will have already
1353        // checked the epoch. But paths that don't verify sigs (e.g. execution
1354        // from checkpoint, reading from db) present the possibility of an epoch
1355        // mismatch. If this cert is not finalzied in previous epoch, then it's
1356        // invalid.
1357        let execution_guard = match execution_guard {
1358            Ok(execution_guard) => execution_guard,
1359            Err(err) => {
1360                tx_guard.release();
1361                return Err(err);
1362            }
1363        };
1364        // Since we obtain a reference to the epoch store before taking the execution
1365        // lock, it's possible that reconfiguration has happened and they no
1366        // longer match.
1367        if *execution_guard != epoch_store.epoch() {
1368            tx_guard.release();
1369            info!("The epoch of the execution_guard doesn't match the epoch store");
1370            return Err(IotaError::WrongEpoch {
1371                expected_epoch: epoch_store.epoch(),
1372                actual_epoch: *execution_guard,
1373            });
1374        }
1375
1376        // Errors originating from prepare_certificate may be transient (failure to read
1377        // locks) or non-transient (transaction input is invalid, move vm
1378        // errors). However, all errors from this function occur before we have
1379        // written anything to the db, so we commit the tx guard and rely on the
1380        // client to retry the tx (if it was transient).
1381        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1382            &execution_guard,
1383            certificate,
1384            input_objects,
1385            epoch_store,
1386        ) {
1387            Err(e) => {
1388                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1389                tx_guard.release();
1390                return Err(e);
1391            }
1392            Ok(res) => res,
1393        };
1394
1395        if let Some(expected_effects_digest) = expected_effects_digest {
1396            if effects.digest() != expected_effects_digest {
1397                // We dont want to mask the original error, so we log it and continue.
1398                match self.debug_dump_transaction_state(
1399                    &digest,
1400                    &effects,
1401                    expected_effects_digest,
1402                    &inner_temporary_store,
1403                    certificate,
1404                    &self.config.state_debug_dump_config,
1405                ) {
1406                    Ok(out_path) => {
1407                        info!(
1408                            "Dumped node state for transaction {} to {}",
1409                            digest,
1410                            out_path.as_path().display().to_string()
1411                        );
1412                    }
1413                    Err(e) => {
1414                        error!("Error dumping state for transaction {}: {e}", digest);
1415                    }
1416                }
1417                error!(
1418                    tx_digest = ?digest,
1419                    ?expected_effects_digest,
1420                    actual_effects = ?effects,
1421                    "fork detected!"
1422                );
1423                panic!(
1424                    "Transaction {} is expected to have effects digest {}, but got {}!",
1425                    digest,
1426                    expected_effects_digest,
1427                    effects.digest(),
1428                );
1429            }
1430        }
1431
1432        fail_point!("crash");
1433
1434        self.commit_certificate(
1435            certificate,
1436            inner_temporary_store,
1437            &effects,
1438            tx_guard,
1439            execution_guard,
1440            epoch_store,
1441        )?;
1442
1443        if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1444            certificate.data().transaction_data().kind()
1445        {
1446            if let Some(err) = &execution_error_opt {
1447                debug_fatal!("Authenticator state update failed: {:?}", err);
1448            }
1449            epoch_store.update_authenticator_state(auth_state);
1450
1451            // double check that the signature verifier always matches the authenticator
1452            // state
1453            if cfg!(debug_assertions) {
1454                let authenticator_state = get_authenticator_state(self.get_object_store())
1455                    .expect("Read cannot fail")
1456                    .expect("Authenticator state must exist");
1457
1458                let mut sys_jwks: Vec<_> = authenticator_state
1459                    .active_jwks
1460                    .into_iter()
1461                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
1462                    .collect();
1463                let mut active_jwks: Vec<_> = epoch_store
1464                    .signature_verifier
1465                    .get_jwks()
1466                    .into_iter()
1467                    .collect();
1468                sys_jwks.sort();
1469                active_jwks.sort();
1470
1471                assert_eq!(sys_jwks, active_jwks);
1472            }
1473        }
1474
1475        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1476        if elapsed > 0.0 {
1477            self.metrics
1478                .execution_gas_latency_ratio
1479                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1480        };
1481        Ok((effects, execution_error_opt))
1482    }
1483
    /// Persists the results of an executed certificate and notifies the rest
    /// of the node that the transaction has been committed.
    ///
    /// Steps, in order:
    /// 1. Post-process (index) the transaction. This is best-effort: a failure
    ///    is counted in `post_processing_total_failures` and logged, but it
    ///    does not abort the commit.
    /// 2. Record the transaction key and digest in the epoch store.
    /// 3. Build the `TransactionOutputs` and hand them to the cache writer.
    /// 4. For end-of-epoch transactions, force a reload of the system packages
    ///    in the object cache.
    /// 5. Mark `tx_guard` as committed, notify the transaction manager about
    ///    the committed output keys, and update the execution metrics.
    ///
    /// `_execution_guard` is never read; because it is taken by value it is
    /// held until this function returns.
    ///
    /// # Errors
    /// Returns the error from `insert_tx_key_and_digest` or from
    /// `try_write_transaction_outputs`. In both cases the function returns
    /// before `tx_guard.commit_tx()` is reached.
    #[instrument(level = "trace", skip_all)]
    fn commit_certificate(
        &self,
        certificate: &VerifiedExecutableTransaction,
        inner_temporary_store: InnerTemporaryStore,
        effects: &TransactionEffects,
        tx_guard: CertTxGuard,
        _execution_guard: ExecutionLockReadGuard<'_>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        // Both guards live until the end of the function: one scopes the
        // monitored section, the other times the whole commit.
        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        // Capture everything derived from `inner_temporary_store` up front: the
        // store is moved into `build_transaction_outputs` further down.
        let tx_key = certificate.key();
        let tx_digest = certificate.digest();
        let input_object_count = inner_temporary_store.input_objects.len();
        let shared_object_count = effects.input_shared_objects().len();

        let output_keys = inner_temporary_store.get_output_keys(effects);

        // Index the certificate. The result is deliberately discarded: a
        // post-processing failure is only counted and logged.
        let _ = self
            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
            .tap_err(|e| {
                self.metrics.post_processing_total_failures.inc();
                error!(?tx_digest, "tx post processing failed: {e}");
            });

        // The insertion to epoch_store is not atomic with the insertion to the
        // perpetual store. This is OK because we insert to the epoch store
        // first. And during lookups we always look up in the perpetual store first.
        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;

        // Allow testing what happens if we crash here.
        fail_point!("crash");

        // Consumes `inner_temporary_store`; the certificate and effects are
        // cloned because the caller keeps using them.
        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
            certificate.clone().into_unsigned(),
            effects.clone(),
            inner_temporary_store,
        );
        self.get_cache_writer()
            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;

        if certificate.transaction_data().is_end_of_epoch_tx() {
            // At the end of epoch, since system packages may have been upgraded, force
            // reload them in the cache.
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        // commit_certificate finished, the tx is fully committed to the store.
        tx_guard.commit_tx();

        // Notifies transaction manager about transaction and output objects committed.
        // This provides necessary information to transaction manager to start executing
        // additional ready transactions.
        self.transaction_manager
            .notify_commit(tx_digest, output_keys, epoch_store);

        // Uses the counts captured before the temporary store was consumed.
        self.update_metrics(certificate, input_object_count, shared_object_count);

        Ok(())
    }
1549
1550    fn update_metrics(
1551        &self,
1552        certificate: &VerifiedExecutableTransaction,
1553        input_object_count: usize,
1554        shared_object_count: usize,
1555    ) {
1556        // count signature by scheme, for zklogin and multisig
1557        if certificate.has_zklogin_sig() {
1558            self.metrics.zklogin_sig_count.inc();
1559        } else if certificate.has_upgraded_multisig() {
1560            self.metrics.multisig_sig_count.inc();
1561        }
1562
1563        self.metrics.total_effects.inc();
1564        self.metrics.total_certs.inc();
1565
1566        if shared_object_count > 0 {
1567            self.metrics.shared_obj_tx.inc();
1568        }
1569
1570        if certificate.is_sponsored_tx() {
1571            self.metrics.sponsored_tx.inc();
1572        }
1573
1574        self.metrics
1575            .num_input_objs
1576            .observe(input_object_count as f64);
1577        self.metrics
1578            .num_shared_objects
1579            .observe(shared_object_count as f64);
1580        self.metrics.batch_size.observe(
1581            certificate
1582                .data()
1583                .intent_message()
1584                .value
1585                .kind()
1586                .num_commands() as f64,
1587        );
1588    }
1589
    /// prepare_certificate validates the transaction input, and executes the
    /// certificate, returning effects, output objects, events, etc.
    ///
    /// It reads state from the db (both owned and shared locks), but it has no
    /// side effects.
    ///
    /// It can be generally understood that a failure of prepare_certificate
    /// indicates a non-transient error, e.g. the transaction input is
    /// somehow invalid, the correct locks are not held, etc. However, this
    /// is not entirely true, as a transient db read error may also cause
    /// this function to fail.
    ///
    /// # Returns
    /// On success, a tuple of the temporary store produced by execution, the
    /// transaction effects, and the execution error if the executor reported
    /// one. An execution error does not turn the result into `Err`; it is
    /// carried in the third tuple element.
    ///
    /// # Errors
    /// Propagates failures from the transaction validity check, from
    /// `check_certificate_input`, and from `check_owned_locks`.
    #[instrument(level = "trace", skip_all)]
    fn prepare_certificate(
        &self,
        // Not read in the body. NOTE(review): presumably required so callers
        // demonstrate they hold the execution lock — confirm.
        _execution_guard: &ExecutionLockReadGuard<'_>,
        certificate: &VerifiedExecutableTransaction,
        input_objects: InputObjects,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(
        InnerTemporaryStore,
        TransactionEffects,
        Option<ExecutionError>,
    )> {
        let _scope = monitored_scope("Execution::prepare_certificate");
        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
        // Separate wall-clock measurement used for the gas/latency ratio below.
        let prepare_certificate_start_time = tokio::time::Instant::now();

        // TODO: We need to move this to a more appropriate place to avoid redundant
        // checks.
        let tx_data = certificate.data().transaction_data();
        tx_data.validity_check(epoch_store.protocol_config())?;

        // The cost of partially re-auditing a transaction before execution is
        // tolerated.
        let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
            certificate,
            input_objects,
            epoch_store.protocol_config(),
            epoch_store.reference_gas_price(),
        )?;

        // Verify the locks on the owned inputs before executing.
        let owned_object_refs = input_objects.inner().filter_owned_objects();
        self.check_owned_locks(&owned_object_refs)?;
        let tx_digest = *certificate.digest();
        let protocol_config = epoch_store.protocol_config();
        // NOTE(review): presumably the same transaction data as `tx_data` above,
        // reached through the intent message — confirm.
        let transaction_data = &certificate.data().intent_message().value;
        let (kind, signer, gas) = transaction_data.execution_parts();

        // `effects` is only mutated inside the `cfg(msim)` fail point below, so
        // in builds without `msim` or `fail_points` the `mut` is expected to be
        // unused. The second element of the executor's result is ignored, and
        // the call has no `?`: an execution failure is returned as data.
        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
        let (inner_temp_store, _, mut effects, execution_error_opt) =
            epoch_store.executor().execute_transaction_to_effects(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
                // cyclic dependency w/ iota-adapter
                self.config
                    .expensive_safety_check_config
                    .enable_deep_per_tx_iota_conservation_check(),
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                input_objects,
                gas,
                gas_status,
                kind,
                signer,
                tx_digest,
                // NOTE(review): a `&mut Option` passed as `None`; its purpose is
                // defined by the executor API — confirm.
                &mut None,
            );

        // Test-only hook: under `msim` this rewrites `effects` via
        // `create_fail_state` when the fail point is active.
        fail_point_if!("cp_execution_nondeterminism", || {
            #[cfg(msim)]
            self.create_fail_state(certificate, epoch_store, &mut effects);
        });

        // Observe computation cost per elapsed microsecond; the guard avoids a
        // division by zero when no time was measured.
        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics
                .prepare_cert_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
        }

        // `.err()` keeps only the failure side of the executor's result.
        Ok((inner_temp_store, effects, execution_error_opt.err()))
    }
1678
1679    pub fn prepare_certificate_for_benchmark(
1680        &self,
1681        certificate: &VerifiedExecutableTransaction,
1682        input_objects: InputObjects,
1683        epoch_store: &Arc<AuthorityPerEpochStore>,
1684    ) -> IotaResult<(
1685        InnerTemporaryStore,
1686        TransactionEffects,
1687        Option<ExecutionError>,
1688    )> {
1689        let lock = RwLock::new(epoch_store.epoch());
1690        let execution_guard = lock.try_read().unwrap();
1691
1692        self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
1693    }
1694
1695    #[allow(clippy::type_complexity)]
1696    pub fn dry_exec_transaction(
1697        &self,
1698        transaction: TransactionData,
1699        transaction_digest: TransactionDigest,
1700    ) -> IotaResult<(
1701        DryRunTransactionBlockResponse,
1702        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1703        TransactionEffects,
1704        Option<ObjectID>,
1705    )> {
1706        let epoch_store = self.load_epoch_store_one_call_per_task();
1707        if !self.is_fullnode(&epoch_store) {
1708            return Err(IotaError::UnsupportedFeature {
1709                error: "dry-exec is only supported on fullnodes".to_string(),
1710            });
1711        }
1712
1713        if transaction.kind().is_system_tx() {
1714            return Err(IotaError::UnsupportedFeature {
1715                error: "dry-exec does not support system transactions".to_string(),
1716            });
1717        }
1718
1719        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1720    }
1721
1722    #[allow(clippy::type_complexity)]
1723    pub fn dry_exec_transaction_for_benchmark(
1724        &self,
1725        transaction: TransactionData,
1726        transaction_digest: TransactionDigest,
1727    ) -> IotaResult<(
1728        DryRunTransactionBlockResponse,
1729        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1730        TransactionEffects,
1731        Option<ObjectID>,
1732    )> {
1733        let epoch_store = self.load_epoch_store_one_call_per_task();
1734        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1735    }
1736
    /// Shared implementation behind the dry-run entry points.
    ///
    /// Validates the transaction, loads its inputs, supplies a mock gas coin
    /// when no gas payment was given, executes the transaction with a freshly
    /// created executor, and packages the outcome. The execution results are
    /// only returned to the caller; nothing in this function persists them.
    ///
    /// # Returns
    /// A tuple of:
    /// 1. the `DryRunTransactionBlockResponse` (input, effects, events and
    ///    suggested gas price; `object_changes` and `balance_changes` are left
    ///    empty),
    /// 2. a map from object ID to `(ObjectRef, Object, WriteKind)` for every
    ///    object the effects report as created, unwrapped or mutated,
    /// 3. the raw `TransactionEffects`,
    /// 4. the ID of the mock gas object if one was generated, `None` otherwise.
    ///
    /// # Errors
    /// Propagates failures from the validity check, input object resolution,
    /// the deny-list check, object loading, the transaction input checks, and
    /// the conversions that build the response.
    ///
    /// # Panics
    /// Panics if the executor cannot be created, or if the effects reference a
    /// written object that is absent from the temporary store.
    #[allow(clippy::type_complexity)]
    fn dry_exec_transaction_impl(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        transaction: TransactionData,
        transaction_digest: TransactionDigest,
    ) -> IotaResult<(
        DryRunTransactionBlockResponse,
        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
        TransactionEffects,
        Option<ObjectID>,
    )> {
        // Cheap validity checks for a transaction, including input size limits.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Deny-list check; the empty slice is passed where the signing path
        // supplies signatures, as a dry run has none.
        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dry run.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // make a gas object if one was not provided
        let mut transaction = transaction;
        let mut gas_object_refs = transaction.gas().to_vec();
        let reference_gas_price = epoch_store.reference_gas_price();
        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
            // No payment supplied: mint a mock gas coin with a random ID, owned
            // by the gas owner, and use it as the sole payment.
            let sender = transaction.gas_owner();
            let gas_object_id = ObjectID::random();
            let gas_object = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    gas_object_id,
                    SIMULATION_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(sender),
                TransactionDigest::genesis_marker(),
            );
            let gas_object_ref = gas_object.compute_object_reference();
            gas_object_refs = vec![gas_object_ref];
            // Add gas object to transaction gas payment
            transaction.gas_data_mut().payment = gas_object_refs.clone();
            (
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    receiving_objects,
                    gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                Some(gas_object_id),
            )
        } else {
            // Payment supplied by the caller: run the regular input checks.
            (
                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                None,
            )
        };

        let protocol_config = epoch_store.protocol_config();
        // The gas element is ignored: `gas_object_refs` (possibly the mock
        // coin) is passed to the executor instead.
        let (kind, signer, _) = transaction.execution_parts();

        // A dedicated executor is created for this call rather than reusing
        // the epoch store's.
        let silent = true;
        let executor = iota_execution::executor(protocol_config, silent, None)
            .expect("Creating an executor should not fail here");

        // The execution error is not returned separately; callers get the
        // outcome through `effects`.
        let expensive_checks = false;
        let (inner_temp_store, _, effects, _execution_error) = executor
            .execute_transaction_to_effects(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                expensive_checks,
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                checked_input_objects,
                gas_object_refs,
                gas_status,
                kind,
                signer,
                transaction_digest,
                &mut None,
            );
        let tx_digest = *effects.transaction_digest();

        // Module resolver over the temporary store and the epoch's module
        // cache, used below to render the transaction input.
        let module_cache =
            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());

        // Layout resolver over the temporary store and the backing package
        // store, used below to render the events.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));
        // Returning empty vector here because we recalculate changes in the rpc layer.
        let object_changes = Vec::new();

        // Returning empty vector here because we recalculate changes in the rpc layer.
        let balance_changes = Vec::new();

        // Tag each created / unwrapped / mutated object reference with its
        // write kind, then attach the object taken from the temporary store.
        let written_with_kind = effects
            .created()
            .into_iter()
            .map(|(oref, _)| (oref, WriteKind::Create))
            .chain(
                effects
                    .unwrapped()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
            )
            .chain(
                effects
                    .mutated()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
            )
            .map(|(oref, kind)| {
                // Panics if the effects name an object missing from `written`.
                let obj = inner_temp_store.written.get(&oref.0).unwrap();
                // TODO: Avoid clones.
                (oref.0, (oref, obj.clone(), kind))
            })
            .collect();

        Ok((
            DryRunTransactionBlockResponse {
                // to avoid cloning `transaction`, fields are populated in this order
                suggested_gas_price: self
                    .congestion_tracker
                    .get_prediction_suggested_gas_price(&transaction),
                input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
                    .map_err(|e| IotaError::TransactionSerialization {
                        error: format!(
                            "Failed to convert transaction to IotaTransactionBlockData: {e}",
                        ),
                    })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
                effects: effects.clone().try_into()?,
                events: IotaTransactionBlockEvents::try_from(
                    inner_temp_store.events.clone(),
                    tx_digest,
                    None,
                    layout_resolver.as_mut(),
                )?,
                object_changes,
                balance_changes,
            },
            written_with_kind,
            effects,
            mock_gas,
        ))
    }
1916
1917    pub fn simulate_transaction(
1918        &self,
1919        transaction: TransactionData,
1920    ) -> IotaResult<SimulateTransactionResult> {
1921        if transaction.kind().is_system_tx() {
1922            return Err(IotaError::UnsupportedFeature {
1923                error: "simulate does not support system transactions".to_string(),
1924            });
1925        }
1926
1927        let epoch_store = self.load_epoch_store_one_call_per_task();
1928        if !self.is_fullnode(&epoch_store) {
1929            return Err(IotaError::UnsupportedFeature {
1930                error: "simulate is only supported on fullnodes".to_string(),
1931            });
1932        }
1933
1934        self.simulate_transaction_impl(&epoch_store, transaction)
1935    }
1936
    /// Implementation of transaction simulation.
    ///
    /// Validates the transaction, loads its inputs, supplies a mock gas coin
    /// when no gas payment was given, executes the transaction with a freshly
    /// created executor, and returns the raw execution artifacts. The results
    /// are only returned to the caller; nothing in this function persists
    /// them.
    ///
    /// # Returns
    /// A `SimulateTransactionResult` holding the input objects and written
    /// objects of the temporary store, the effects, the events (only when the
    /// effects carry an events digest), and the ID of the mock gas object if
    /// one was generated.
    ///
    /// # Errors
    /// Propagates failures from the validity check, input object resolution,
    /// the deny-list check, object loading, and the transaction input checks.
    ///
    /// # Panics
    /// Panics if the executor cannot be created.
    fn simulate_transaction_impl(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        transaction: TransactionData,
    ) -> IotaResult<SimulateTransactionResult> {
        // Cheap validity checks for a transaction, including input size limits.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Deny-list check; the empty slice is passed where the signing path
        // supplies signatures, as a simulation has none.
        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dry run.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;
        // make a gas object if one was not provided
        let mut transaction = transaction;
        let mut gas_object_refs = transaction.gas().to_vec();
        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
            // No payment supplied: mint a mock gas coin with the fixed ID
            // `ObjectID::MAX`, owned by the gas owner, and use it as the sole
            // payment.
            let sender = transaction.gas_owner();
            let gas_object_id = ObjectID::MAX;
            let gas_object = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    gas_object_id,
                    SIMULATION_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(sender),
                TransactionDigest::genesis_marker(),
            );
            let gas_object_ref = gas_object.compute_object_reference();
            gas_object_refs = vec![gas_object_ref];

            // Add the gas object to the transaction payment.
            transaction.gas_data_mut().payment = gas_object_refs.clone();
            (
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    epoch_store.reference_gas_price(),
                    &transaction,
                    input_objects,
                    receiving_objects,
                    gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                Some(gas_object_id),
            )
        } else {
            // Payment supplied by the caller: run the regular input checks.
            (
                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    epoch_store.reference_gas_price(),
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                None,
            )
        };

        let protocol_config = epoch_store.protocol_config();
        // The gas element is ignored: `gas_object_refs` (possibly the mock
        // coin) is passed to the executor instead.
        let (kind, signer, _) = transaction.execution_parts();

        // A dedicated executor is created for this call rather than reusing
        // the epoch store's.
        let silent = true;
        let executor = iota_execution::executor(protocol_config, silent, None)
            .expect("Creating an executor should not fail here");

        // The execution error is not returned separately; callers get the
        // outcome through `effects`.
        let expensive_checks = false;
        let (inner_temp_store, _, effects, _execution_error) = executor
            .execute_transaction_to_effects(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                expensive_checks,
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                checked_input_objects,
                gas_object_refs,
                gas_status,
                kind,
                signer,
                // Digest computed from the transaction as executed, i.e. after
                // the mock gas payment (if any) was added above.
                transaction.digest(),
                &mut None,
            );

        Ok(SimulateTransactionResult {
            input_objects: inner_temp_store.input_objects,
            output_objects: inner_temp_store.written,
            // `Some` only when the effects carry an events digest.
            events: effects.events_digest().map(|_| inner_temp_store.events),
            effects,
            mock_gas_id: mock_gas,
        })
    }
2049
2050    /// The object ID for gas can be any object ID, even for an uncreated object
2051    pub async fn dev_inspect_transaction_block(
2052        &self,
2053        sender: IotaAddress,
2054        transaction_kind: TransactionKind,
2055        gas_price: Option<u64>,
2056        gas_budget: Option<u64>,
2057        gas_sponsor: Option<IotaAddress>,
2058        gas_objects: Option<Vec<ObjectRef>>,
2059        show_raw_txn_data_and_effects: Option<bool>,
2060        skip_checks: Option<bool>,
2061    ) -> IotaResult<DevInspectResults> {
2062        let epoch_store = self.load_epoch_store_one_call_per_task();
2063
2064        if !self.is_fullnode(&epoch_store) {
2065            return Err(IotaError::UnsupportedFeature {
2066                error: "dev-inspect is only supported on fullnodes".to_string(),
2067            });
2068        }
2069
2070        if transaction_kind.is_system_tx() {
2071            return Err(IotaError::UnsupportedFeature {
2072                error: "system transactions are not supported".to_string(),
2073            });
2074        }
2075
2076        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2077        let skip_checks = skip_checks.unwrap_or(true);
2078        let reference_gas_price = epoch_store.reference_gas_price();
2079        let protocol_config = epoch_store.protocol_config();
2080        let max_tx_gas = protocol_config.max_tx_gas();
2081
2082        let price = gas_price.unwrap_or(reference_gas_price);
2083        let budget = gas_budget.unwrap_or(max_tx_gas);
2084        let owner = gas_sponsor.unwrap_or(sender);
2085        // Payment might be empty here, but it's fine we'll have to deal with it later
2086        // after reading all the input objects.
2087        let payment = gas_objects.unwrap_or_default();
2088        let transaction = TransactionData::V1(TransactionDataV1 {
2089            kind: transaction_kind.clone(),
2090            sender,
2091            gas_data: GasData {
2092                payment,
2093                owner,
2094                price,
2095                budget,
2096            },
2097            expiration: TransactionExpiration::None,
2098        });
2099
2100        let raw_txn_data = if show_raw_txn_data_and_effects {
2101            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2102                error: "Failed to serialize transaction during dev inspect".to_string(),
2103            })?
2104        } else {
2105            vec![]
2106        };
2107
2108        transaction.validity_check_no_gas_check(protocol_config)?;
2109
2110        let input_object_kinds = transaction.input_objects()?;
2111        let receiving_object_refs = transaction.receiving_objects();
2112
2113        iota_transaction_checks::deny::check_transaction_for_signing(
2114            &transaction,
2115            &[],
2116            &input_object_kinds,
2117            &receiving_object_refs,
2118            &self.config.transaction_deny_config,
2119            self.get_backing_package_store().as_ref(),
2120        )?;
2121
2122        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2123            // We don't want to cache this transaction since it's a dev inspect.
2124            None,
2125            &input_object_kinds,
2126            &receiving_object_refs,
2127            epoch_store.epoch(),
2128        )?;
2129
2130        // Create and use a dummy gas object if there is no gas object provided.
2131        let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2132            SIMULATION_GAS_COIN_VALUE,
2133            transaction.gas_owner(),
2134        );
2135
2136        let gas_objects = if transaction.gas().is_empty() {
2137            let gas_object_ref = dummy_gas_object.compute_object_reference();
2138            vec![gas_object_ref]
2139        } else {
2140            transaction.gas().to_vec()
2141        };
2142
2143        let (gas_status, checked_input_objects) = if skip_checks {
2144            // If we are skipping checks, then we call the check_dev_inspect_input function
2145            // which will perform only lightweight checks on the transaction
2146            // input. And if the gas field is empty, that means we will
2147            // use the dummy gas object so we need to add it to the input objects vector.
2148            if transaction.gas().is_empty() {
2149                input_objects.push(ObjectReadResult::new(
2150                    InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
2151                    dummy_gas_object.into(),
2152                ));
2153            }
2154            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2155                protocol_config,
2156                &transaction_kind,
2157                input_objects,
2158                receiving_objects,
2159            )?;
2160            let gas_status = IotaGasStatus::new(
2161                max_tx_gas,
2162                transaction.gas_price(),
2163                reference_gas_price,
2164                protocol_config,
2165            )?;
2166
2167            (gas_status, checked_input_objects)
2168        } else {
2169            // If we are not skipping checks, then we call the check_transaction_input
2170            // function and its dummy gas variant which will perform full
2171            // fledged checks just like a real transaction execution.
2172            if transaction.gas().is_empty() {
2173                iota_transaction_checks::check_transaction_input_with_given_gas(
2174                    epoch_store.protocol_config(),
2175                    reference_gas_price,
2176                    &transaction,
2177                    input_objects,
2178                    receiving_objects,
2179                    dummy_gas_object,
2180                    &self.metrics.bytecode_verifier_metrics,
2181                    &self.config.verifier_signing_config,
2182                )?
2183            } else {
2184                iota_transaction_checks::check_transaction_input(
2185                    epoch_store.protocol_config(),
2186                    reference_gas_price,
2187                    &transaction,
2188                    input_objects,
2189                    &receiving_objects,
2190                    &self.metrics.bytecode_verifier_metrics,
2191                    &self.config.verifier_signing_config,
2192                )?
2193            }
2194        };
2195
2196        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
2197            .expect("Creating an executor should not fail here");
2198        let intent_msg = IntentMessage::new(
2199            Intent {
2200                version: IntentVersion::V0,
2201                scope: IntentScope::TransactionData,
2202                app_id: AppId::Iota,
2203            },
2204            transaction,
2205        );
2206        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2207        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2208            self.get_backing_store().as_ref(),
2209            protocol_config,
2210            self.metrics.limits_metrics.clone(),
2211            // expensive checks
2212            false,
2213            self.config.certificate_deny_config.certificate_deny_set(),
2214            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2215            epoch_store
2216                .epoch_start_config()
2217                .epoch_data()
2218                .epoch_start_timestamp(),
2219            checked_input_objects,
2220            gas_objects,
2221            gas_status,
2222            transaction_kind,
2223            sender,
2224            transaction_digest,
2225            skip_checks,
2226        );
2227
2228        let raw_effects = if show_raw_txn_data_and_effects {
2229            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2230                error: "Failed to serialize transaction effects during dev inspect".to_string(),
2231            })?
2232        } else {
2233            vec![]
2234        };
2235
2236        let mut layout_resolver =
2237            epoch_store
2238                .executor()
2239                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2240                    &inner_temp_store,
2241                    self.get_backing_package_store(),
2242                )));
2243
2244        DevInspectResults::new(
2245            effects,
2246            inner_temp_store.events.clone(),
2247            execution_result,
2248            raw_txn_data,
2249            raw_effects,
2250            layout_resolver.as_mut(),
2251        )
2252    }
2253
2254    // Only used for testing because of how epoch store is loaded.
2255    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2256        let epoch_store = self.epoch_store_for_testing();
2257        Ok(epoch_store.reference_gas_price())
2258    }
2259
2260    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2261        self.get_transaction_cache_reader()
2262            .try_is_tx_already_executed(digest)
2263    }
2264
2265    /// Non-fallible version of `try_is_tx_already_executed`.
2266    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2267        self.try_is_tx_already_executed(digest)
2268            .expect("storage access failed")
2269    }
2270
2271    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2272    #[instrument(level = "debug", skip_all, err)]
2273    fn index_tx(
2274        &self,
2275        indexes: &IndexStore,
2276        digest: &TransactionDigest,
2277        // TODO: index_tx really just need the transaction data here.
2278        cert: &VerifiedExecutableTransaction,
2279        effects: &TransactionEffects,
2280        events: &TransactionEvents,
2281        timestamp_ms: u64,
2282        tx_coins: Option<TxCoins>,
2283        written: &WrittenObjects,
2284        inner_temporary_store: &InnerTemporaryStore,
2285    ) -> IotaResult<u64> {
2286        let changes = self
2287            .process_object_index(effects, written, inner_temporary_store)
2288            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2289
2290        indexes.index_tx(
2291            cert.data().intent_message().value.sender(),
2292            cert.data()
2293                .intent_message()
2294                .value
2295                .input_objects()?
2296                .iter()
2297                .map(|o| o.object_id()),
2298            effects
2299                .all_changed_objects()
2300                .into_iter()
2301                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2302            cert.data()
2303                .intent_message()
2304                .value
2305                .move_calls()
2306                .into_iter()
2307                .map(|(package, module, function)| {
2308                    (*package, module.to_owned(), function.to_owned())
2309                }),
2310            events,
2311            changes,
2312            digest,
2313            timestamp_ms,
2314            tx_coins,
2315        )
2316    }
2317
2318    #[cfg(msim)]
2319    fn create_fail_state(
2320        &self,
2321        certificate: &VerifiedExecutableTransaction,
2322        epoch_store: &Arc<AuthorityPerEpochStore>,
2323        effects: &mut TransactionEffects,
2324    ) {
2325        use std::cell::RefCell;
2326        thread_local! {
2327            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2328        }
2329        if !certificate.data().intent_message().value.is_system_tx() {
2330            let committee = epoch_store.committee();
2331            let cur_stake = (**committee).weight(&self.name);
2332            if cur_stake > 0 {
2333                FAIL_STATE.with_borrow_mut(|fail_state| {
2334                    // let (&mut failing_stake, &mut failing_validators) = fail_state;
2335                    if fail_state.0 < committee.validity_threshold() {
2336                        fail_state.0 += cur_stake;
2337                        fail_state.1.insert(self.name);
2338                    }
2339
2340                    if fail_state.1.contains(&self.name) {
2341                        info!("cp_exec failing tx");
2342                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2343                    }
2344                });
2345            }
2346        }
2347    }
2348
2349    fn process_object_index(
2350        &self,
2351        effects: &TransactionEffects,
2352        written: &WrittenObjects,
2353        inner_temporary_store: &InnerTemporaryStore,
2354    ) -> IotaResult<ObjectIndexChanges> {
2355        let epoch_store = self.load_epoch_store_one_call_per_task();
2356        let mut layout_resolver =
2357            epoch_store
2358                .executor()
2359                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2360                    inner_temporary_store,
2361                    self.get_backing_package_store(),
2362                )));
2363
2364        let modified_at_version = effects
2365            .modified_at_versions()
2366            .into_iter()
2367            .collect::<HashMap<_, _>>();
2368
2369        let tx_digest = effects.transaction_digest();
2370        let mut deleted_owners = vec![];
2371        let mut deleted_dynamic_fields = vec![];
2372        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2373            let old_version = modified_at_version.get(&id).unwrap();
2374            // When we process the index, the latest object hasn't been written yet so
2375            // the old object must be present.
2376            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2377                |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2378            ) {
2379                Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2380                Owner::ObjectOwner(object_id) => {
2381                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2382                }
2383                _ => {}
2384            }
2385        }
2386
2387        let mut new_owners = vec![];
2388        let mut new_dynamic_fields = vec![];
2389
2390        for (oref, owner, kind) in effects.all_changed_objects() {
2391            let id = &oref.0;
2392            // For mutated objects, retrieve old owner and delete old index if there is a
2393            // owner change.
2394            if let WriteKind::Mutate = kind {
2395                let Some(old_version) = modified_at_version.get(id) else {
2396                    panic!(
2397                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2398                    );
2399                };
2400                // When we process the index, the latest object hasn't been written yet so
2401                // the old object must be present.
2402                let Some(old_object) = self
2403                    .get_object_store()
2404                    .try_get_object_by_key(id, *old_version)?
2405                else {
2406                    panic!(
2407                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2408                    );
2409                };
2410                if old_object.owner != owner {
2411                    match old_object.owner {
2412                        Owner::AddressOwner(addr) => {
2413                            deleted_owners.push((addr, *id));
2414                        }
2415                        Owner::ObjectOwner(object_id) => {
2416                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2417                        }
2418                        _ => {}
2419                    }
2420                }
2421            }
2422
2423            match owner {
2424                Owner::AddressOwner(addr) => {
2425                    // TODO: We can remove the object fetching after we added ObjectType to
2426                    // TransactionEffects
2427                    let new_object = written.get(id).unwrap_or_else(
2428                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2429                    );
2430                    assert_eq!(
2431                        new_object.version(),
2432                        oref.1,
2433                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2434                        tx_digest,
2435                        id,
2436                        new_object.version(),
2437                        oref.1
2438                    );
2439
2440                    let type_ = new_object
2441                        .type_()
2442                        .map(|type_| ObjectType::Struct(type_.clone()))
2443                        .unwrap_or(ObjectType::Package);
2444
2445                    new_owners.push((
2446                        (addr, *id),
2447                        ObjectInfo {
2448                            object_id: *id,
2449                            version: oref.1,
2450                            digest: oref.2,
2451                            type_,
2452                            owner,
2453                            previous_transaction: *effects.transaction_digest(),
2454                        },
2455                    ));
2456                }
2457                Owner::ObjectOwner(owner) => {
2458                    let new_object = written.get(id).unwrap_or_else(
2459                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2460                    );
2461                    assert_eq!(
2462                        new_object.version(),
2463                        oref.1,
2464                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2465                        tx_digest,
2466                        id,
2467                        new_object.version(),
2468                        oref.1
2469                    );
2470
2471                    let Some(df_info) = self
2472                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut(), false)
2473                        .unwrap_or_else(|e| {
2474                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2475                            None
2476                        }
2477                    )
2478                        else {
2479                            // Skip indexing for non dynamic field objects.
2480                            continue;
2481                        };
2482                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2483                }
2484                _ => {}
2485            }
2486        }
2487
2488        Ok(ObjectIndexChanges {
2489            deleted_owners,
2490            deleted_dynamic_fields,
2491            new_owners,
2492            new_dynamic_fields,
2493        })
2494    }
2495
2496    fn try_create_dynamic_field_info(
2497        &self,
2498        o: &Object,
2499        written: &WrittenObjects,
2500        resolver: &mut dyn LayoutResolver,
2501        get_latest_object_version: bool,
2502    ) -> IotaResult<Option<DynamicFieldInfo>> {
2503        // Skip if not a move object
2504        let Some(move_object) = o.data.try_as_move().cloned() else {
2505            return Ok(None);
2506        };
2507
2508        // We only index dynamic field objects
2509        if !move_object.type_().is_dynamic_field() {
2510            return Ok(None);
2511        }
2512
2513        let layout = resolver
2514            .get_annotated_layout(&move_object.type_().clone().into())?
2515            .into_layout();
2516
2517        let field =
2518            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2519                IotaError::ObjectDeserialization {
2520                    error: e.to_string(),
2521                }
2522            })?;
2523
2524        let type_ = field.kind;
2525        let name_type: TypeTag = field.name_layout.into();
2526        let bcs_name = field.name_bytes.to_owned();
2527
2528        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2529            .map_err(|e| {
2530                warn!("{e}");
2531                IotaError::ObjectDeserialization {
2532                    error: e.to_string(),
2533                }
2534            })?;
2535
2536        let name = DynamicFieldName {
2537            type_: name_type,
2538            value: IotaMoveValue::from(name_value).to_json_value(),
2539        };
2540
2541        let value_metadata = field.value_metadata().map_err(|e| {
2542            warn!("{e}");
2543            IotaError::ObjectDeserialization {
2544                error: e.to_string(),
2545            }
2546        })?;
2547
2548        Ok(Some(match value_metadata {
2549            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2550                name,
2551                bcs_name,
2552                type_,
2553                object_type: object_type.to_canonical_string(/* with_prefix */ true),
2554                object_id: o.id(),
2555                version: o.version(),
2556                digest: o.digest(),
2557            },
2558
2559            DFV::ValueMetadata::DynamicObjectField(object_id) => {
2560                // Find the actual object from storage using the object id obtained from the
2561                // wrapper.
2562
2563                // Try to find the object in the written objects first.
2564                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2565                    (
2566                        object.version(),
2567                        object.digest(),
2568                        object.data.type_().unwrap().clone(),
2569                    )
2570                } else {
2571                    // If not found, try to find it in the database.
2572                    let object = if get_latest_object_version {
2573                        // Loading genesis object could meet a condition that the version of the
2574                        // genesis object is behind the one in the snapshot.
2575                        // In this case, the object can not be found with get_object_by_key, we need
2576                        // to use get_object instead. Since get_object is a heavier operation, we
2577                        // only allow to use it for genesis.
2578                        // reference: https://github.com/iotaledger/iota/issues/7267
2579                        self.get_object_store()
2580                            .try_get_object(&object_id)?
2581                            .ok_or_else(|| UserInputError::ObjectNotFound {
2582                                object_id,
2583                                version: Some(o.version()),
2584                            })?
2585                    } else {
2586                        // Non-genesis object should be in the database with the given version.
2587                        self.get_object_store()
2588                            .try_get_object_by_key(&object_id, o.version())?
2589                            .ok_or_else(|| UserInputError::ObjectNotFound {
2590                                object_id,
2591                                version: Some(o.version()),
2592                            })?
2593                    };
2594
2595                    (
2596                        object.version(),
2597                        object.digest(),
2598                        object.data.type_().unwrap().clone(),
2599                    )
2600                };
2601
2602                DynamicFieldInfo {
2603                    name,
2604                    bcs_name,
2605                    type_,
2606                    object_type: object_type.to_string(),
2607                    object_id,
2608                    version,
2609                    digest,
2610                }
2611            }
2612        }))
2613    }
2614
2615    #[instrument(level = "trace", skip_all, err)]
2616    fn post_process_one_tx(
2617        &self,
2618        certificate: &VerifiedExecutableTransaction,
2619        effects: &TransactionEffects,
2620        inner_temporary_store: &InnerTemporaryStore,
2621        epoch_store: &Arc<AuthorityPerEpochStore>,
2622    ) -> IotaResult {
2623        if self.indexes.is_none() {
2624            return Ok(());
2625        }
2626
2627        let tx_digest = certificate.digest();
2628        let timestamp_ms = Self::unixtime_now_ms();
2629        let events = &inner_temporary_store.events;
2630        let written = &inner_temporary_store.written;
2631        let tx_coins =
2632            self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2633
2634        // Index tx
2635        if let Some(indexes) = &self.indexes {
2636            let _ = self
2637                .index_tx(
2638                    indexes.as_ref(),
2639                    tx_digest,
2640                    certificate,
2641                    effects,
2642                    events,
2643                    timestamp_ms,
2644                    tx_coins,
2645                    written,
2646                    inner_temporary_store,
2647                )
2648                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2649                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2650                .expect("Indexing tx should not fail");
2651
2652            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2653            let events = self.make_transaction_block_events(
2654                events.clone(),
2655                *tx_digest,
2656                timestamp_ms,
2657                epoch_store,
2658                inner_temporary_store,
2659            )?;
2660            // Emit events
2661            self.subscription_handler
2662                .process_tx(certificate.data().transaction_data(), &effects, &events)
2663                .tap_ok(|_| {
2664                    self.metrics
2665                        .post_processing_total_tx_had_event_processed
2666                        .inc()
2667                })
2668                .tap_err(|e| {
2669                    warn!(
2670                        ?tx_digest,
2671                        "Post processing - Couldn't process events for tx: {}", e
2672                    )
2673                })?;
2674
2675            self.metrics
2676                .post_processing_total_events_emitted
2677                .inc_by(events.data.len() as u64);
2678        };
2679        Ok(())
2680    }
2681
2682    fn make_transaction_block_events(
2683        &self,
2684        transaction_events: TransactionEvents,
2685        digest: TransactionDigest,
2686        timestamp_ms: u64,
2687        epoch_store: &Arc<AuthorityPerEpochStore>,
2688        inner_temporary_store: &InnerTemporaryStore,
2689    ) -> IotaResult<IotaTransactionBlockEvents> {
2690        let mut layout_resolver =
2691            epoch_store
2692                .executor()
2693                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2694                    inner_temporary_store,
2695                    self.get_backing_package_store(),
2696                )));
2697        IotaTransactionBlockEvents::try_from(
2698            transaction_events,
2699            digest,
2700            Some(timestamp_ms),
2701            layout_resolver.as_mut(),
2702        )
2703    }
2704
2705    pub fn unixtime_now_ms() -> u64 {
2706        let now = SystemTime::now()
2707            .duration_since(UNIX_EPOCH)
2708            .expect("Time went backwards")
2709            .as_millis();
2710        u64::try_from(now).expect("Travelling in time machine")
2711    }
2712
2713    #[instrument(level = "trace", skip_all)]
2714    pub async fn handle_transaction_info_request(
2715        &self,
2716        request: TransactionInfoRequest,
2717    ) -> IotaResult<TransactionInfoResponse> {
2718        let epoch_store = self.load_epoch_store_one_call_per_task();
2719        let (transaction, status) = self
2720            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2721            .ok_or(IotaError::TransactionNotFound {
2722                digest: request.transaction_digest,
2723            })?;
2724        Ok(TransactionInfoResponse {
2725            transaction,
2726            status,
2727        })
2728    }
2729
2730    #[instrument(level = "trace", skip_all)]
2731    pub async fn handle_object_info_request(
2732        &self,
2733        request: ObjectInfoRequest,
2734    ) -> IotaResult<ObjectInfoResponse> {
2735        let epoch_store = self.load_epoch_store_one_call_per_task();
2736
2737        let requested_object_seq = match request.request_kind {
2738            ObjectInfoRequestKind::LatestObjectInfo => {
2739                let (_, seq, _) = self
2740                    .try_get_object_or_tombstone(request.object_id)
2741                    .await?
2742                    .ok_or_else(|| {
2743                        IotaError::from(UserInputError::ObjectNotFound {
2744                            object_id: request.object_id,
2745                            version: None,
2746                        })
2747                    })?;
2748                seq
2749            }
2750            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2751        };
2752
2753        let object = self
2754            .get_object_store()
2755            .try_get_object_by_key(&request.object_id, requested_object_seq)?
2756            .ok_or_else(|| {
2757                IotaError::from(UserInputError::ObjectNotFound {
2758                    object_id: request.object_id,
2759                    version: Some(requested_object_seq),
2760                })
2761            })?;
2762
2763        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2764            (request.generate_layout, object.data.try_as_move())
2765        {
2766            Some(into_struct_layout(
2767                self.load_epoch_store_one_call_per_task()
2768                    .executor()
2769                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2770                    .get_annotated_layout(&move_obj.type_().clone().into())?,
2771            )?)
2772        } else {
2773            None
2774        };
2775
2776        let lock = if !object.is_address_owned() {
2777            // Only address owned objects have locks.
2778            None
2779        } else {
2780            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2781                .await?
2782                .map(|s| s.into_inner())
2783        };
2784
2785        Ok(ObjectInfoResponse {
2786            object,
2787            layout,
2788            lock_for_debugging: lock,
2789        })
2790    }
2791
2792    #[instrument(level = "trace", skip_all)]
2793    pub fn handle_checkpoint_request(
2794        &self,
2795        request: &CheckpointRequest,
2796    ) -> IotaResult<CheckpointResponse> {
2797        let summary = if request.certified {
2798            let summary = match request.sequence_number {
2799                Some(seq) => self
2800                    .checkpoint_store
2801                    .get_checkpoint_by_sequence_number(seq)?,
2802                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2803            }
2804            .map(|v| v.into_inner());
2805            summary.map(CheckpointSummaryResponse::Certified)
2806        } else {
2807            let summary = match request.sequence_number {
2808                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2809                None => self
2810                    .checkpoint_store
2811                    .get_latest_locally_computed_checkpoint()?,
2812            };
2813            summary.map(CheckpointSummaryResponse::Pending)
2814        };
2815        let contents = match &summary {
2816            Some(s) => self
2817                .checkpoint_store
2818                .get_checkpoint_contents(&s.content_digest())?,
2819            None => None,
2820        };
2821        Ok(CheckpointResponse {
2822            checkpoint: summary,
2823            contents,
2824        })
2825    }
2826
2827    fn check_protocol_version(
2828        supported_protocol_versions: SupportedProtocolVersions,
2829        current_version: ProtocolVersion,
2830    ) {
2831        info!("current protocol version is now {:?}", current_version);
2832        info!("supported versions are: {:?}", supported_protocol_versions);
2833        if !supported_protocol_versions.is_version_supported(current_version) {
2834            let msg = format!(
2835                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2836            );
2837
2838            error!("{}", msg);
2839            eprintln!("{msg}");
2840
2841            #[cfg(not(msim))]
2842            std::process::exit(1);
2843
2844            #[cfg(msim)]
2845            iota_simulator::task::shutdown_current_node();
2846        }
2847    }
2848
    /// Builds a new `AuthorityState`, starts its certificate execution task
    /// and returns the state wrapped in an `Arc`.
    ///
    /// The constructor first runs `check_protocol_version` against the
    /// protocol version of `epoch_store`, then creates the metrics, the
    /// transaction manager, both pruners and the input loader, assembles the
    /// state, spawns `execution_process`, and finally calls
    /// `create_owner_index_if_empty` with `genesis_objects`.
    ///
    /// # Panics
    ///
    /// Panics if `create_owner_index_if_empty` returns an error.
    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
    pub async fn new(
        name: AuthorityName,
        secret: StableSyncAuthoritySigner,
        supported_protocol_versions: SupportedProtocolVersions,
        store: Arc<AuthorityStore>,
        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
        epoch_store: Arc<AuthorityPerEpochStore>,
        committee_store: Arc<CommitteeStore>,
        indexes: Option<Arc<IndexStore>>,
        rest_index: Option<Arc<RestIndexStore>>,
        checkpoint_store: Arc<CheckpointStore>,
        prometheus_registry: &Registry,
        genesis_objects: &[Object],
        db_checkpoint_config: &DBCheckpointConfig,
        config: NodeConfig,
        archive_readers: ArchiveReaderBalancer,
        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
        chain_identifier: ChainIdentifier,
        pruner_db: Option<Arc<AuthorityPrunerTables>>,
    ) -> Arc<Self> {
        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
        // Channel through which the transaction manager hands certificates
        // to the execution task spawned below.
        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
        let transaction_manager = Arc::new(TransactionManager::new(
            execution_cache_trait_pointers.object_cache_reader.clone(),
            execution_cache_trait_pointers
                .transaction_cache_reader
                .clone(),
            &epoch_store,
            tx_ready_certificates,
            metrics.clone(),
        ));
        // The sender half is stored on the state; the receiver half goes to
        // the execution task.
        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

        // Both pruners are stored on the state below.
        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
            epoch_store.get_parent_path(),
            &config.authority_store_pruning_config,
        );
        let _pruner = AuthorityStorePruner::new(
            store.perpetual_tables.clone(),
            checkpoint_store.clone(),
            rest_index.clone(),
            config.authority_store_pruning_config.clone(),
            epoch_store.committee().authority_exists(&name),
            epoch_store.epoch_start_state().epoch_duration_ms(),
            prometheus_registry,
            archive_readers,
            pruner_db,
        );
        let input_loader =
            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
        // Read these before `epoch_store` is cloned into the state.
        let epoch = epoch_store.epoch();
        let rgp = epoch_store.reference_gas_price();
        let state = Arc::new(AuthorityState {
            name,
            secret,
            execution_lock: RwLock::new(epoch),
            epoch_store: ArcSwap::new(epoch_store.clone()),
            input_loader,
            execution_cache_trait_pointers,
            indexes,
            rest_index,
            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
            checkpoint_store,
            committee_store,
            transaction_manager,
            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
            metrics,
            _pruner,
            _authority_per_epoch_pruner,
            db_checkpoint_config: db_checkpoint_config.clone(),
            config,
            overload_info: AuthorityOverloadInfo::default(),
            validator_tx_finalizer,
            chain_identifier,
            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
        });

        // Start a task to execute ready certificates.
        // The task only receives a weak reference to the state.
        let authority_state = Arc::downgrade(&state);
        spawn_monitored_task!(execution_process(
            authority_state,
            rx_ready_certificates,
            rx_execution_shutdown,
        ));

        // TODO: This doesn't belong to the constructor of AuthorityState.
        state
            .create_owner_index_if_empty(genesis_objects, &epoch_store)
            .expect("Error indexing genesis objects.");

        state
    }
2944
2945    // TODO: Consolidate our traits to reduce the number of methods here.
2946    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
2947        &self.execution_cache_trait_pointers.object_cache_reader
2948    }
2949
2950    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
2951        &self.execution_cache_trait_pointers.transaction_cache_reader
2952    }
2953
2954    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
2955        &self.execution_cache_trait_pointers.cache_writer
2956    }
2957
2958    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
2959        &self.execution_cache_trait_pointers.backing_store
2960    }
2961
2962    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
2963        &self.execution_cache_trait_pointers.backing_package_store
2964    }
2965
2966    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
2967        &self.execution_cache_trait_pointers.object_store
2968    }
2969
2970    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
2971        &self.execution_cache_trait_pointers.reconfig_api
2972    }
2973
2974    pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
2975        &self.execution_cache_trait_pointers.accumulator_store
2976    }
2977
2978    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
2979        &self.execution_cache_trait_pointers.checkpoint_cache
2980    }
2981
2982    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
2983        &self.execution_cache_trait_pointers.state_sync_store
2984    }
2985
2986    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
2987        &self.execution_cache_trait_pointers.cache_commit
2988    }
2989
2990    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
2991        self.execution_cache_trait_pointers
2992            .testing_api
2993            .database_for_testing()
2994    }
2995
2996    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
2997        &self,
2998        config: NodeConfig,
2999        metrics: Arc<AuthorityStorePruningMetrics>,
3000    ) -> anyhow::Result<()> {
3001        let archive_readers =
3002            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3003        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3004            &self.database_for_testing().perpetual_tables,
3005            &self.checkpoint_store,
3006            self.rest_index.as_deref(),
3007            None,
3008            config.authority_store_pruning_config,
3009            metrics,
3010            archive_readers,
3011            EPOCH_DURATION_MS_FOR_TESTING,
3012        )
3013        .await
3014    }
3015
3016    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3017        &self.transaction_manager
3018    }
3019
3020    /// Adds transactions / certificates to transaction manager for ordered
3021    /// execution.
3022    pub fn enqueue_transactions_for_execution(
3023        &self,
3024        txns: Vec<VerifiedExecutableTransaction>,
3025        epoch_store: &Arc<AuthorityPerEpochStore>,
3026    ) {
3027        self.transaction_manager.enqueue(txns, epoch_store)
3028    }
3029
3030    /// Adds certificates to transaction manager for ordered execution.
3031    pub fn enqueue_certificates_for_execution(
3032        &self,
3033        certs: Vec<VerifiedCertificate>,
3034        epoch_store: &Arc<AuthorityPerEpochStore>,
3035    ) {
3036        self.transaction_manager
3037            .enqueue_certificates(certs, epoch_store)
3038    }
3039
3040    pub fn enqueue_with_expected_effects_digest(
3041        &self,
3042        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3043        epoch_store: &AuthorityPerEpochStore,
3044    ) {
3045        self.transaction_manager
3046            .enqueue_with_expected_effects_digest(certs, epoch_store)
3047    }
3048
3049    fn create_owner_index_if_empty(
3050        &self,
3051        genesis_objects: &[Object],
3052        epoch_store: &Arc<AuthorityPerEpochStore>,
3053    ) -> IotaResult {
3054        let Some(index_store) = &self.indexes else {
3055            return Ok(());
3056        };
3057        if !index_store.is_empty() {
3058            return Ok(());
3059        }
3060
3061        let mut new_owners = vec![];
3062        let mut new_dynamic_fields = vec![];
3063        let mut layout_resolver = epoch_store
3064            .executor()
3065            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3066        for o in genesis_objects.iter() {
3067            match o.owner {
3068                Owner::AddressOwner(addr) => new_owners.push((
3069                    (addr, o.id()),
3070                    ObjectInfo::new(&o.compute_object_reference(), o),
3071                )),
3072                Owner::ObjectOwner(object_id) => {
3073                    let id = o.id();
3074                    let Some(info) = self.try_create_dynamic_field_info(
3075                        o,
3076                        &BTreeMap::new(),
3077                        layout_resolver.as_mut(),
3078                        true,
3079                    )?
3080                    else {
3081                        continue;
3082                    };
3083                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3084                }
3085                _ => {}
3086            }
3087        }
3088
3089        index_store.insert_genesis_objects(ObjectIndexChanges {
3090            deleted_owners: vec![],
3091            deleted_dynamic_fields: vec![],
3092            new_owners,
3093            new_dynamic_fields,
3094        })
3095    }
3096
3097    /// Attempts to acquire execution lock for an executable transaction.
3098    /// Returns the lock if the transaction is matching current executed epoch
3099    /// Returns None otherwise
3100    pub fn execution_lock_for_executable_transaction(
3101        &self,
3102        transaction: &VerifiedExecutableTransaction,
3103    ) -> IotaResult<ExecutionLockReadGuard> {
3104        let lock = self
3105            .execution_lock
3106            .try_read()
3107            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3108        if *lock == transaction.auth_sig().epoch() {
3109            Ok(lock)
3110        } else {
3111            Err(IotaError::WrongEpoch {
3112                expected_epoch: *lock,
3113                actual_epoch: transaction.auth_sig().epoch(),
3114            })
3115        }
3116    }
3117
3118    /// Acquires the execution lock for the duration of a transaction signing
3119    /// request. This prevents reconfiguration from starting until we are
3120    /// finished handling the signing request. Otherwise, in-memory lock
3121    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
3122    /// while we are attempting to acquire locks for the transaction.
3123    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard> {
3124        self.execution_lock
3125            .try_read()
3126            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3127    }
3128
3129    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard {
3130        self.execution_lock.write().await
3131    }
3132
3133    #[instrument(level = "error", skip_all)]
3134    pub async fn reconfigure(
3135        &self,
3136        cur_epoch_store: &AuthorityPerEpochStore,
3137        supported_protocol_versions: SupportedProtocolVersions,
3138        new_committee: Committee,
3139        epoch_start_configuration: EpochStartConfiguration,
3140        accumulator: Arc<StateAccumulator>,
3141        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3142        epoch_supply_change: i64,
3143        epoch_last_checkpoint: CheckpointSequenceNumber,
3144    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3145        Self::check_protocol_version(
3146            supported_protocol_versions,
3147            epoch_start_configuration
3148                .epoch_start_state()
3149                .protocol_version(),
3150        );
3151        self.metrics.reset_on_reconfigure();
3152        self.committee_store.insert_new_committee(&new_committee)?;
3153
3154        // Wait until no transactions are being executed.
3155        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3156
3157        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
3158        cur_epoch_store.epoch_terminated().await;
3159
3160        let highest_locally_built_checkpoint_seq = self
3161            .checkpoint_store
3162            .get_latest_locally_computed_checkpoint()?
3163            .map(|c| *c.sequence_number())
3164            .unwrap_or(0);
3165
3166        assert!(
3167            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3168            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3169        );
3170        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
3171            // if we built the last checkpoint locally (as opposed to receiving it from a
3172            // peer), then all shared_version_assignments except the one for the
3173            // ChangeEpoch transaction should have been removed
3174            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3175            // Note that while 1 is the typical value, 0 is possible if the node restarts
3176            // after committing the last checkpoint but before reconfiguring.
3177            if num_shared_version_assignments > 1 {
3178                // If this happens in prod, we have a memory leak, but not a correctness issue.
3179                debug_fatal!(
3180                    "all shared_version_assignments should have been removed \
3181                    (num_shared_version_assignments: {num_shared_version_assignments})"
3182                );
3183            }
3184        }
3185
3186        // Safe to reconfigure now. No transactions are being executed,
3187        // and no epoch-specific tasks are running.
3188
3189        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
3190        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
3191        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3192            .await?;
3193        self.get_reconfig_api()
3194            .clear_state_end_of_epoch(&execution_lock);
3195        self.check_system_consistency(
3196            cur_epoch_store,
3197            accumulator,
3198            expensive_safety_check_config,
3199            epoch_supply_change,
3200        )?;
3201        self.get_reconfig_api()
3202            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3203        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3204            if self
3205                .db_checkpoint_config
3206                .perform_db_checkpoints_at_epoch_end
3207            {
3208                let checkpoint_indexes = self
3209                    .db_checkpoint_config
3210                    .perform_index_db_checkpoints_at_epoch_end
3211                    .unwrap_or(false);
3212                let current_epoch = cur_epoch_store.epoch();
3213                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3214                self.checkpoint_all_dbs(
3215                    &epoch_checkpoint_path,
3216                    cur_epoch_store,
3217                    checkpoint_indexes,
3218                )?;
3219            }
3220        }
3221
3222        self.get_reconfig_api()
3223            .reconfigure_cache(&epoch_start_configuration)
3224            .await;
3225
3226        let new_epoch = new_committee.epoch;
3227        let new_epoch_store = self
3228            .reopen_epoch_db(
3229                cur_epoch_store,
3230                new_committee,
3231                epoch_start_configuration,
3232                expensive_safety_check_config,
3233                epoch_last_checkpoint,
3234            )
3235            .await?;
3236        assert_eq!(new_epoch_store.epoch(), new_epoch);
3237        self.transaction_manager.reconfigure(new_epoch);
3238        *execution_lock = new_epoch;
3239        // drop execution_lock after epoch store was updated
3240        // see also assert in AuthorityState::process_certificate
3241        // on the epoch store and execution lock epoch match
3242        Ok(new_epoch_store)
3243    }
3244
3245    /// Advance the epoch store to the next epoch for testing only.
3246    /// This only manually sets all the places where we have the epoch number.
3247    /// It doesn't properly reconfigure the node, hence should be only used for
3248    /// testing.
3249    pub async fn reconfigure_for_testing(&self) {
3250        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3251        let epoch_store = self.epoch_store_for_testing().clone();
3252        let protocol_config = epoch_store.protocol_config().clone();
3253        // The current protocol config used in the epoch store may have been overridden
3254        // and diverged from the protocol config definitions. That override may
3255        // have now been dropped when the initial guard was dropped. We reapply
3256        // the override before creating the new epoch store, to make sure that
3257        // the new epoch store has the same protocol config as the current one.
3258        // Since this is for testing only, we mostly like to keep the protocol config
3259        // the same across epochs.
3260        let _guard =
3261            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3262        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3263            self.get_backing_package_store().clone(),
3264            self.get_object_store().clone(),
3265            &self.config.expensive_safety_check_config,
3266            self.checkpoint_store
3267                .get_epoch_last_checkpoint(epoch_store.epoch())
3268                .unwrap()
3269                .map(|c| *c.sequence_number())
3270                .unwrap_or_default(),
3271        );
3272        let new_epoch = new_epoch_store.epoch();
3273        self.transaction_manager.reconfigure(new_epoch);
3274        self.epoch_store.store(new_epoch_store);
3275        epoch_store.epoch_terminated().await;
3276        *execution_lock = new_epoch;
3277    }
3278
3279    #[instrument(level = "error", skip_all)]
3280    fn check_system_consistency(
3281        &self,
3282        cur_epoch_store: &AuthorityPerEpochStore,
3283        accumulator: Arc<StateAccumulator>,
3284        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3285        epoch_supply_change: i64,
3286    ) -> IotaResult<()> {
3287        info!(
3288            "Performing iota conservation consistency check for epoch {}",
3289            cur_epoch_store.epoch()
3290        );
3291
3292        if cfg!(debug_assertions) {
3293            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3294        }
3295
3296        self.get_reconfig_api()
3297            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3298
3299        // check for root state hash consistency with live object set
3300        if expensive_safety_check_config.enable_state_consistency_check() {
3301            info!(
3302                "Performing state consistency check for epoch {}",
3303                cur_epoch_store.epoch()
3304            );
3305            self.expensive_check_is_consistent_state(
3306                accumulator,
3307                cur_epoch_store,
3308                cfg!(debug_assertions), // panic in debug mode only
3309            );
3310        }
3311
3312        if expensive_safety_check_config.enable_secondary_index_checks() {
3313            if let Some(indexes) = self.indexes.clone() {
3314                verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3315                    .expect("secondary indexes are inconsistent");
3316            }
3317        }
3318
3319        Ok(())
3320    }
3321
3322    fn expensive_check_is_consistent_state(
3323        &self,
3324        accumulator: Arc<StateAccumulator>,
3325        cur_epoch_store: &AuthorityPerEpochStore,
3326        panic: bool,
3327    ) {
3328        let live_object_set_hash = accumulator.digest_live_object_set();
3329
3330        let root_state_hash: ECMHLiveObjectSetDigest = self
3331            .get_accumulator_store()
3332            .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3333            .expect("Retrieving root state hash cannot fail")
3334            .expect("Root state hash for epoch must exist")
3335            .1
3336            .digest()
3337            .into();
3338
3339        let is_inconsistent = root_state_hash != live_object_set_hash;
3340        if is_inconsistent {
3341            if panic {
3342                panic!(
3343                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3344                );
3345            } else {
3346                error!(
3347                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3348                    root_state_hash, live_object_set_hash
3349                );
3350            }
3351        } else {
3352            info!("State consistency check passed");
3353        }
3354
3355        if !panic {
3356            accumulator.set_inconsistent_state(is_inconsistent);
3357        }
3358    }
3359
3360    pub fn current_epoch_for_testing(&self) -> EpochId {
3361        self.epoch_store_for_testing().epoch()
3362    }
3363
3364    #[instrument(level = "error", skip_all)]
3365    pub fn checkpoint_all_dbs(
3366        &self,
3367        checkpoint_path: &Path,
3368        cur_epoch_store: &AuthorityPerEpochStore,
3369        checkpoint_indexes: bool,
3370    ) -> IotaResult {
3371        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3372        let current_epoch = cur_epoch_store.epoch();
3373
3374        if checkpoint_path.exists() {
3375            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3376            return Ok(());
3377        }
3378
3379        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3380        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3381
3382        if checkpoint_path_tmp.exists() {
3383            fs::remove_dir_all(&checkpoint_path_tmp)
3384                .map_err(|e| IotaError::FileIO(e.to_string()))?;
3385        }
3386
3387        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3388        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3389
3390        // NOTE: Do not change the order of invoking these checkpoint calls
3391        // We want to snapshot checkpoint db first to not race with state sync
3392        self.checkpoint_store
3393            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3394
3395        self.get_reconfig_api()
3396            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3397
3398        self.committee_store
3399            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3400
3401        if checkpoint_indexes {
3402            if let Some(indexes) = self.indexes.as_ref() {
3403                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3404            }
3405        }
3406
3407        fs::rename(checkpoint_path_tmp, checkpoint_path)
3408            .map_err(|e| IotaError::FileIO(e.to_string()))?;
3409        Ok(())
3410    }
3411
3412    /// Load the current epoch store. This can change during reconfiguration. To
3413    /// ensure that we never end up accessing different epoch stores in a
3414    /// single task, we need to make sure that this is called once per task.
3415    /// Each call needs to be carefully audited to ensure it is
3416    /// the case. This also means we should minimize the number of call-sites.
3417    /// Only call it when there is no way to obtain it from somewhere else.
3418    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3419        self.epoch_store.load()
3420    }
3421
3422    // Load the epoch store, should be used in tests only.
3423    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3424        self.load_epoch_store_one_call_per_task()
3425    }
3426
3427    pub fn clone_committee_for_testing(&self) -> Committee {
3428        Committee::clone(self.epoch_store_for_testing().committee())
3429    }
3430
3431    #[instrument(level = "trace", skip_all)]
3432    pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3433        self.get_object_store()
3434            .try_get_object(object_id)
3435            .map_err(Into::into)
3436    }
3437
3438    /// Non-fallible version of `try_get_object`.
3439    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3440        self.try_get_object(object_id)
3441            .await
3442            .expect("storage access failed")
3443    }
3444
3445    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3446        Ok(self
3447            .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3448            .await?
3449            .expect("framework object should always exist")
3450            .compute_object_reference())
3451    }
3452
3453    // This function is only used for testing.
3454    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3455        self.get_object_cache_reader()
3456            .try_get_iota_system_state_object_unsafe()
3457    }
3458
3459    #[instrument(level = "trace", skip_all)]
3460    pub fn get_checkpoint_by_sequence_number(
3461        &self,
3462        sequence_number: CheckpointSequenceNumber,
3463    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3464        Ok(self
3465            .checkpoint_store
3466            .get_checkpoint_by_sequence_number(sequence_number)?)
3467    }
3468
3469    #[instrument(level = "trace", skip_all)]
3470    pub fn get_transaction_checkpoint_for_tests(
3471        &self,
3472        digest: &TransactionDigest,
3473        epoch_store: &AuthorityPerEpochStore,
3474    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3475        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3476        let Some(checkpoint) = checkpoint else {
3477            return Ok(None);
3478        };
3479        let checkpoint = self
3480            .checkpoint_store
3481            .get_checkpoint_by_sequence_number(checkpoint)?;
3482        Ok(checkpoint)
3483    }
3484
3485    #[instrument(level = "trace", skip_all)]
3486    pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3487        Ok(
3488            match self
3489                .get_object_cache_reader()
3490                .try_get_latest_object_or_tombstone(*object_id)?
3491            {
3492                Some((_, ObjectOrTombstone::Object(object))) => {
3493                    let layout = self.get_object_layout(&object)?;
3494                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
3495                }
3496                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3497                None => ObjectRead::NotExists(*object_id),
3498            },
3499        )
3500    }
3501
3502    /// Chain Identifier is the digest of the genesis checkpoint.
3503    pub fn get_chain_identifier(&self) -> ChainIdentifier {
3504        self.chain_identifier
3505    }
3506
3507    #[instrument(level = "trace", skip_all)]
3508    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3509    where
3510        T: DeserializeOwned,
3511    {
3512        let o = self.get_object_read(object_id)?.into_object()?;
3513        if let Some(move_object) = o.data.try_as_move() {
3514            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3515                IotaError::ObjectDeserialization {
3516                    error: format!("{e}"),
3517                }
3518            })?)
3519        } else {
3520            Err(IotaError::ObjectDeserialization {
3521                error: format!("Provided object : [{object_id}] is not a Move object."),
3522            })
3523        }
3524    }
3525
3526    /// This function aims to serve rpc reads on past objects and
3527    /// we don't expect it to be called for other purposes.
3528    /// Depending on the object pruning policies that will be enforced in the
3529    /// future there is no software-level guarantee/SLA to retrieve an object
3530    /// with an old version even if it exists/existed.
3531    #[instrument(level = "trace", skip_all)]
3532    pub fn get_past_object_read(
3533        &self,
3534        object_id: &ObjectID,
3535        version: SequenceNumber,
3536    ) -> IotaResult<PastObjectRead> {
3537        // Firstly we see if the object ever existed by getting its latest data
3538        let Some(obj_ref) = self
3539            .get_object_cache_reader()
3540            .try_get_latest_object_ref_or_tombstone(*object_id)?
3541        else {
3542            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3543        };
3544
3545        if version > obj_ref.1 {
3546            return Ok(PastObjectRead::VersionTooHigh {
3547                object_id: *object_id,
3548                asked_version: version,
3549                latest_version: obj_ref.1,
3550            });
3551        }
3552
3553        if version < obj_ref.1 {
3554            // Read past objects
3555            return Ok(match self.read_object_at_version(object_id, version)? {
3556                Some((object, layout)) => {
3557                    let obj_ref = object.compute_object_reference();
3558                    PastObjectRead::VersionFound(obj_ref, object, layout)
3559                }
3560
3561                None => PastObjectRead::VersionNotFound(*object_id, version),
3562            });
3563        }
3564
3565        if !obj_ref.2.is_alive() {
3566            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3567        }
3568
3569        match self.read_object_at_version(object_id, obj_ref.1)? {
3570            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3571            None => {
3572                error!(
3573                    "Object with in parent_entry is missing from object store, datastore is \
3574                     inconsistent",
3575                );
3576                Err(UserInputError::ObjectNotFound {
3577                    object_id: *object_id,
3578                    version: Some(obj_ref.1),
3579                }
3580                .into())
3581            }
3582        }
3583    }
3584
3585    #[instrument(level = "trace", skip_all)]
3586    fn read_object_at_version(
3587        &self,
3588        object_id: &ObjectID,
3589        version: SequenceNumber,
3590    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3591        let Some(object) = self
3592            .get_object_cache_reader()
3593            .try_get_object_by_key(object_id, version)?
3594        else {
3595            return Ok(None);
3596        };
3597
3598        let layout = self.get_object_layout(&object)?;
3599        Ok(Some((object, layout)))
3600    }
3601
3602    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3603        let layout = object
3604            .data
3605            .try_as_move()
3606            .map(|object| {
3607                into_struct_layout(
3608                    self.load_epoch_store_one_call_per_task()
3609                        .executor()
3610                        // TODO(cache) - must read through cache
3611                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3612                        .get_annotated_layout(&object.type_().clone().into())?,
3613                )
3614            })
3615            .transpose()?;
3616        Ok(layout)
3617    }
3618
3619    fn get_owner_at_version(
3620        &self,
3621        object_id: &ObjectID,
3622        version: SequenceNumber,
3623    ) -> IotaResult<Owner> {
3624        self.get_object_store()
3625            .try_get_object_by_key(object_id, version)?
3626            .ok_or_else(|| {
3627                IotaError::from(UserInputError::ObjectNotFound {
3628                    object_id: *object_id,
3629                    version: Some(version),
3630                })
3631            })
3632            .map(|o| o.owner)
3633    }
3634
3635    #[instrument(level = "trace", skip_all)]
3636    pub fn get_owner_objects(
3637        &self,
3638        owner: IotaAddress,
3639        // If `Some`, the query will start from the next item after the specified cursor
3640        cursor: Option<ObjectID>,
3641        limit: usize,
3642        filter: Option<IotaObjectDataFilter>,
3643    ) -> IotaResult<Vec<ObjectInfo>> {
3644        if let Some(indexes) = &self.indexes {
3645            indexes.get_owner_objects(owner, cursor, limit, filter)
3646        } else {
3647            Err(IotaError::IndexStoreNotAvailable)
3648        }
3649    }
3650
3651    #[instrument(level = "trace", skip_all)]
3652    pub fn get_owned_coins_iterator_with_cursor(
3653        &self,
3654        owner: IotaAddress,
3655        // If `Some`, the query will start from the next item after the specified cursor
3656        cursor: (String, ObjectID),
3657        limit: usize,
3658        one_coin_type_only: bool,
3659    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3660        if let Some(indexes) = &self.indexes {
3661            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3662        } else {
3663            Err(IotaError::IndexStoreNotAvailable)
3664        }
3665    }
3666
3667    #[instrument(level = "trace", skip_all)]
3668    pub fn get_owner_objects_iterator(
3669        &self,
3670        owner: IotaAddress,
3671        // If `Some`, the query will start from the next item after the specified cursor
3672        cursor: Option<ObjectID>,
3673        filter: Option<IotaObjectDataFilter>,
3674    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3675        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3676        if let Some(indexes) = &self.indexes {
3677            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3678        } else {
3679            Err(IotaError::IndexStoreNotAvailable)
3680        }
3681    }
3682
3683    #[instrument(level = "trace", skip_all)]
3684    pub async fn get_move_objects<T>(
3685        &self,
3686        owner: IotaAddress,
3687        type_: MoveObjectType,
3688    ) -> IotaResult<Vec<T>>
3689    where
3690        T: DeserializeOwned,
3691    {
3692        let object_ids = self
3693            .get_owner_objects_iterator(owner, None, None)?
3694            .filter(|o| match &o.type_ {
3695                ObjectType::Struct(s) => &type_ == s,
3696                ObjectType::Package => false,
3697            })
3698            .map(|info| ObjectKey(info.object_id, info.version))
3699            .collect::<Vec<_>>();
3700        let mut move_objects = vec![];
3701
3702        let objects = self
3703            .get_object_store()
3704            .try_multi_get_objects_by_key(&object_ids)?;
3705
3706        for (o, id) in objects.into_iter().zip(object_ids) {
3707            let object = o.ok_or_else(|| {
3708                IotaError::from(UserInputError::ObjectNotFound {
3709                    object_id: id.0,
3710                    version: Some(id.1),
3711                })
3712            })?;
3713            let move_object = object.data.try_as_move().ok_or_else(|| {
3714                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3715            })?;
3716            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3717                IotaError::ObjectDeserialization {
3718                    error: format!("{e}"),
3719                }
3720            })?);
3721        }
3722        Ok(move_objects)
3723    }
3724
3725    #[instrument(level = "trace", skip_all)]
3726    pub fn get_dynamic_fields(
3727        &self,
3728        owner: ObjectID,
3729        // If `Some`, the query will start from the next item after the specified cursor
3730        cursor: Option<ObjectID>,
3731        limit: usize,
3732    ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3733        Ok(self
3734            .get_dynamic_fields_iterator(owner, cursor)?
3735            .take(limit)
3736            .collect::<Result<Vec<_>, _>>()?)
3737    }
3738
3739    fn get_dynamic_fields_iterator(
3740        &self,
3741        owner: ObjectID,
3742        // If `Some`, the query will start from the next item after the specified cursor
3743        cursor: Option<ObjectID>,
3744    ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3745    {
3746        if let Some(indexes) = &self.indexes {
3747            indexes.get_dynamic_fields_iterator(owner, cursor)
3748        } else {
3749            Err(IotaError::IndexStoreNotAvailable)
3750        }
3751    }
3752
3753    #[instrument(level = "trace", skip_all)]
3754    pub fn get_dynamic_field_object_id(
3755        &self,
3756        owner: ObjectID,
3757        name_type: TypeTag,
3758        name_bcs_bytes: &[u8],
3759    ) -> IotaResult<Option<ObjectID>> {
3760        if let Some(indexes) = &self.indexes {
3761            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3762        } else {
3763            Err(IotaError::IndexStoreNotAvailable)
3764        }
3765    }
3766
3767    #[instrument(level = "trace", skip_all)]
3768    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3769        Ok(self.get_indexes()?.next_sequence_number())
3770    }
3771
3772    #[instrument(level = "trace", skip_all)]
3773    pub async fn get_executed_transaction_and_effects(
3774        &self,
3775        digest: TransactionDigest,
3776        kv_store: Arc<TransactionKeyValueStore>,
3777    ) -> IotaResult<(Transaction, TransactionEffects)> {
3778        let transaction = kv_store.get_tx(digest).await?;
3779        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3780        Ok((transaction, effects))
3781    }
3782
3783    #[instrument(level = "trace", skip_all)]
3784    pub fn multi_get_checkpoint_by_sequence_number(
3785        &self,
3786        sequence_numbers: &[CheckpointSequenceNumber],
3787    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3788        Ok(self
3789            .checkpoint_store
3790            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3791    }
3792
3793    #[instrument(level = "trace", skip_all)]
3794    pub fn get_transaction_events(
3795        &self,
3796        digest: &TransactionEventsDigest,
3797    ) -> IotaResult<TransactionEvents> {
3798        self.get_transaction_cache_reader()
3799            .try_get_events(digest)?
3800            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3801    }
3802
3803    pub fn get_transaction_input_objects(
3804        &self,
3805        effects: &TransactionEffects,
3806    ) -> anyhow::Result<Vec<Object>> {
3807        let input_object_keys = effects
3808            .modified_at_versions()
3809            .into_iter()
3810            .map(|(object_id, version)| ObjectKey(object_id, version))
3811            .collect::<Vec<_>>();
3812
3813        let input_objects = self
3814            .get_object_store()
3815            .try_multi_get_objects_by_key(&input_object_keys)?
3816            .into_iter()
3817            .enumerate()
3818            .map(|(idx, maybe_object)| {
3819                maybe_object.ok_or_else(|| {
3820                    anyhow::anyhow!(
3821                        "missing input object key {:?} from tx {}",
3822                        input_object_keys[idx],
3823                        effects.transaction_digest()
3824                    )
3825                })
3826            })
3827            .collect::<anyhow::Result<Vec<_>>>()?;
3828        Ok(input_objects)
3829    }
3830
3831    pub fn get_transaction_output_objects(
3832        &self,
3833        effects: &TransactionEffects,
3834    ) -> anyhow::Result<Vec<Object>> {
3835        let output_object_keys = effects
3836            .all_changed_objects()
3837            .into_iter()
3838            .map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
3839            .collect::<Vec<_>>();
3840
3841        let output_objects = self
3842            .get_object_store()
3843            .try_multi_get_objects_by_key(&output_object_keys)?
3844            .into_iter()
3845            .enumerate()
3846            .map(|(idx, maybe_object)| {
3847                maybe_object.ok_or_else(|| {
3848                    anyhow::anyhow!(
3849                        "missing output object key {:?} from tx {}",
3850                        output_object_keys[idx],
3851                        effects.transaction_digest()
3852                    )
3853                })
3854            })
3855            .collect::<anyhow::Result<Vec<_>>>()?;
3856        Ok(output_objects)
3857    }
3858
3859    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3860        match &self.indexes {
3861            Some(i) => Ok(i.clone()),
3862            None => Err(IotaError::UnsupportedFeature {
3863                error: "extended object indexing is not enabled on this server".into(),
3864            }),
3865        }
3866    }
3867
3868    pub async fn get_transactions_for_tests(
3869        self: &Arc<Self>,
3870        filter: Option<TransactionFilter>,
3871        cursor: Option<TransactionDigest>,
3872        limit: Option<usize>,
3873        reverse: bool,
3874    ) -> IotaResult<Vec<TransactionDigest>> {
3875        let metrics = KeyValueStoreMetrics::new_for_tests();
3876        let kv_store = Arc::new(TransactionKeyValueStore::new(
3877            "rocksdb",
3878            metrics,
3879            self.clone(),
3880        ));
3881        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3882            .await
3883    }
3884
3885    #[instrument(level = "trace", skip_all)]
3886    pub async fn get_transactions(
3887        &self,
3888        kv_store: &Arc<TransactionKeyValueStore>,
3889        filter: Option<TransactionFilter>,
3890        // If `Some`, the query will start from the next item after the specified cursor
3891        cursor: Option<TransactionDigest>,
3892        limit: Option<usize>,
3893        reverse: bool,
3894    ) -> IotaResult<Vec<TransactionDigest>> {
3895        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3896            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3897            let iter = checkpoint_contents.iter().map(|c| c.transaction);
3898            if reverse {
3899                let iter = iter
3900                    .rev()
3901                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3902                    .skip(usize::from(cursor.is_some()));
3903                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3904            } else {
3905                let iter = iter
3906                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3907                    .skip(usize::from(cursor.is_some()));
3908                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3909            }
3910        }
3911        self.get_indexes()?
3912            .get_transactions(filter, cursor, limit, reverse)
3913    }
3914
3915    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
3916        &self.checkpoint_store
3917    }
3918
3919    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
3920        self.get_checkpoint_store()
3921            .get_highest_executed_checkpoint_seq_number()?
3922            .ok_or(IotaError::UserInput {
3923                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
3924            })
3925    }
3926
3927    #[cfg(msim)]
3928    pub fn get_highest_pruned_checkpoint_for_testing(
3929        &self,
3930    ) -> IotaResult<CheckpointSequenceNumber> {
3931        self.database_for_testing()
3932            .perpetual_tables
3933            .get_highest_pruned_checkpoint()
3934    }
3935
3936    #[instrument(level = "trace", skip_all)]
3937    pub fn get_checkpoint_summary_by_sequence_number(
3938        &self,
3939        sequence_number: CheckpointSequenceNumber,
3940    ) -> IotaResult<CheckpointSummary> {
3941        let verified_checkpoint = self
3942            .get_checkpoint_store()
3943            .get_checkpoint_by_sequence_number(sequence_number)?;
3944        match verified_checkpoint {
3945            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3946            None => Err(IotaError::UserInput {
3947                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3948            }),
3949        }
3950    }
3951
3952    #[instrument(level = "trace", skip_all)]
3953    pub fn get_checkpoint_summary_by_digest(
3954        &self,
3955        digest: CheckpointDigest,
3956    ) -> IotaResult<CheckpointSummary> {
3957        let verified_checkpoint = self
3958            .get_checkpoint_store()
3959            .get_checkpoint_by_digest(&digest)?;
3960        match verified_checkpoint {
3961            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3962            None => Err(IotaError::UserInput {
3963                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3964            }),
3965        }
3966    }
3967
3968    #[instrument(level = "trace", skip_all)]
3969    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
3970        if is_system_package(package_id) {
3971            return self.find_genesis_txn_digest();
3972        }
3973        Ok(self
3974            .get_object_read(&package_id)?
3975            .into_object()?
3976            .previous_transaction)
3977    }
3978
3979    #[instrument(level = "trace", skip_all)]
3980    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
3981        let summary = self
3982            .get_verified_checkpoint_by_sequence_number(0)?
3983            .into_message();
3984        let content = self.get_checkpoint_contents(summary.content_digest)?;
3985        let genesis_transaction = content.enumerate_transactions(&summary).next();
3986        Ok(genesis_transaction
3987            .ok_or(IotaError::UserInput {
3988                error: UserInputError::GenesisTransactionNotFound,
3989            })?
3990            .1
3991            .transaction)
3992    }
3993
3994    #[instrument(level = "trace", skip_all)]
3995    pub fn get_verified_checkpoint_by_sequence_number(
3996        &self,
3997        sequence_number: CheckpointSequenceNumber,
3998    ) -> IotaResult<VerifiedCheckpoint> {
3999        let verified_checkpoint = self
4000            .get_checkpoint_store()
4001            .get_checkpoint_by_sequence_number(sequence_number)?;
4002        match verified_checkpoint {
4003            Some(verified_checkpoint) => Ok(verified_checkpoint),
4004            None => Err(IotaError::UserInput {
4005                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4006            }),
4007        }
4008    }
4009
4010    #[instrument(level = "trace", skip_all)]
4011    pub fn get_verified_checkpoint_summary_by_digest(
4012        &self,
4013        digest: CheckpointDigest,
4014    ) -> IotaResult<VerifiedCheckpoint> {
4015        let verified_checkpoint = self
4016            .get_checkpoint_store()
4017            .get_checkpoint_by_digest(&digest)?;
4018        match verified_checkpoint {
4019            Some(verified_checkpoint) => Ok(verified_checkpoint),
4020            None => Err(IotaError::UserInput {
4021                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4022            }),
4023        }
4024    }
4025
4026    #[instrument(level = "trace", skip_all)]
4027    pub fn get_checkpoint_contents(
4028        &self,
4029        digest: CheckpointContentsDigest,
4030    ) -> IotaResult<CheckpointContents> {
4031        self.get_checkpoint_store()
4032            .get_checkpoint_contents(&digest)?
4033            .ok_or(IotaError::UserInput {
4034                error: UserInputError::CheckpointContentsNotFound(digest),
4035            })
4036    }
4037
4038    #[instrument(level = "trace", skip_all)]
4039    pub fn get_checkpoint_contents_by_sequence_number(
4040        &self,
4041        sequence_number: CheckpointSequenceNumber,
4042    ) -> IotaResult<CheckpointContents> {
4043        let verified_checkpoint = self
4044            .get_checkpoint_store()
4045            .get_checkpoint_by_sequence_number(sequence_number)?;
4046        match verified_checkpoint {
4047            Some(verified_checkpoint) => {
4048                let content_digest = verified_checkpoint.into_inner().content_digest;
4049                self.get_checkpoint_contents(content_digest)
4050            }
4051            None => Err(IotaError::UserInput {
4052                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4053            }),
4054        }
4055    }
4056
4057    #[instrument(level = "trace", skip_all)]
4058    pub async fn query_events(
4059        &self,
4060        kv_store: &Arc<TransactionKeyValueStore>,
4061        query: EventFilter,
4062        // If `Some`, the query will start from the next item after the specified cursor
4063        cursor: Option<EventID>,
4064        limit: usize,
4065        descending: bool,
4066    ) -> IotaResult<Vec<IotaEvent>> {
4067        let index_store = self.get_indexes()?;
4068
4069        // Get the tx_num from tx_digest
4070        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4071            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4072                IotaError::TransactionNotFound {
4073                    digest: cursor.tx_digest,
4074                },
4075            )?;
4076            (tx_seq, cursor.event_seq as usize)
4077        } else if descending {
4078            (u64::MAX, usize::MAX)
4079        } else {
4080            (0, 0)
4081        };
4082
4083        let limit = limit + 1;
4084        let mut event_keys = match query {
4085            EventFilter::All(filters) => {
4086                if filters.is_empty() {
4087                    index_store.all_events(tx_num, event_num, limit, descending)?
4088                } else {
4089                    return Err(IotaError::UserInput {
4090                        error: UserInputError::Unsupported(
4091                            "This query type does not currently support filter combinations"
4092                                .to_string(),
4093                        ),
4094                    });
4095                }
4096            }
4097            EventFilter::Transaction(digest) => {
4098                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4099            }
4100            EventFilter::MoveModule { package, module } => {
4101                let module_id = ModuleId::new(package.into(), module);
4102                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4103            }
4104            EventFilter::MoveEventType(struct_name) => index_store
4105                .events_by_move_event_struct_name(
4106                    &struct_name,
4107                    tx_num,
4108                    event_num,
4109                    limit,
4110                    descending,
4111                )?,
4112            EventFilter::Sender(sender) => {
4113                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4114            }
4115            EventFilter::TimeRange {
4116                start_time,
4117                end_time,
4118            } => index_store
4119                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4120            EventFilter::MoveEventModule { package, module } => index_store
4121                .events_by_move_event_module(
4122                    &ModuleId::new(package.into(), module),
4123                    tx_num,
4124                    event_num,
4125                    limit,
4126                    descending,
4127                )?,
4128            // not using "_ =>" because we want to make sure we remember to add new variants here
4129            EventFilter::Package(_)
4130            | EventFilter::MoveEventField { .. }
4131            | EventFilter::Any(_)
4132            | EventFilter::And(_, _)
4133            | EventFilter::Or(_, _) => {
4134                return Err(IotaError::UserInput {
4135                    error: UserInputError::Unsupported(
4136                        "This query type is not supported by the full node.".to_string(),
4137                    ),
4138                });
4139            }
4140        };
4141
4142        // skip one event if exclusive cursor is provided,
4143        // otherwise truncate to the original limit.
4144        if cursor.is_some() {
4145            if !event_keys.is_empty() {
4146                event_keys.remove(0);
4147            }
4148        } else {
4149            event_keys.truncate(limit - 1);
4150        }
4151
4152        // get the unique set of digests from the event_keys
4153        let transaction_digests = event_keys
4154            .iter()
4155            .map(|(_, digest, _, _)| *digest)
4156            .collect::<HashSet<_>>()
4157            .into_iter()
4158            .collect::<Vec<_>>();
4159
4160        let events = kv_store
4161            .multi_get_events_by_tx_digests(&transaction_digests)
4162            .await?;
4163
4164        let events_map: HashMap<_, _> =
4165            transaction_digests.iter().zip(events.into_iter()).collect();
4166
4167        let stored_events = event_keys
4168            .into_iter()
4169            .map(|k| {
4170                (
4171                    k,
4172                    events_map
4173                        .get(&k.1)
4174                        .expect("fetched digest is missing")
4175                        .clone()
4176                        .and_then(|e| e.data.get(k.2).cloned()),
4177                )
4178            })
4179            .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4180                event
4181                    .map(|e| (e, tx_digest, event_seq, timestamp))
4182                    .ok_or(IotaError::TransactionEventsNotFound { digest })
4183            })
4184            .collect::<Result<Vec<_>, _>>()?;
4185
4186        let epoch_store = self.load_epoch_store_one_call_per_task();
4187        let backing_store = self.get_backing_package_store().as_ref();
4188        let mut layout_resolver = epoch_store
4189            .executor()
4190            .type_layout_resolver(Box::new(backing_store));
4191        let mut events = vec![];
4192        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4193            events.push(IotaEvent::try_from(
4194                e.clone(),
4195                tx_digest,
4196                event_seq as u64,
4197                Some(timestamp),
4198                layout_resolver.get_annotated_layout(&e.type_)?,
4199            )?)
4200        }
4201        Ok(events)
4202    }
4203
4204    pub async fn insert_genesis_object(&self, object: Object) {
4205        self.get_reconfig_api()
4206            .try_insert_genesis_object(object)
4207            .expect("Cannot insert genesis object")
4208    }
4209
4210    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4211        futures::future::join_all(
4212            objects
4213                .iter()
4214                .map(|o| self.insert_genesis_object(o.clone())),
4215        )
4216        .await;
4217    }
4218
4219    /// Make a status response for a transaction
4220    #[instrument(level = "trace", skip_all)]
4221    pub fn get_transaction_status(
4222        &self,
4223        transaction_digest: &TransactionDigest,
4224        epoch_store: &Arc<AuthorityPerEpochStore>,
4225    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4226        // TODO: In the case of read path, we should not have to re-sign the effects.
4227        if let Some(effects) =
4228            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4229        {
4230            if let Some(transaction) = self
4231                .get_transaction_cache_reader()
4232                .try_get_transaction_block(transaction_digest)?
4233            {
4234                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4235                let events = if let Some(digest) = effects.events_digest() {
4236                    self.get_transaction_events(digest)?
4237                } else {
4238                    TransactionEvents::default()
4239                };
4240                return Ok(Some((
4241                    (*transaction).clone().into_message(),
4242                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4243                )));
4244            } else {
4245                // The read of effects and read of transaction are not atomic. It's possible
4246                // that we reverted the transaction (during epoch change) in
4247                // between the above two reads, and we end up having effects but
4248                // not transaction. In this case, we just fall through.
4249                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4250            }
4251        }
4252        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4253            self.metrics.tx_already_processed.inc();
4254            let (transaction, sig) = signed.into_inner().into_data_and_sig();
4255            Ok(Some((transaction, TransactionStatus::Signed(sig))))
4256        } else {
4257            Ok(None)
4258        }
4259    }
4260
4261    /// Get the signed effects of the given transaction. If the effects was
4262    /// signed in a previous epoch, re-sign it so that the caller is able to
4263    /// form a cert of the effects in the current epoch.
4264    #[instrument(level = "trace", skip_all)]
4265    pub fn get_signed_effects_and_maybe_resign(
4266        &self,
4267        transaction_digest: &TransactionDigest,
4268        epoch_store: &Arc<AuthorityPerEpochStore>,
4269    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4270        let effects = self
4271            .get_transaction_cache_reader()
4272            .try_get_executed_effects(transaction_digest)?;
4273        match effects {
4274            Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4275            None => Ok(None),
4276        }
4277    }
4278
4279    #[instrument(level = "trace", skip_all)]
4280    pub(crate) fn sign_effects(
4281        &self,
4282        effects: TransactionEffects,
4283        epoch_store: &Arc<AuthorityPerEpochStore>,
4284    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4285        let tx_digest = *effects.transaction_digest();
4286        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4287            Some(sig) if sig.epoch == epoch_store.epoch() => {
4288                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4289            }
4290            _ => {
4291                // If the transaction was executed in previous epochs, the validator will
4292                // re-sign the effects with new current epoch so that a client is always able to
4293                // obtain an effects certificate at the current epoch.
4294                //
4295                // Why is this necessary? Consider the following case:
4296                // - assume there are 4 validators
4297                // - Quorum driver gets 2 signed effects before reconfig halt
4298                // - The tx makes it into final checkpoint.
4299                // - 2 validators go away and are replaced in the new epoch.
4300                // - The new epoch begins.
4301                // - The quorum driver cannot complete the partial effects cert from the
4302                //   previous epoch, because it may not be able to reach either of the 2 former
4303                //   validators.
4304                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4305                //   the new epoch, the QD can make a new effects cert and return it to the
4306                //   client.
4307                //
4308                // This is a considered a short-term workaround. Eventually, Quorum Driver
4309                // should be able to return either an effects certificate, -or-
4310                // a proof of inclusion in a checkpoint. In the case above, the
4311                // Quorum Driver would return a proof of inclusion in the final
4312                // checkpoint, and this code would no longer be necessary.
4313                debug!(
4314                    ?tx_digest,
4315                    epoch=?epoch_store.epoch(),
4316                    "Re-signing the effects with the current epoch"
4317                );
4318
4319                let sig = AuthoritySignInfo::new(
4320                    epoch_store.epoch(),
4321                    &effects,
4322                    Intent::iota_app(IntentScope::TransactionEffects),
4323                    self.name,
4324                    &*self.secret,
4325                );
4326
4327                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4328
4329                epoch_store.insert_effects_digest_and_signature(
4330                    &tx_digest,
4331                    effects.digest(),
4332                    &sig,
4333                )?;
4334
4335                effects
4336            }
4337        };
4338
4339        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4340            signed_effects,
4341        ))
4342    }
4343
4344    // Returns coin objects for indexing for fullnode if indexing is enabled.
4345    #[instrument(level = "trace", skip_all)]
4346    fn fullnode_only_get_tx_coins_for_indexing(
4347        &self,
4348        inner_temporary_store: &InnerTemporaryStore,
4349        epoch_store: &Arc<AuthorityPerEpochStore>,
4350    ) -> Option<TxCoins> {
4351        if self.indexes.is_none() || self.is_validator(epoch_store) {
4352            return None;
4353        }
4354        let written_coin_objects = inner_temporary_store
4355            .written
4356            .iter()
4357            .filter_map(|(k, v)| {
4358                if v.is_coin() {
4359                    Some((*k, v.clone()))
4360                } else {
4361                    None
4362                }
4363            })
4364            .collect();
4365        let input_coin_objects = inner_temporary_store
4366            .input_objects
4367            .iter()
4368            .filter_map(|(k, v)| {
4369                if v.is_coin() {
4370                    Some((*k, v.clone()))
4371                } else {
4372                    None
4373                }
4374            })
4375            .collect::<ObjectMap>();
4376        Some((input_coin_objects, written_coin_objects))
4377    }
4378
4379    /// Get the TransactionEnvelope that currently locks the given object, if
4380    /// any. Since object locks are only valid for one epoch, we also need
4381    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4382    /// no lock records for the given object can be found.
4383    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4384    /// object record is at a different version.
4385    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4386    /// certain transaction. Returns None if the a lock record is
4387    /// initialized for the given ObjectRef but not yet locked by any
4388    /// transaction,     or cannot find the transaction in transaction
4389    /// table, because of data race etc.
4390    #[instrument(level = "trace", skip_all)]
4391    pub async fn get_transaction_lock(
4392        &self,
4393        object_ref: &ObjectRef,
4394        epoch_store: &AuthorityPerEpochStore,
4395    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4396        let lock_info = self
4397            .get_object_cache_reader()
4398            .try_get_lock(*object_ref, epoch_store)?;
4399        let lock_info = match lock_info {
4400            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4401                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4402                    provided_obj_ref: *object_ref,
4403                    current_version: locked_ref.1,
4404                }
4405                .into());
4406            }
4407            ObjectLockStatus::Initialized => {
4408                return Ok(None);
4409            }
4410            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4411        };
4412
4413        epoch_store.get_signed_transaction(&lock_info)
4414    }
4415
4416    pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4417        self.get_object_cache_reader().try_get_objects(objects)
4418    }
4419
4420    /// Non-fallible version of `try_get_objects`.
4421    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4422        self.try_get_objects(objects)
4423            .await
4424            .expect("storage access failed")
4425    }
4426
4427    pub async fn try_get_object_or_tombstone(
4428        &self,
4429        object_id: ObjectID,
4430    ) -> IotaResult<Option<ObjectRef>> {
4431        self.get_object_cache_reader()
4432            .try_get_latest_object_ref_or_tombstone(object_id)
4433    }
4434
4435    /// Non-fallible version of `try_get_object_or_tombstone`.
4436    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4437        self.try_get_object_or_tombstone(object_id)
4438            .await
4439            .expect("storage access failed")
4440    }
4441
4442    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4443    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4444    /// upgrade.
4445    ///
4446    /// This method can be used to dynamic adjust the amount of buffer. If set
4447    /// to 0, the upgrade will go through with only 2f+1 votes.
4448    ///
4449    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4450    /// should have the same value), or you risk halting the chain.
4451    pub fn set_override_protocol_upgrade_buffer_stake(
4452        &self,
4453        expected_epoch: EpochId,
4454        buffer_stake_bps: u64,
4455    ) -> IotaResult {
4456        let epoch_store = self.load_epoch_store_one_call_per_task();
4457        let actual_epoch = epoch_store.epoch();
4458        if actual_epoch != expected_epoch {
4459            return Err(IotaError::WrongEpoch {
4460                expected_epoch,
4461                actual_epoch,
4462            });
4463        }
4464
4465        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4466    }
4467
4468    pub fn clear_override_protocol_upgrade_buffer_stake(
4469        &self,
4470        expected_epoch: EpochId,
4471    ) -> IotaResult {
4472        let epoch_store = self.load_epoch_store_one_call_per_task();
4473        let actual_epoch = epoch_store.epoch();
4474        if actual_epoch != expected_epoch {
4475            return Err(IotaError::WrongEpoch {
4476                expected_epoch,
4477                actual_epoch,
4478            });
4479        }
4480
4481        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4482    }
4483
4484    /// Get the set of system packages that are compiled in to this build, if
4485    /// those packages are compatible with the current versions of those
4486    /// packages on-chain.
4487    pub async fn get_available_system_packages(
4488        &self,
4489        binary_config: &BinaryConfig,
4490    ) -> Vec<ObjectRef> {
4491        let mut results = vec![];
4492
4493        let system_packages = BuiltInFramework::iter_system_packages();
4494
4495        // Add extra framework packages during simtest
4496        #[cfg(msim)]
4497        let extra_packages = framework_injection::get_extra_packages(self.name);
4498        #[cfg(msim)]
4499        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4500
4501        for system_package in system_packages {
4502            let modules = system_package.modules().to_vec();
4503            // In simtests, we could override the current built-in framework packages.
4504            #[cfg(msim)]
4505            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4506                .unwrap_or(modules);
4507
4508            let Some(obj_ref) = iota_framework::compare_system_package(
4509                &self.get_object_store(),
4510                &system_package.id,
4511                &modules,
4512                system_package.dependencies.to_vec(),
4513                binary_config,
4514            )
4515            .await
4516            else {
4517                return vec![];
4518            };
4519            results.push(obj_ref);
4520        }
4521
4522        results
4523    }
4524
4525    /// Return the new versions, module bytes, and dependencies for the packages
4526    /// that have been committed to for a framework upgrade, in
4527    /// `system_packages`.  Loads the module contents from the binary, and
4528    /// performs the following checks:
4529    ///
4530    /// - Whether its contents matches what is on-chain already, in which case
4531    ///   no upgrade is required, and its contents are omitted from the output.
4532    /// - Whether the contents in the binary can form a package whose digest
4533    ///   matches the input, meaning the framework will be upgraded, and this
4534    ///   authority can satisfy that upgrade, in which case the contents are
4535    ///   included in the output.
4536    ///
4537    /// If the current version of the framework can't be loaded, the binary does
4538    /// not contain the bytes for that framework ID, or the resulting
4539    /// package fails the digest check, `None` is returned indicating that
4540    /// this authority cannot run the upgrade that the network voted on.
4541    async fn get_system_package_bytes(
4542        &self,
4543        system_packages: Vec<ObjectRef>,
4544        binary_config: &BinaryConfig,
4545    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4546        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4547        let objects = self.get_objects(&ids).await;
4548
4549        let mut res = Vec::with_capacity(system_packages.len());
4550        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4551            let prev_transaction = match object {
4552                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4553                    // Skip this one because it doesn't need to be upgraded.
4554                    info!("Framework {} does not need updating", system_package_ref.0);
4555                    continue;
4556                }
4557
4558                Some(cur_object) => cur_object.previous_transaction,
4559                None => TransactionDigest::genesis_marker(),
4560            };
4561
4562            #[cfg(msim)]
4563            let SystemPackage {
4564                id: _,
4565                bytes,
4566                dependencies,
4567            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4568                .unwrap_or_else(|| {
4569                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4570                });
4571
4572            #[cfg(not(msim))]
4573            let SystemPackage {
4574                id: _,
4575                bytes,
4576                dependencies,
4577            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4578
4579            let modules: Vec<_> = bytes
4580                .iter()
4581                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4582                .collect();
4583
4584            let new_object = Object::new_system_package(
4585                &modules,
4586                system_package_ref.1,
4587                dependencies.clone(),
4588                prev_transaction,
4589            );
4590
4591            let new_ref = new_object.compute_object_reference();
4592            if new_ref != system_package_ref {
4593                error!(
4594                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
4595                );
4596                return None;
4597            }
4598
4599            res.push((system_package_ref.1, bytes, dependencies));
4600        }
4601
4602        Some(res)
4603    }
4604
4605    /// Returns the new protocol version and system packages that the network
4606    /// has voted to upgrade to. If the proposed protocol version is not
4607    /// supported, None is returned.
4608    fn is_protocol_version_supported_v1(
4609        proposed_protocol_version: ProtocolVersion,
4610        committee: &Committee,
4611        capabilities: Vec<AuthorityCapabilitiesV1>,
4612        mut buffer_stake_bps: u64,
4613    ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
4614        if buffer_stake_bps > 10000 {
4615            warn!("clamping buffer_stake_bps to 10000");
4616            buffer_stake_bps = 10000;
4617        }
4618
4619        // For each validator, gather the protocol version and system packages that it
4620        // would like to upgrade to in the next epoch.
4621        let mut desired_upgrades: Vec<_> = capabilities
4622            .into_iter()
4623            .filter_map(|mut cap| {
4624                // A validator that lists no packages is voting against any change at all.
4625                if cap.available_system_packages.is_empty() {
4626                    return None;
4627                }
4628
4629                cap.available_system_packages.sort();
4630
4631                info!(
4632                    "validator {:?} supports {:?} with system packages: {:?}",
4633                    cap.authority.concise(),
4634                    cap.supported_protocol_versions,
4635                    cap.available_system_packages,
4636                );
4637
4638                // A validator that only supports the current protocol version is also voting
4639                // against any change, because framework upgrades always require a protocol
4640                // version bump.
4641                cap.supported_protocol_versions
4642                    .get_version_digest(proposed_protocol_version)
4643                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
4644            })
4645            .collect();
4646
4647        // There can only be one set of votes that have a majority, find one if it
4648        // exists.
4649        desired_upgrades.sort();
4650        desired_upgrades
4651            .into_iter()
4652            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4653            .into_iter()
4654            .find_map(|((digest, packages), group)| {
4655                // should have been filtered out earlier.
4656                assert!(!packages.is_empty());
4657
4658                let mut stake_aggregator: StakeAggregator<(), true> =
4659                    StakeAggregator::new(Arc::new(committee.clone()));
4660
4661                for (_, _, authority) in group {
4662                    stake_aggregator.insert_generic(authority, ());
4663                }
4664
4665                let total_votes = stake_aggregator.total_votes();
4666                let quorum_threshold = committee.quorum_threshold();
4667                let f = committee.total_votes() - committee.quorum_threshold();
4668
4669                // multiple by buffer_stake_bps / 10000, rounded up.
4670                let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
4671                let effective_threshold = quorum_threshold + buffer_stake;
4672
4673                info!(
4674                    protocol_config_digest = ?digest,
4675                    ?total_votes,
4676                    ?quorum_threshold,
4677                    ?buffer_stake_bps,
4678                    ?effective_threshold,
4679                    ?proposed_protocol_version,
4680                    ?packages,
4681                    "support for upgrade"
4682                );
4683
4684                let has_support = total_votes >= effective_threshold;
4685                has_support.then_some((proposed_protocol_version, packages))
4686            })
4687    }
4688
4689    /// Selects the highest supported protocol version and system packages that
4690    /// the network has voted to upgrade to. If no upgrade is supported,
4691    /// returns the current protocol version and system packages.
4692    fn choose_protocol_version_and_system_packages_v1(
4693        current_protocol_version: ProtocolVersion,
4694        committee: &Committee,
4695        capabilities: Vec<AuthorityCapabilitiesV1>,
4696        buffer_stake_bps: u64,
4697    ) -> (ProtocolVersion, Vec<ObjectRef>) {
4698        let mut next_protocol_version = current_protocol_version;
4699        let mut system_packages = vec![];
4700
4701        // Finds the highest supported protocol version and system packages by
4702        // incrementing the proposed protocol version by one until no further
4703        // upgrades are supported.
4704        while let Some((version, packages)) = Self::is_protocol_version_supported_v1(
4705            next_protocol_version + 1,
4706            committee,
4707            capabilities.clone(),
4708            buffer_stake_bps,
4709        ) {
4710            next_protocol_version = version;
4711            system_packages = packages;
4712        }
4713
4714        (next_protocol_version, system_packages)
4715    }
4716
4717    #[instrument(level = "debug", skip_all)]
4718    fn create_authenticator_state_tx(
4719        &self,
4720        epoch_store: &Arc<AuthorityPerEpochStore>,
4721    ) -> Option<EndOfEpochTransactionKind> {
4722        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4723            info!("authenticator state transactions not enabled");
4724            return None;
4725        }
4726
4727        let authenticator_state_exists = epoch_store.authenticator_state_exists();
4728        let tx = if authenticator_state_exists {
4729            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4730            let min_epoch =
4731                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4732            let authenticator_obj_initial_shared_version = epoch_store
4733                .epoch_start_config()
4734                .authenticator_obj_initial_shared_version()
4735                .expect("initial version must exist");
4736
4737            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4738                min_epoch,
4739                authenticator_obj_initial_shared_version,
4740            );
4741
4742            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4743
4744            tx
4745        } else {
4746            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4747            info!("Creating AuthenticatorStateCreate tx");
4748            tx
4749        };
4750        Some(tx)
4751    }
4752
    /// Creates and executes the advance epoch transaction to effects without
    /// committing it to the database. The effects of the change epoch tx
    /// are only written to the database after a certified checkpoint has been
    /// formed and executed by CheckpointExecutor.
    ///
    /// When a framework upgrade has been decided on, but the validator does
    /// not have the new versions of the packages locally, the validator
    /// cannot form the ChangeEpochTx. In this case it returns Err,
    /// indicating that the checkpoint builder should give up trying to make the
    /// final checkpoint. As long as the network is able to create a certified
    /// checkpoint (which should be ensured by the capabilities vote), it
    /// will arrive via state sync and be executed by CheckpointExecutor.
    ///
    /// On success, returns the system state written by the transaction, the
    /// `SystemEpochInfoEvent` it emitted (if any), and its effects.
    ///
    /// # Errors
    ///
    /// Returns an error if the upgraded system packages are not available
    /// locally, if the change epoch transaction has already been executed, or
    /// if any of the fallible lock, version-assignment, object-read,
    /// execution, or state-sync-store steps fails.
    ///
    /// # Panics
    ///
    /// Panics if the capabilities cannot be read, if the transaction did not
    /// write the system state object, if no `SystemEpochInfoEvent` was emitted
    /// while the system state is not in safe mode, or if the resulting
    /// effects status is not ok.
    #[instrument(level = "error", skip_all)]
    pub async fn create_and_execute_advance_epoch_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        gas_cost_summary: &GasCostSummary,
        checkpoint: CheckpointSequenceNumber,
        epoch_start_timestamp_ms: CheckpointTimestamp,
    ) -> anyhow::Result<(
        IotaSystemState,
        Option<SystemEpochInfoEvent>,
        TransactionEffects,
    )> {
        // End-of-epoch transaction kinds, in order: the optional authenticator
        // state transaction first, then the change epoch transaction.
        let mut txns = Vec::new();

        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
            txns.push(tx);
        }

        let next_epoch = epoch_store.epoch() + 1;

        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();

        // Tally the capabilities recorded for this epoch to decide which
        // protocol version and system packages the next epoch will use.
        let (next_epoch_protocol_version, next_epoch_system_packages) =
            Self::choose_protocol_version_and_system_packages_v1(
                epoch_store.protocol_version(),
                epoch_store.committee(),
                epoch_store
                    .get_capabilities_v1()
                    .expect("read capabilities from db cannot fail"),
                buffer_stake_bps,
            );

        // Since system packages are created during the current epoch, they should
        // abide by the rules of the current epoch, including the current epoch's
        // max Move binary format version.
        let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);
        let Some(next_epoch_system_package_bytes) = self
            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
            .await
        else {
            error!(
                "upgraded system packages {:?} are not locally available, cannot create \
                ChangeEpochTx. validator binary must be upgraded to the correct version!",
                next_epoch_system_packages
            );
            // The checkpoint builder will keep retrying forever when it hits this error.
            // Eventually, one of two things will happen:
            // - The operator will upgrade this binary to one that has the new packages
            //   locally, and this function will succeed.
            // - The final checkpoint will be certified by other validators, we will receive
            //   it via state sync, and execute it. This will upgrade the framework
            //   packages, reconfigure, and most likely shut down in the new epoch (this
            //   validator likely doesn't support the new protocol version, or else it
            //   should have had the packages.)
            bail!("missing system packages: cannot form ChangeEpochTx");
        };

        // ChangeEpochV2 requires that both options are set - ProtocolDefinedBaseFee and
        // MaxCommitteeMembersCount. Otherwise fall back to the original ChangeEpoch,
        // which does not carry `computation_cost_burned`.
        if config.protocol_defined_base_fee()
            && config.max_committee_members_count_as_option().is_some()
        {
            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        } else {
            txns.push(EndOfEpochTransactionKind::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        }

        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);

        // Mark the transaction as executable from the given checkpoint of the
        // current epoch.
        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
            tx.clone(),
            epoch_store.epoch(),
            checkpoint,
        );

        let tx_digest = executable_tx.digest();

        info!(
            ?next_epoch,
            ?next_epoch_protocol_version,
            ?next_epoch_system_packages,
            computation_cost=?gas_cost_summary.computation_cost,
            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
            storage_cost=?gas_cost_summary.storage_cost,
            storage_rebate=?gas_cost_summary.storage_rebate,
            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
            ?tx_digest,
            "Creating advance epoch transaction"
        );

        fail_point_async!("change_epoch_tx_delay");
        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

        // The tx could have been executed by state sync already - if so simply return
        // an error. The checkpoint builder will shortly be terminated by
        // reconfiguration anyway.
        if self
            .get_transaction_cache_reader()
            .try_is_tx_already_executed(tx_digest)?
        {
            warn!("change epoch tx has already been executed via state sync");
            bail!("change epoch tx has already been executed via state sync",);
        }

        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;

        // We must manually assign the shared object versions to the transaction before
        // executing it. This is because we do not sequence end-of-epoch
        // transactions through consensus.
        epoch_store.assign_shared_object_versions_idempotent(
            self.get_object_cache_reader().as_ref(),
            &[executable_tx.clone()],
        )?;

        let input_objects =
            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;

        // Execute to a temporary store and effects; the execution error, if
        // any, is not inspected here.
        let (temporary_store, effects, _execution_error_opt) =
            self.prepare_certificate(&execution_guard, &executable_tx, input_objects, epoch_store)?;
        let system_obj = get_iota_system_state(&temporary_store.written)
            .expect("change epoch tx must write to system object");
        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
        let system_epoch_info_event = temporary_store
            .events
            .data
            .into_iter()
            .find(|event| event.is_system_epoch_info_event())
            .map(SystemEpochInfoEvent::from);
        // The system epoch info event can be `None` if the `advance_epoch`
        // Move function call failed and was executed in safe mode.
        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());

        // We must write tx and effects to the state sync tables so that state sync is
        // able to deliver the transaction to CheckpointExecutor after it is
        // included in a certified checkpoint.
        self.get_state_sync_store()
            .try_insert_transaction_and_effects(&tx, &effects)
            .map_err(|err| {
                // Convert the store's error into `anyhow::Error` for this
                // function's return type.
                let err: anyhow::Error = err.into();
                err
            })?;

        info!(
            "Effects summary of the change epoch transaction: {:?}",
            effects.summary_for_debug()
        );
        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
        // The change epoch transaction cannot fail to execute.
        assert!(effects.status().is_ok());
        Ok((system_obj, system_epoch_info_event, effects))
    }
4936
    /// Reverts locally executed transactions that are still pending in
    /// consensus and were not included in a checkpoint of this epoch.
    ///
    /// This function is called at the very end of the epoch.
    /// This step is required before updating new epoch in the db and calling
    /// reopen_epoch_db.
    ///
    /// # Errors
    ///
    /// Propagates any error from checking whether a transaction was executed
    /// in a checkpoint or from reverting it.
    #[instrument(level = "error", skip_all)]
    async fn revert_uncommitted_epoch_transactions(
        &self,
        epoch_store: &AuthorityPerEpochStore,
    ) -> IotaResult {
        {
            let state = epoch_store.get_reconfig_state_write_lock_guard();
            if state.should_accept_user_certs() {
                // Need to change this so that the consensus adapter does not accept
                // certificates from users. This can happen if our local validator did
                // not initiate epoch change locally, but 2f+1 nodes already concluded
                // the epoch.
                //
                // This lock is essentially a barrier for the
                // `epoch_store.pending_consensus_certificates` table we are reading on
                // the line after this block.
                epoch_store.close_user_certs(state);
            }
            // lock is dropped here
        }
        let pending_certificates = epoch_store.pending_consensus_certificates();
        info!(
            "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
            pending_certificates.len(),
            pending_certificates,
        );
        for digest in pending_certificates {
            // Transactions that made it into a checkpoint are part of the epoch
            // and must be kept.
            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
                info!(
                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
                    digest
                );
                continue;
            }
            info!("Reverting {:?} at the end of epoch", digest);
            // Revert in the per-epoch store first, then through the reconfig API.
            epoch_store.revert_executed_transaction(&digest)?;
            self.get_reconfig_api().try_revert_state_update(&digest)?;
        }
        info!("All uncommitted local transactions reverted");
        Ok(())
    }
4981
4982    #[instrument(level = "error", skip_all)]
4983    async fn reopen_epoch_db(
4984        &self,
4985        cur_epoch_store: &AuthorityPerEpochStore,
4986        new_committee: Committee,
4987        epoch_start_configuration: EpochStartConfiguration,
4988        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4989        epoch_last_checkpoint: CheckpointSequenceNumber,
4990    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
4991        let new_epoch = new_committee.epoch;
4992        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
4993        assert_eq!(
4994            epoch_start_configuration.epoch_start_state().epoch(),
4995            new_committee.epoch
4996        );
4997        fail_point!("before-open-new-epoch-store");
4998        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
4999            self.name,
5000            new_committee,
5001            epoch_start_configuration,
5002            self.get_backing_package_store().clone(),
5003            self.get_object_store().clone(),
5004            expensive_safety_check_config,
5005            cur_epoch_store.get_chain_identifier(),
5006            epoch_last_checkpoint,
5007        );
5008        self.epoch_store.store(new_epoch_store.clone());
5009        Ok(new_epoch_store)
5010    }
5011
5012    #[cfg(test)]
5013    pub(crate) fn iter_live_object_set_for_testing(
5014        &self,
5015    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5016        self.get_accumulator_store()
5017            .iter_cached_live_object_set_for_testing()
5018    }
5019
5020    #[cfg(test)]
5021    pub(crate) fn shutdown_execution_for_test(&self) {
5022        self.tx_execution_shutdown
5023            .lock()
5024            .take()
5025            .unwrap()
5026            .send(())
5027            .unwrap();
5028    }
5029
5030    /// NOTE: this function is only to be used for fuzzing and testing. Never
5031    /// use in prod
5032    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5033        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5034        self.get_object_cache_reader()
5035            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5036        self.get_reconfig_api()
5037            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5038    }
5039}
5040
/// Receives generated randomness from a channel and, for each item, creates
/// a randomness state update transaction that is persisted and enqueued for
/// execution on the associated authority.
pub struct RandomnessRoundReceiver {
    /// Authority whose epoch store, cache, and transaction manager are used
    /// to build, persist, and enqueue the randomness transactions.
    authority_state: Arc<AuthorityState>,
    /// Incoming `(epoch, round, bytes)` randomness items.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
5045
5046impl RandomnessRoundReceiver {
5047    pub fn spawn(
5048        authority_state: Arc<AuthorityState>,
5049        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5050    ) -> JoinHandle<()> {
5051        let rrr = RandomnessRoundReceiver {
5052            authority_state,
5053            randomness_rx,
5054        };
5055        spawn_monitored_task!(rrr.run())
5056    }
5057
5058    async fn run(mut self) {
5059        info!("RandomnessRoundReceiver event loop started");
5060
5061        loop {
5062            tokio::select! {
5063                maybe_recv = self.randomness_rx.recv() => {
5064                    if let Some((epoch, round, bytes)) = maybe_recv {
5065                        self.handle_new_randomness(epoch, round, bytes);
5066                    } else {
5067                        break;
5068                    }
5069                },
5070            }
5071        }
5072
5073        info!("RandomnessRoundReceiver event loop ended");
5074    }
5075
5076    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5077    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5078        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5079        if epoch_store.epoch() != epoch {
5080            warn!(
5081                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5082                epoch_store.epoch()
5083            );
5084            return;
5085        }
5086        let transaction = VerifiedTransaction::new_randomness_state_update(
5087            epoch,
5088            round,
5089            bytes,
5090            epoch_store
5091                .epoch_start_config()
5092                .randomness_obj_initial_shared_version(),
5093        );
5094        debug!(
5095            "created randomness state update transaction with digest: {:?}",
5096            transaction.digest()
5097        );
5098        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5099        let digest = *transaction.digest();
5100
5101        // Randomness state updates contain the full bls signature for the random round,
5102        // which cannot necessarily be reconstructed again later. Therefore we must
5103        // immediately persist this transaction. If we crash before its outputs
5104        // are committed, this ensures we will be able to re-execute it.
5105        self.authority_state
5106            .get_cache_commit()
5107            .persist_transaction(&transaction);
5108
5109        // Send transaction to TransactionManager for execution.
5110        self.authority_state
5111            .transaction_manager()
5112            .enqueue(vec![transaction], &epoch_store);
5113
5114        let authority_state = self.authority_state.clone();
5115        spawn_monitored_task!(async move {
5116            // Wait for transaction execution in a separate task, to avoid deadlock in case
5117            // of out-of-order randomness generation. (Each
5118            // RandomnessStateUpdate depends on the output of the
5119            // RandomnessStateUpdate from the previous round.)
5120            //
5121            // We set a very long timeout so that in case this gets stuck for some reason,
5122            // the validator will eventually crash rather than continuing in a
5123            // zombie mode.
5124            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5125            let result = tokio::time::timeout(
5126                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5127                authority_state
5128                    .get_transaction_cache_reader()
5129                    .try_notify_read_executed_effects(&[digest]),
5130            )
5131            .await;
5132            let result = match result {
5133                Ok(result) => result,
5134                Err(_) => {
5135                    if cfg!(debug_assertions) {
5136                        // Crash on randomness update execution timeout in debug builds.
5137                        panic!(
5138                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5139                        );
5140                    }
5141                    warn!(
5142                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5143                    );
5144                    // Continue waiting as long as necessary in non-debug builds.
5145                    authority_state
5146                        .get_transaction_cache_reader()
5147                        .try_notify_read_executed_effects(&[digest])
5148                        .await
5149                }
5150            };
5151
5152            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5153            let effects = effects.pop().expect("should return effects");
5154            if *effects.status() != ExecutionStatus::Success {
5155                fatal!(
5156                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5157                );
5158            }
5159            debug!(
5160                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5161            );
5162        });
5163    }
5164}
5165
5166#[async_trait]
5167impl TransactionKeyValueStoreTrait for AuthorityState {
5168    async fn multi_get(
5169        &self,
5170        transaction_keys: &[TransactionDigest],
5171        effects_keys: &[TransactionDigest],
5172    ) -> IotaResult<KVStoreTransactionData> {
5173        let txns = if !transaction_keys.is_empty() {
5174            self.get_transaction_cache_reader()
5175                .try_multi_get_transaction_blocks(transaction_keys)?
5176                .into_iter()
5177                .map(|t| t.map(|t| (*t).clone().into_inner()))
5178                .collect()
5179        } else {
5180            vec![]
5181        };
5182
5183        let fx = if !effects_keys.is_empty() {
5184            self.get_transaction_cache_reader()
5185                .try_multi_get_executed_effects(effects_keys)?
5186        } else {
5187            vec![]
5188        };
5189
5190        Ok((txns, fx))
5191    }
5192
5193    async fn multi_get_checkpoints(
5194        &self,
5195        checkpoint_summaries: &[CheckpointSequenceNumber],
5196        checkpoint_contents: &[CheckpointSequenceNumber],
5197        checkpoint_summaries_by_digest: &[CheckpointDigest],
5198    ) -> IotaResult<(
5199        Vec<Option<CertifiedCheckpointSummary>>,
5200        Vec<Option<CheckpointContents>>,
5201        Vec<Option<CertifiedCheckpointSummary>>,
5202    )> {
5203        // TODO: use multi-get methods if it ever becomes important (unlikely)
5204        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5205        let store = self.get_checkpoint_store();
5206        for seq in checkpoint_summaries {
5207            let checkpoint = store
5208                .get_checkpoint_by_sequence_number(*seq)?
5209                .map(|c| c.into_inner());
5210
5211            summaries.push(checkpoint);
5212        }
5213
5214        let mut contents = Vec::with_capacity(checkpoint_contents.len());
5215        for seq in checkpoint_contents {
5216            let checkpoint = store
5217                .get_checkpoint_by_sequence_number(*seq)?
5218                .and_then(|summary| {
5219                    store
5220                        .get_checkpoint_contents(&summary.content_digest)
5221                        .expect("db read cannot fail")
5222                });
5223            contents.push(checkpoint);
5224        }
5225
5226        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5227        for digest in checkpoint_summaries_by_digest {
5228            let checkpoint = store
5229                .get_checkpoint_by_digest(digest)?
5230                .map(|c| c.into_inner());
5231            summaries_by_digest.push(checkpoint);
5232        }
5233
5234        Ok((summaries, contents, summaries_by_digest))
5235    }
5236
5237    async fn get_transaction_perpetual_checkpoint(
5238        &self,
5239        digest: TransactionDigest,
5240    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5241        self.get_checkpoint_cache()
5242            .try_get_transaction_perpetual_checkpoint(&digest)
5243            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5244    }
5245
5246    async fn get_object(
5247        &self,
5248        object_id: ObjectID,
5249        version: VersionNumber,
5250    ) -> IotaResult<Option<Object>> {
5251        self.get_object_cache_reader()
5252            .try_get_object_by_key(&object_id, version)
5253    }
5254
5255    async fn multi_get_transactions_perpetual_checkpoints(
5256        &self,
5257        digests: &[TransactionDigest],
5258    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5259        let res = self
5260            .get_checkpoint_cache()
5261            .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5262
5263        Ok(res
5264            .into_iter()
5265            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5266            .collect())
5267    }
5268
5269    #[instrument(skip(self))]
5270    async fn multi_get_events_by_tx_digests(
5271        &self,
5272        digests: &[TransactionDigest],
5273    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5274        if digests.is_empty() {
5275            return Ok(vec![]);
5276        }
5277        let events_digests: Vec<_> = self
5278            .get_transaction_cache_reader()
5279            .try_multi_get_executed_effects(digests)?
5280            .into_iter()
5281            .map(|t| t.and_then(|t| t.events_digest().cloned()))
5282            .collect();
5283        let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5284        let mut events = self
5285            .get_transaction_cache_reader()
5286            .try_multi_get_events(&non_empty_events)?
5287            .into_iter();
5288        Ok(events_digests
5289            .into_iter()
5290            .map(|ev| ev.and_then(|_| events.next()?))
5291            .collect())
5292    }
5293}
5294
/// Simulation-only (`msim`) hooks that let tests replace or add framework
/// packages, either for every validator or on a per-validator basis.
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_types::{
        base_types::{AuthorityName, ObjectID},
        is_system_package,
    };
    use move_binary_format::CompiledModule;

    /// Registered overrides, keyed by the ID of the package they apply to.
    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    /// The compiled modules that make up one package.
    type Framework = Vec<CompiledModule>;

    /// Callback that decides, per authority, which modules (if any) should be
    /// used for a package. Returning `None` means no override for that
    /// authority.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How the override for a single package is resolved.
    enum PackageOverrideConfig {
        /// The same modules are used regardless of the authority asking.
        Global(Framework),
        /// The callback is consulted with the authority's name.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes every module with its own binary format version.
    ///
    /// Panics if a module fails to serialize.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        modules
            .iter()
            .map(|m| {
                let mut buf = Vec::new();
                m.serialize_with_version(m.version, &mut buf).unwrap();
                buf
            })
            .collect()
    }

    /// Registers `modules` as the override for `package_id` for all
    /// authorities, replacing any override previously registered for it.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|bs| {
            bs.borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules))
        });
    }

    /// Registers a per-authority callback as the override for `package_id`,
    /// replacing any override previously registered for it.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|bs| {
            bs.borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func))
        });
    }

    /// Returns the serialized override modules for `package_id` as seen by
    /// authority `name`, or `None` if no override is registered or the
    /// per-validator callback returns `None` for this authority.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            cfg.borrow().get(package_id).and_then(|entry| match entry {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
                }
            })
        })
    }

    /// Same lookup as [`get_override_bytes`], but returns the compiled modules
    /// themselves instead of their serialized form.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            cfg.borrow().get(package_id).and_then(|entry| match entry {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            })
        })
    }

    /// Builds a [`SystemPackage`] from the override registered for
    /// `package_id`, or returns `None` when there is no override for
    /// authority `name`.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if is_system_package(*package_id) {
            // Overridden system packages keep the dependencies of the built-in
            // package they replace.
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        } else {
            // Assume that entirely new injected packages depend on all existing system
            // packages.
            BuiltInFramework::all_package_ids()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns the injected packages that are not part of the built-in
    /// framework, as seen by authority `name`.
    ///
    /// A package whose per-validator callback returns `None` for `name` is
    /// left out, consistent with how [`get_override_system_package`] treats a
    /// missing override. Every returned package is declared to depend on all
    /// built-in framework packages.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
        // Collect the IDs first so the registry borrow is released before the
        // per-package lookups below access the registry again.
        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .filter(|package| !built_in.contains(*package))
                .copied()
                .collect()
        });

        extra
            .into_iter()
            .filter_map(|package| {
                get_override_bytes(&package, name).map(|bytes| SystemPackage {
                    id: package,
                    bytes,
                    dependencies: BuiltInFramework::all_package_ids(),
                })
            })
            .collect()
    }
}
5416
/// An object bundled with the components of its object reference, in the
/// form used when serializing node state dumps.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// ID component of the object's reference.
    pub id: ObjectID,
    /// Version component of the object's reference.
    pub version: VersionNumber,
    /// Digest component of the object's reference.
    pub digest: ObjectDigest,
    /// The full object.
    pub object: Object,
}
5424
5425impl ObjDumpFormat {
5426    fn new(object: Object) -> Self {
5427        let oref = object.compute_object_reference();
5428        Self {
5429            id: oref.0,
5430            version: oref.1,
5431            digest: oref.2,
5432            object,
5433        }
5434    }
5435}
5436
/// Serializable snapshot of a single transaction's execution context: the
/// transaction itself, epoch parameters, the effects that were computed, and
/// the objects gathered around its execution.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the transaction this dump describes.
    pub tx_digest: TransactionDigest,
    /// The signed transaction data extracted from the executed certificate.
    pub sender_signed_data: SenderSignedData,
    /// Epoch reported by the epoch store when the dump was created.
    pub executed_epoch: u64,
    /// Reference gas price reported by the epoch store.
    pub reference_gas_price: u64,
    /// Protocol version reported by the epoch store, as a plain integer.
    pub protocol_version: u64,
    /// Epoch start timestamp taken from the epoch start configuration.
    pub epoch_start_timestamp_ms: u64,
    /// The transaction effects supplied when the dump was created.
    pub computed_effects: TransactionEffects,
    /// Digest of the effects that were expected for this transaction.
    pub expected_effects_digest: TransactionEffectsDigest,
    /// Built-in framework packages found in the object store.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Mutated and read-only input shared objects listed in the effects.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Objects listed as loaded runtime objects in the temporary store.
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects at the versions the effects report as modified.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages the temporary store reports as loaded from the database at
    /// runtime.
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// Input objects held by the temporary store.
    pub input_objects: Vec<ObjDumpFormat>,
}
5454
5455impl NodeStateDump {
5456    pub fn new(
5457        tx_digest: &TransactionDigest,
5458        effects: &TransactionEffects,
5459        expected_effects_digest: TransactionEffectsDigest,
5460        object_store: &dyn ObjectStore,
5461        epoch_store: &Arc<AuthorityPerEpochStore>,
5462        inner_temporary_store: &InnerTemporaryStore,
5463        certificate: &VerifiedExecutableTransaction,
5464    ) -> IotaResult<Self> {
5465        // Epoch info
5466        let executed_epoch = epoch_store.epoch();
5467        let reference_gas_price = epoch_store.reference_gas_price();
5468        let epoch_start_config = epoch_store.epoch_start_config();
5469        let protocol_version = epoch_store.protocol_version().as_u64();
5470        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5471
5472        // Record all system packages at this version
5473        let mut relevant_system_packages = Vec::new();
5474        for sys_package_id in BuiltInFramework::all_package_ids() {
5475            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5476                relevant_system_packages.push(ObjDumpFormat::new(w))
5477            }
5478        }
5479
5480        // Record all the shared objects
5481        let mut shared_objects = Vec::new();
5482        for kind in effects.input_shared_objects() {
5483            match kind {
5484                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5485                    if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5486                        shared_objects.push(ObjDumpFormat::new(w))
5487                    }
5488                }
5489                InputSharedObject::ReadDeleted(..)
5490                | InputSharedObject::MutateDeleted(..)
5491                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
5492                                                           * objects. */
5493            }
5494        }
5495
5496        // Record all loaded child objects
5497        // Child objects which are read but not mutated are not tracked anywhere else
5498        let mut loaded_child_objects = Vec::new();
5499        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5500            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5501                loaded_child_objects.push(ObjDumpFormat::new(w))
5502            }
5503        }
5504
5505        // Record all modified objects
5506        let mut modified_at_versions = Vec::new();
5507        for (id, ver) in effects.modified_at_versions() {
5508            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5509                modified_at_versions.push(ObjDumpFormat::new(w))
5510            }
5511        }
5512
5513        // Packages read at runtime, which were not previously loaded into the temoorary
5514        // store Some packages may be fetched at runtime and wont show up in
5515        // input objects
5516        let mut runtime_reads = Vec::new();
5517        for obj in inner_temporary_store
5518            .runtime_packages_loaded_from_db
5519            .values()
5520        {
5521            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
5522        }
5523
5524        // All other input objects should already be in `inner_temporary_store.objects`
5525
5526        Ok(Self {
5527            tx_digest: *tx_digest,
5528            executed_epoch,
5529            reference_gas_price,
5530            epoch_start_timestamp_ms,
5531            protocol_version,
5532            relevant_system_packages,
5533            shared_objects,
5534            loaded_child_objects,
5535            modified_at_versions,
5536            runtime_reads,
5537            sender_signed_data: certificate.clone().into_message(),
5538            input_objects: inner_temporary_store
5539                .input_objects
5540                .values()
5541                .map(|o| ObjDumpFormat::new(o.clone()))
5542                .collect(),
5543            computed_effects: effects.clone(),
5544            expected_effects_digest,
5545        })
5546    }
5547
5548    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
5549        let mut objects = Vec::new();
5550        objects.extend(self.relevant_system_packages.clone());
5551        objects.extend(self.shared_objects.clone());
5552        objects.extend(self.loaded_child_objects.clone());
5553        objects.extend(self.modified_at_versions.clone());
5554        objects.extend(self.runtime_reads.clone());
5555        objects.extend(self.input_objects.clone());
5556        objects
5557    }
5558
5559    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
5560        let file_name = format!(
5561            "{}_{}_NODE_DUMP.json",
5562            self.tx_digest,
5563            AuthorityState::unixtime_now_ms()
5564        );
5565        let mut path = path.to_path_buf();
5566        path.push(&file_name);
5567        let mut file = File::create(path.clone())?;
5568        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
5569        Ok(path)
5570    }
5571
5572    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
5573        let file = File::open(path)?;
5574        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
5575    }
5576}