// iota_core/authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49    key_value_store::{
50        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51    },
52    key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57    IOTA_SYSTEM_ADDRESS, TypeTag,
58    account_abstraction::{
59        account::AuthenticatorFunctionRefV1Key,
60        authenticator_function::{
61            AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
62            AuthenticatorFunctionRefV1,
63        },
64    },
65    authenticator_state::get_authenticator_state,
66    base_types::*,
67    committee::{Committee, EpochId, ProtocolVersion},
68    crypto::{
69        AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
70        default_hash,
71    },
72    deny_list_v1::check_coin_deny_list_v1_during_signing,
73    digests::{ChainIdentifier, Digest},
74    dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
75    effects::{
76        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
77        TransactionEvents, VerifiedSignedTransactionEffects,
78    },
79    error::{ExecutionError, IotaError, IotaResult, UserInputError},
80    event::{Event, EventID, SystemEpochInfoEvent},
81    executable_transaction::VerifiedExecutableTransaction,
82    execution_config_utils::to_binary_config,
83    execution_status::ExecutionStatus,
84    fp_ensure,
85    gas::{GasCostSummary, IotaGasStatus},
86    gas_coin::NANOS_PER_IOTA,
87    inner_temporary_store::{
88        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
89        WrittenObjects,
90    },
91    iota_system_state::{
92        IotaSystemState, IotaSystemStateTrait,
93        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
94    },
95    is_system_package,
96    layout_resolver::{LayoutResolver, into_struct_layout},
97    message_envelope::Message,
98    messages_checkpoint::{
99        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
100        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
101        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
102        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
103    },
104    messages_consensus::AuthorityCapabilitiesV1,
105    messages_grpc::{
106        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
107        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
108        TransactionStatus,
109    },
110    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
111    move_authenticator::MoveAuthenticator,
112    object::{
113        MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
114        bounded_visitor::BoundedVisitor,
115    },
116    storage::{
117        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
118    },
119    supported_protocol_versions::{
120        ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
121    },
122    transaction::*,
123    transaction_executor::{SimulateTransactionResult, VmChecks},
124};
125use itertools::Itertools;
126use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
127use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
128use parking_lot::Mutex;
129use prometheus::{
130    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
131    register_histogram_vec_with_registry, register_histogram_with_registry,
132    register_int_counter_vec_with_registry, register_int_counter_with_registry,
133    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
134};
135use serde::{Deserialize, Serialize, de::DeserializeOwned};
136use tap::TapFallible;
137use tokio::{
138    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
139    task::JoinHandle,
140};
141use tracing::{debug, error, info, instrument, trace, warn};
142use typed_store::TypedStoreError;
143
144use self::{
145    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
146};
147#[cfg(msim)]
148pub use crate::checkpoints::checkpoint_executor::utils::{
149    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
150};
151use crate::{
152    authority::{
153        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
154        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
155        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
156        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
157        authority_store_tables::AuthorityPrunerTables,
158        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
159    },
160    authority_client::NetworkAuthorityClient,
161    checkpoints::CheckpointStore,
162    congestion_tracker::CongestionTracker,
163    consensus_adapter::ConsensusAdapter,
164    epoch::committee_store::CommitteeStore,
165    execution_cache::{
166        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
167        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
168        TransactionCacheRead,
169    },
170    execution_driver::execution_process,
171    grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
172    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
173    metrics::{LatencyObserver, RateTracker},
174    module_cache_metrics::ResolverMetrics,
175    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
176    stake_aggregator::StakeAggregator,
177    state_accumulator::{AccumulatorStore, StateAccumulator},
178    subscription_handler::SubscriptionHandler,
179    transaction_input_loader::TransactionInputLoader,
180    transaction_manager::TransactionManager,
181    transaction_outputs::TransactionOutputs,
182    validator_tx_finalizer::ValidatorTxFinalizer,
183    verify_indexes::verify_indexes,
184};
185
// Test-only modules. Their sources live under `unit_tests/`, mapped in via
// `#[path]`; they are compiled only for `cfg(test)` builds.
#[cfg(test)]
#[path = "unit_tests/authority_tests.rs"]
pub mod authority_tests;

#[cfg(test)]
#[path = "unit_tests/transaction_tests.rs"]
pub mod transaction_tests;

#[cfg(test)]
#[path = "unit_tests/batch_transaction_tests.rs"]
mod batch_transaction_tests;

#[cfg(test)]
#[path = "unit_tests/move_integration_tests.rs"]
pub mod move_integration_tests;

#[cfg(test)]
#[path = "unit_tests/gas_tests.rs"]
mod gas_tests;

#[cfg(test)]
#[path = "unit_tests/batch_verification_tests.rs"]
mod batch_verification_tests;

#[cfg(test)]
#[path = "unit_tests/coin_deny_list_tests.rs"]
mod coin_deny_list_tests;

// Shared test helpers, also resolved from `unit_tests/`.
#[cfg(test)]
#[path = "unit_tests/auth_unit_test_utils.rs"]
pub mod auth_unit_test_utils;

// Test utilities that are compiled unconditionally (usable by other crates' tests).
pub mod authority_test_utils;

// Per-epoch store and its pruner.
pub mod authority_per_epoch_store;
pub mod authority_per_epoch_store_pruner;

// Authority storage, configuration, and congestion/versioning submodules.
mod authority_store_migrations;
pub mod authority_store_pruner;
pub mod authority_store_tables;
pub mod authority_store_types;
pub mod epoch_start_configuration;
pub mod shared_object_congestion_tracker;
pub mod shared_object_version_manager;
pub mod suggested_gas_price_calculator;
pub mod test_authority_builder;
pub mod transaction_deferral;

pub(crate) mod authority_store;
pub mod backpressure;
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on.
///
/// All members are registered against the node's metrics `Registry` in
/// [`AuthorityMetrics::new`]; field docs below mirror the registration help
/// strings where those are visible in this file.
pub struct AuthorityMetrics {
    // --- Transaction handling counters ---
    /// Total number of transaction orders.
    tx_orders: IntCounter,
    /// Total number of transaction certificates handled.
    total_certs: IntCounter,
    /// Number of calls to handle_certificate.
    total_cert_attempts: IntCounter,
    /// Total number of transaction effects produced (== transactions finished).
    total_effects: IntCounter,
    /// Number of transactions involving shared objects.
    pub shared_obj_tx: IntCounter,
    /// Number of sponsored transactions.
    sponsored_tx: IntCounter,
    /// Number of transaction orders already processed previously.
    tx_already_processed: IntCounter,
    /// Distribution of number of input objects per transaction.
    num_input_objs: Histogram,
    /// Number of shared input objects per transaction.
    num_shared_objects: Histogram,
    /// Distribution of size of transaction batch.
    batch_size: Histogram,

    // --- Latency histograms ---
    /// Latency of handling transactions.
    authority_state_handle_transaction_latency: Histogram,

    // Per-transaction-type views of the `authority_state_execute_certificate_latency`
    // histogram vec, pre-resolved for the single-writer and shared-object labels.
    execute_certificate_latency_single_writer: Histogram,
    execute_certificate_latency_shared_object: Histogram,

    /// Latency of actual certificate executions.
    internal_execution_latency: Histogram,
    /// Latency of loading input objects for execution.
    execution_load_input_objects_latency: Histogram,
    /// Latency of executing certificates, before committing the results.
    prepare_certificate_latency: Histogram,
    /// Latency of committing certificate execution results.
    commit_certificate_latency: Histogram,
    /// Latency of checkpointing dbs.
    db_checkpoint_latency: Histogram,

    // --- TransactionManager metrics ---
    /// Certificates enqueued to TransactionManager, sliced by enqueue result.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    /// Current number of missing objects in TransactionManager.
    pub(crate) transaction_manager_num_missing_objects: IntGauge,
    /// Certificates pending in TransactionManager with at least 1 missing input object.
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    /// Executing certificates, including queued and actually running ones.
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    /// Number of ready transactions in TransactionManager.
    pub(crate) transaction_manager_num_ready: IntGauge,
    // Object-availability cache statistics.
    pub(crate) transaction_manager_object_cache_size: IntGauge,
    pub(crate) transaction_manager_object_cache_hits: IntCounter,
    pub(crate) transaction_manager_object_cache_misses: IntCounter,
    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
    // Package-availability cache statistics.
    pub(crate) transaction_manager_package_cache_size: IntGauge,
    pub(crate) transaction_manager_package_cache_hits: IntCounter,
    pub(crate) transaction_manager_package_cache_misses: IntCounter,
    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
    /// Time spent waiting for a transaction in the queue.
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    // --- Execution driver metrics ---
    /// Cumulative number of transactions executed by the execution driver.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    /// Number of transactions pending in the execution driver dispatch queue.
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    /// Queueing delay between a transaction becoming ready and starting execution.
    pub(crate) execution_queueing_delay_s: Histogram,
    /// Ratio of computation gas divided by VM execution latency.
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    /// Ratio of computation gas divided by certificate execution latency,
    /// including committing the certificate.
    pub(crate) execution_gas_latency_ratio: Histogram,

    /// Total number of consensus transactions skipped.
    pub(crate) skipped_consensus_txns: IntCounter,
    /// Consensus transactions skipped because of a local cache hit.
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,

    // --- Overload monitoring ---
    /// Whether the authority is currently experiencing overload (load shedding mode).
    pub(crate) authority_overload_status: IntGauge,
    /// Percentage of transactions shed while in load shedding mode.
    pub(crate) authority_load_shedding_percentage: IntGauge,

    /// Number of times each source indicated transaction overload (labeled by source).
    pub(crate) transaction_overload_sources: IntCounterVec,

    // --- Post processing metrics ---
    post_processing_total_events_emitted: IntCounter,
    post_processing_total_tx_indexed: IntCounter,
    post_processing_total_tx_had_event_processed: IntCounter,
    post_processing_total_failures: IntCounter,

    // --- Consensus handler metrics ---
    /// Transactions processed by the consensus handler, sliced by class.
    pub consensus_handler_processed: IntCounterVec,
    /// Sizes of each type of transaction processed by the consensus handler.
    pub consensus_handler_transaction_sizes: HistogramVec,
    /// Number of low-scoring authorities based on reputation scores from consensus.
    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    /// Reputation scores from consensus for each authority.
    pub consensus_handler_scores: IntGaugeVec,
    /// Transactions deferred by the consensus handler.
    pub consensus_handler_deferred_transactions: IntCounter,
    /// Transactions deferred by the consensus handler due to congestion.
    pub consensus_handler_congested_transactions: IntCounter,
    /// Transactions cancelled by the consensus handler.
    pub consensus_handler_cancelled_transactions: IntCounter,
    /// Max object costs for congestion control in the current consensus commit.
    pub consensus_handler_max_object_costs: IntGaugeVec,
    /// Committed subdags, sliced by leader.
    pub consensus_committed_subdags: IntCounterVec,
    /// Committed consensus messages, sliced by author.
    pub consensus_committed_messages: IntGaugeVec,
    /// Committed user transactions, sliced by submitter.
    pub consensus_committed_user_transactions: IntGaugeVec,
    /// Leader round of the consensus output currently being processed.
    pub consensus_handler_leader_round: IntGauge,
    // Throughput gauges; registered in `new()` beyond this file chunk —
    // NOTE(review): confirm semantics against the registration site.
    pub consensus_calculated_throughput: IntGauge,
    pub consensus_calculated_throughput_profile: IntGauge,

    /// Shared execution-limits metrics.
    pub limits_metrics: Arc<LimitsMetrics>,

    /// Bytecode verifier metrics for tracking timeouts.
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zkLogin signatures.
    pub zklogin_sig_count: IntCounter,
    /// Count of multisig signatures.
    pub multisig_sig_count: IntCounter,

    // Tracks recent average txn queueing delay between when it is ready for execution
    // until it starts executing.
    pub execution_queueing_latency: LatencyObserver,

    // Tracks the rate of transactions becoming ready for execution in transaction manager.
    // The need for the Mutex is that the tracker is updated in transaction manager and read
    // in the overload_monitor. There should be low mutex contention because
    // transaction manager is single threaded and the read rate in overload_monitor is
    // low. In the case where transaction manager becomes multi-threaded, we can
    // create one rate tracker per thread.
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    // Tracks the rate of transactions starting execution in execution driver.
    // Similar reason for using a Mutex here as for `txn_ready_rate_tracker`.
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
338
// Override of the default Prometheus buckets for positive integer samples
// (counts, sizes): a 1-2-5-7 progression per decade, spanning 1 through 10M.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7.,
    10., 20., 50., 70.,
    100., 200., 500., 700.,
    1_000., 2_000., 5_000., 7_000.,
    10_000., 20_000., 50_000., 70_000.,
    100_000., 200_000., 500_000., 700_000.,
    1_000_000., 2_000_000., 5_000_000., 7_000_000.,
    10_000_000.,
];
345
// Histogram buckets for general latency samples, in seconds: sub-millisecond
// resolution at the low end, extending to 90s for slow outliers.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
    1., 2., 3., 4., 5., 6., 7., 8., 9., 10.,
    20., 30., 60., 90.,
];
350
// Histogram buckets for low-latency samples, in seconds. Starts at 10us
// (1-2-5 per decade) and extends to 100s to still capture outliers.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005,
    0.0001, 0.0002, 0.0005,
    0.001, 0.002, 0.005,
    0.01, 0.02, 0.05,
    0.1, 0.2, 0.5,
    1., 2., 5.,
    10., 20., 50.,
    100.,
];
356
// Histogram buckets for gas-per-latency ratio samples (computation gas units
// divided by execution latency), spanning 10 through 1M.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0,
    100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0,
    1000.0, 2000.0, 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0,
    10000.0, 50000.0, 100000.0, 1000000.0,
];
361
/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided:
/// 1 billion IOTA, expressed in nanos via `NANOS_PER_IOTA`.
pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
364
365impl AuthorityMetrics {
366    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
367        let execute_certificate_latency = register_histogram_vec_with_registry!(
368            "authority_state_execute_certificate_latency",
369            "Latency of executing certificates, including waiting for inputs",
370            &["tx_type"],
371            LATENCY_SEC_BUCKETS.to_vec(),
372            registry,
373        )
374        .unwrap();
375
376        let execute_certificate_latency_single_writer =
377            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
378        let execute_certificate_latency_shared_object =
379            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
380
381        Self {
382            tx_orders: register_int_counter_with_registry!(
383                "total_transaction_orders",
384                "Total number of transaction orders",
385                registry,
386            )
387            .unwrap(),
388            total_certs: register_int_counter_with_registry!(
389                "total_transaction_certificates",
390                "Total number of transaction certificates handled",
391                registry,
392            )
393            .unwrap(),
394            total_cert_attempts: register_int_counter_with_registry!(
395                "total_handle_certificate_attempts",
396                "Number of calls to handle_certificate",
397                registry,
398            )
399            .unwrap(),
400            // total_effects == total transactions finished
401            total_effects: register_int_counter_with_registry!(
402                "total_transaction_effects",
403                "Total number of transaction effects produced",
404                registry,
405            )
406            .unwrap(),
407
408            shared_obj_tx: register_int_counter_with_registry!(
409                "num_shared_obj_tx",
410                "Number of transactions involving shared objects",
411                registry,
412            )
413            .unwrap(),
414
415            sponsored_tx: register_int_counter_with_registry!(
416                "num_sponsored_tx",
417                "Number of sponsored transactions",
418                registry,
419            )
420            .unwrap(),
421
422            tx_already_processed: register_int_counter_with_registry!(
423                "num_tx_already_processed",
424                "Number of transaction orders already processed previously",
425                registry,
426            )
427            .unwrap(),
428            num_input_objs: register_histogram_with_registry!(
429                "num_input_objects",
430                "Distribution of number of input TX objects per TX",
431                POSITIVE_INT_BUCKETS.to_vec(),
432                registry,
433            )
434            .unwrap(),
435            num_shared_objects: register_histogram_with_registry!(
436                "num_shared_objects",
437                "Number of shared input objects per TX",
438                POSITIVE_INT_BUCKETS.to_vec(),
439                registry,
440            )
441            .unwrap(),
442            batch_size: register_histogram_with_registry!(
443                "batch_size",
444                "Distribution of size of transaction batch",
445                POSITIVE_INT_BUCKETS.to_vec(),
446                registry,
447            )
448            .unwrap(),
449            authority_state_handle_transaction_latency: register_histogram_with_registry!(
450                "authority_state_handle_transaction_latency",
451                "Latency of handling transactions",
452                LATENCY_SEC_BUCKETS.to_vec(),
453                registry,
454            )
455            .unwrap(),
456            execute_certificate_latency_single_writer,
457            execute_certificate_latency_shared_object,
458            internal_execution_latency: register_histogram_with_registry!(
459                "authority_state_internal_execution_latency",
460                "Latency of actual certificate executions",
461                LATENCY_SEC_BUCKETS.to_vec(),
462                registry,
463            )
464            .unwrap(),
465            execution_load_input_objects_latency: register_histogram_with_registry!(
466                "authority_state_execution_load_input_objects_latency",
467                "Latency of loading input objects for execution",
468                LOW_LATENCY_SEC_BUCKETS.to_vec(),
469                registry,
470            )
471            .unwrap(),
472            prepare_certificate_latency: register_histogram_with_registry!(
473                "authority_state_prepare_certificate_latency",
474                "Latency of executing certificates, before committing the results",
475                LATENCY_SEC_BUCKETS.to_vec(),
476                registry,
477            )
478            .unwrap(),
479            commit_certificate_latency: register_histogram_with_registry!(
480                "authority_state_commit_certificate_latency",
481                "Latency of committing certificate execution results",
482                LATENCY_SEC_BUCKETS.to_vec(),
483                registry,
484            )
485            .unwrap(),
486            db_checkpoint_latency: register_histogram_with_registry!(
487                "db_checkpoint_latency",
488                "Latency of checkpointing dbs",
489                LATENCY_SEC_BUCKETS.to_vec(),
490                registry,
491            ).unwrap(),
492            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
493                "transaction_manager_num_enqueued_certificates",
494                "Current number of certificates enqueued to TransactionManager",
495                &["result"],
496                registry,
497            )
498            .unwrap(),
499            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
500                "transaction_manager_num_missing_objects",
501                "Current number of missing objects in TransactionManager",
502                registry,
503            )
504            .unwrap(),
505            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
506                "transaction_manager_num_pending_certificates",
507                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
508                registry,
509            )
510            .unwrap(),
511            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
512                "transaction_manager_num_executing_certificates",
513                "Number of executing certificates, including queued and actually running certificates",
514                registry,
515            )
516            .unwrap(),
517            transaction_manager_num_ready: register_int_gauge_with_registry!(
518                "transaction_manager_num_ready",
519                "Number of ready transactions in TransactionManager",
520                registry,
521            )
522            .unwrap(),
523            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
524                "transaction_manager_object_cache_size",
525                "Current size of object-availability cache in TransactionManager",
526                registry,
527            )
528            .unwrap(),
529            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
530                "transaction_manager_object_cache_hits",
531                "Number of object-availability cache hits in TransactionManager",
532                registry,
533            )
534            .unwrap(),
535            authority_overload_status: register_int_gauge_with_registry!(
536                "authority_overload_status",
537                "Whether authority is current experiencing overload and enters load shedding mode.",
538                registry)
539            .unwrap(),
540            authority_load_shedding_percentage: register_int_gauge_with_registry!(
541                "authority_load_shedding_percentage",
542                "The percentage of transactions is shed when the authority is in load shedding mode.",
543                registry)
544            .unwrap(),
545            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
546                "transaction_manager_object_cache_misses",
547                "Number of object-availability cache misses in TransactionManager",
548                registry,
549            )
550            .unwrap(),
551            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
552                "transaction_manager_object_cache_evictions",
553                "Number of object-availability cache evictions in TransactionManager",
554                registry,
555            )
556            .unwrap(),
557            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
558                "transaction_manager_package_cache_size",
559                "Current size of package-availability cache in TransactionManager",
560                registry,
561            )
562            .unwrap(),
563            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
564                "transaction_manager_package_cache_hits",
565                "Number of package-availability cache hits in TransactionManager",
566                registry,
567            )
568            .unwrap(),
569            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
570                "transaction_manager_package_cache_misses",
571                "Number of package-availability cache misses in TransactionManager",
572                registry,
573            )
574            .unwrap(),
575            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
576                "transaction_manager_package_cache_evictions",
577                "Number of package-availability cache evictions in TransactionManager",
578                registry,
579            )
580            .unwrap(),
581            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
582                "transaction_manager_transaction_queue_age_s",
583                "Time spent in waiting for transaction in the queue",
584                LATENCY_SEC_BUCKETS.to_vec(),
585                registry,
586            )
587            .unwrap(),
588            transaction_overload_sources: register_int_counter_vec_with_registry!(
589                "transaction_overload_sources",
590                "Number of times each source indicates transaction overload.",
591                &["source"],
592                registry)
593            .unwrap(),
594            execution_driver_executed_transactions: register_int_counter_with_registry!(
595                "execution_driver_executed_transactions",
596                "Cumulative number of transaction executed by execution driver",
597                registry,
598            )
599            .unwrap(),
600            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
601                "execution_driver_dispatch_queue",
602                "Number of transaction pending in execution driver dispatch queue",
603                registry,
604            )
605            .unwrap(),
606            execution_queueing_delay_s: register_histogram_with_registry!(
607                "execution_queueing_delay_s",
608                "Queueing delay between a transaction is ready for execution until it starts executing.",
609                LATENCY_SEC_BUCKETS.to_vec(),
610                registry
611            )
612            .unwrap(),
613            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
614                "prepare_cert_gas_latency_ratio",
615                "The ratio of computation gas divided by VM execution latency.",
616                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
617                registry
618            )
619            .unwrap(),
620            execution_gas_latency_ratio: register_histogram_with_registry!(
621                "execution_gas_latency_ratio",
622                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
623                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
624                registry
625            )
626            .unwrap(),
627            skipped_consensus_txns: register_int_counter_with_registry!(
628                "skipped_consensus_txns",
629                "Total number of consensus transactions skipped",
630                registry,
631            )
632            .unwrap(),
633            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
634                "skipped_consensus_txns_cache_hit",
635                "Total number of consensus transactions skipped because of local cache hit",
636                registry,
637            )
638            .unwrap(),
639            post_processing_total_events_emitted: register_int_counter_with_registry!(
640                "post_processing_total_events_emitted",
641                "Total number of events emitted in post processing",
642                registry,
643            )
644            .unwrap(),
645            post_processing_total_tx_indexed: register_int_counter_with_registry!(
646                "post_processing_total_tx_indexed",
647                "Total number of txes indexed in post processing",
648                registry,
649            )
650            .unwrap(),
651            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
652                "post_processing_total_tx_had_event_processed",
653                "Total number of txes finished event processing in post processing",
654                registry,
655            )
656            .unwrap(),
657            post_processing_total_failures: register_int_counter_with_registry!(
658                "post_processing_total_failures",
659                "Total number of failure in post processing",
660                registry,
661            )
662            .unwrap(),
663            consensus_handler_processed: register_int_counter_vec_with_registry!(
664                "consensus_handler_processed",
665                "Number of transactions processed by consensus handler",
666                &["class"],
667                registry
668            ).unwrap(),
669            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
670                "consensus_handler_transaction_sizes",
671                "Sizes of each type of transactions processed by consensus handler",
672                &["class"],
673                POSITIVE_INT_BUCKETS.to_vec(),
674                registry
675            ).unwrap(),
676            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
677                "consensus_handler_num_low_scoring_authorities",
678                "Number of low scoring authorities based on reputation scores from consensus",
679                registry
680            ).unwrap(),
681            consensus_handler_scores: register_int_gauge_vec_with_registry!(
682                "consensus_handler_scores",
683                "scores from consensus for each authority",
684                &["authority"],
685                registry,
686            ).unwrap(),
687            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
688                "consensus_handler_deferred_transactions",
689                "Number of transactions deferred by consensus handler",
690                registry,
691            ).unwrap(),
692            consensus_handler_congested_transactions: register_int_counter_with_registry!(
693                "consensus_handler_congested_transactions",
694                "Number of transactions deferred by consensus handler due to congestion",
695                registry,
696            ).unwrap(),
697            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
698                "consensus_handler_cancelled_transactions",
699                "Number of transactions cancelled by consensus handler",
700                registry,
701            ).unwrap(),
702            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
703                "consensus_handler_max_congestion_control_object_costs",
704                "Max object costs for congestion control in the current consensus commit",
705                &["commit_type"],
706                registry,
707            ).unwrap(),
708            consensus_committed_subdags: register_int_counter_vec_with_registry!(
709                "consensus_committed_subdags",
710                "Number of committed subdags, sliced by leader",
711                &["authority"],
712                registry,
713            ).unwrap(),
714            consensus_committed_messages: register_int_gauge_vec_with_registry!(
715                "consensus_committed_messages",
716                "Total number of committed consensus messages, sliced by author",
717                &["authority"],
718                registry,
719            ).unwrap(),
720            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
721                "consensus_committed_user_transactions",
722                "Number of committed user transactions, sliced by submitter",
723                &["authority"],
724                registry,
725            ).unwrap(),
726            consensus_handler_leader_round: register_int_gauge_with_registry!(
727                "consensus_handler_leader_round",
728                "The leader round of the current consensus output being processed in the consensus handler",
729                registry,
730            ).unwrap(),
731            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
732            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
733            zklogin_sig_count: register_int_counter_with_registry!(
734                "zklogin_sig_count",
735                "Count of zkLogin signatures",
736                registry,
737            )
738            .unwrap(),
739            multisig_sig_count: register_int_counter_with_registry!(
740                "multisig_sig_count",
741                "Count of zkLogin signatures",
742                registry,
743            )
744            .unwrap(),
745            consensus_calculated_throughput: register_int_gauge_with_registry!(
746                "consensus_calculated_throughput",
747                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
748                registry,
749            ).unwrap(),
750            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
751                "consensus_calculated_throughput_profile",
752                "The current active calculated throughput profile",
753                registry
754            ).unwrap(),
755            execution_queueing_latency: LatencyObserver::new(),
756            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
757            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
758        }
759    }
760
761    /// Reset metrics that contain `hostname` as one of the labels. This is
762    /// needed to avoid retaining metrics for long-gone committee members and
763    /// only exposing metrics for the committee in the current epoch.
764    pub fn reset_on_reconfigure(&self) {
765        self.consensus_committed_messages.reset();
766        self.consensus_handler_scores.reset();
767        self.consensus_committed_user_transactions.reset();
768    }
769}
770
/// A trait object for `Signer` that is:
/// - `Pin`, i.e. confined to one place in memory (we don't want to copy
///   private keys).
/// - `Sync`, i.e. can be safely shared between threads.
///
/// Typically instantiated with `Box::pin(keypair)` where `keypair` is a
/// `KeyPair`.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
778
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads transaction input objects for signing and execution (see
    /// `read_objects_for_signing` / `read_objects_for_execution`).
    input_loader: TransactionInputLoader,
    /// Trait-object handles to the execution caches and stores (object cache,
    /// transaction cache, backing/package store, cache writer, etc.).
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// Per-epoch store; atomically swapped at reconfiguration.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes current 'execution epoch'.
    /// Execution acquires read lock, checks certificate epoch and holds it
    /// until all writes are complete. Reconfiguration acquires write lock,
    /// changes the epoch and revert all transactions from previous epoch
    /// that are executed but did not make into checkpoint.
    execution_lock: RwLock<EpochId>,

    /// Optional secondary index store (None when indexing is disabled).
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional index store backing the gRPC read APIs.
    pub grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,

    /// Publishes transaction events to subscribers.
    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Store for checkpoints and their commitments (e.g. epoch state
    /// commitments, see `get_epoch_state_commitments`).
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Store of committees across epochs; shared via `Arc`.
    committee_store: Arc<CommitteeStore>,

    /// Manages pending certificates and their missing input objects.
    transaction_manager: Arc<TransactionManager>,

    /// Shuts down the execution task. Used only in testing.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Prometheus metrics for this authority.
    pub metrics: Arc<AuthorityMetrics>,
    // Held for ownership only (leading underscore): keeps the store pruner
    // alive for the lifetime of the authority.
    _pruner: AuthorityStorePruner,
    // Held for ownership only: keeps the per-epoch store pruner alive.
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Take db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    /// The node configuration this authority was started with.
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// When set, signed transactions are tracked so this validator can help
    /// drive them to finality (spawned in `handle_transaction`).
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// The chain identifier is derived from the digest of the genesis
    /// checkpoint.
    chain_identifier: ChainIdentifier,

    /// Shared congestion tracker used for congestion control.
    pub(crate) congestion_tracker: Arc<CongestionTracker>,
}
834
835/// The authority state encapsulates all state, drives execution, and ensures
836/// safety.
837///
838/// Note the authority operations can be accessed through a read ref (&) and do
839/// not require &mut. Internally a database is synchronized through a mutex
840/// lock.
841///
842/// Repeating valid commands should produce no changes and return no error.
843impl AuthorityState {
844    pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
845        epoch_store.committee().authority_exists(&self.name)
846    }
847
848    pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
849        epoch_store
850            .active_validators()
851            .iter()
852            .any(|a| AuthorityName::from(a) == self.name)
853    }
854
855    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
856        !self.is_committee_validator(epoch_store)
857    }
858
    /// Returns a reference to the shared committee store.
    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
        &self.committee_store
    }
862
863    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
864        self.committee_store.clone()
865    }
866
    /// Returns the authority overload configuration from the node config.
    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
        &self.config.authority_overload_config
    }
870
    /// Returns the checkpoint state commitments recorded for `epoch`, if any,
    /// by delegating to the checkpoint store.
    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
        self.checkpoint_store.get_epoch_state_commitments(epoch)
    }
877
    /// Validates a user transaction and, on success, signs it and locks its
    /// owned input objects to the signed transaction.
    ///
    /// This is a private method and should be kept that way. It doesn't check
    /// whether the provided transaction is a system transaction, and hence
    /// can only be called internally.
    ///
    /// Validation steps, in order: transaction deny checks, input-object
    /// loading, input checks (including gas), coin deny-list checks, and —
    /// when `MoveAuthenticator` signatures are present — execution of the
    /// Move authenticators. Fails with `ConflictingTransaction` if an owned
    /// input is already locked to a different transaction.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
    async fn handle_transaction_impl(
        &self,
        transaction: VerifiedTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<VerifiedSignedTransaction> {
        // Ensure that validator cannot reconfigure while we are signing the tx
        let _execution_lock = self.execution_lock_for_signing()?;

        let protocol_config = epoch_store.protocol_config();
        let reference_gas_price = epoch_store.reference_gas_price();

        let epoch = epoch_store.epoch();

        let tx_data = transaction.data().transaction_data();

        // Note: the deny checks may do redundant package loads but:
        // - they only load packages when there is an active package deny map
        // - the loads are cached anyway
        iota_transaction_checks::deny::check_transaction_for_signing(
            tx_data,
            transaction.tx_signatures(),
            &transaction.input_objects()?,
            &tx_data.receiving_objects(),
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        // Load all transaction-related input objects.
        // Authenticator input objects and the account objects are loaded in the same
        // call if there are `MoveAuthenticator` signatures present in the transaction.
        let (tx_input_objects, tx_receiving_objects, per_authenticator_inputs) =
            self.read_objects_for_signing(&transaction, epoch)?;

        // Get the `MoveAuthenticator`s, if any.
        let move_authenticators = transaction.move_authenticators();

        // Check the inputs for signing.
        // If there are `MoveAuthenticator` signatures, their input objects and the
        // account objects are also checked and must be provided.
        // It is also checked if there is enough gas to execute the transaction and its
        // authenticators.
        let (gas_status, tx_checked_input_objects, per_authenticator_checked_inputs) = self
            .check_transaction_inputs_for_signing(
                protocol_config,
                reference_gas_price,
                tx_data,
                tx_input_objects,
                &tx_receiving_objects,
                &move_authenticators,
                per_authenticator_inputs,
            )?;

        // Get the input objects for the authenticators, if there are
        // `MoveAuthenticator`s.
        let per_authenticator_checked_input_objects = per_authenticator_checked_inputs
            .iter()
            .map(|i| &i.0)
            .collect();

        // Check if any of the sender, the transaction input objects, the receiving
        // objects and the authenticator input objects are in the coin deny
        // list, which would prevent the transaction from being signed.
        check_coin_deny_list_v1_during_signing(
            tx_data.sender(),
            &tx_checked_input_objects,
            &tx_receiving_objects,
            &per_authenticator_checked_input_objects,
            &self.get_object_store(),
        )?;

        // If there are `MoveAuthenticator` signatures, execute them and check if they
        // all succeed.
        if !move_authenticators.is_empty() {
            let aggregated_authenticator_input_objects =
                iota_transaction_checks::aggregate_authenticator_input_objects(
                    &per_authenticator_checked_input_objects,
                )?;

            debug_assert_eq!(
                move_authenticators.len(),
                per_authenticator_checked_inputs.len(),
                "Move authenticators amount must match the number of checked authenticator inputs"
            );

            // Pair each authenticator with its checked inputs and function ref.
            let move_authenticators = move_authenticators
                .into_iter()
                .zip(per_authenticator_checked_inputs)
                .map(
                    |(
                        move_authenticator,
                        (authenticator_checked_input_objects, authenticator_function_ref),
                    )| {
                        (
                            move_authenticator.to_owned(),
                            authenticator_function_ref,
                            authenticator_checked_input_objects,
                        )
                    },
                )
                .collect();

            // It is supposed that `MoveAuthenticator` availability is checked in
            // `SenderSignedData::validity_check`.

            // Serialize the TransactionData for the auth context before decomposing.
            let tx_data_bytes =
                bcs::to_bytes(&tx_data).expect("TransactionData serialization cannot fail");

            let (kind, signer, gas_data) = tx_data.execution_parts();

            // Execute the Move authenticators.
            let validation_result = epoch_store.executor().authenticate_transaction(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                gas_data,
                gas_status,
                move_authenticators,
                aggregated_authenticator_input_objects,
                kind,
                signer,
                transaction.digest().to_owned(),
                tx_data_bytes,
                &mut None,
            );

            // Any authenticator failure rejects the whole transaction.
            if let Err(validation_error) = validation_result {
                return Err(IotaError::MoveAuthenticatorExecutionFailure {
                    error: validation_error.to_string(),
                });
            }
        }

        let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();

        let signed_transaction =
            VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);

        // Check and write locks, to signed transaction, into the database
        // The call to self.set_transaction_lock checks the lock is not conflicting,
        // and returns ConflictingTransaction error in case there is a lock on a
        // different existing transaction.
        self.get_cache_writer().try_acquire_transaction_locks(
            epoch_store,
            &owned_objects,
            signed_transaction.clone(),
        )?;

        Ok(signed_transaction)
    }
1037
    /// Initiate a new transaction: validates and signs it, returning the
    /// signature in the response status. The call is idempotent — if the
    /// transaction already has a recorded status (signed or executed), that
    /// status is returned instead of re-signing.
    #[instrument(name = "handle_transaction", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = transaction.data().transaction_data().gas_owner().to_string()))]
    pub async fn handle_transaction(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction: VerifiedTransaction,
    ) -> IotaResult<HandleTransactionResponse> {
        let tx_digest = *transaction.digest();
        debug!("handle_transaction");

        // Ensure an idempotent answer.
        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
            return Ok(HandleTransactionResponse { status });
        }

        let _metrics_guard = self
            .metrics
            .authority_state_handle_transaction_latency
            .start_timer();
        self.metrics.tx_orders.inc();

        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
        match signed {
            Ok(s) => {
                if self.is_committee_validator(epoch_store) {
                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
                        let tx = s.clone();
                        let validator_tx_finalizer = validator_tx_finalizer.clone();
                        let cache_reader = self.get_transaction_cache_reader().clone();
                        let epoch_store = epoch_store.clone();
                        // Track the signed tx in a background task bounded by
                        // the epoch's lifetime (`within_alive_epoch`).
                        spawn_monitored_task!(epoch_store.within_alive_epoch(
                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
                        ));
                    }
                }
                Ok(HandleTransactionResponse {
                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
                })
            }
            // It happens frequently that while we are checking the validity of the transaction, it
            // has just been executed.
            // In that case, we could still return Ok to avoid showing confusing errors.
            Err(err) => Ok(HandleTransactionResponse {
                status: self
                    .get_transaction_status(&tx_digest, epoch_store)?
                    .ok_or(err)?
                    .1,
            }),
        }
    }
1088
1089    pub fn check_system_overload_at_signing(&self) -> bool {
1090        self.config
1091            .authority_overload_config
1092            .check_system_overload_at_signing
1093    }
1094
1095    pub fn check_system_overload_at_execution(&self) -> bool {
1096        self.config
1097            .authority_overload_config
1098            .check_system_overload_at_execution
1099    }
1100
    /// Checks whether the system is overloaded before accepting a
    /// transaction. Runs, in order: an optional authority (execution-queue)
    /// overload check, a transaction-manager execution overload check, a
    /// consensus overload check, and a writeback-cache backpressure check.
    /// The `transaction_overload_sources` metric is labeled with the first
    /// failing check before the error is propagated.
    pub(crate) fn check_system_overload(
        &self,
        consensus_adapter: &Arc<ConsensusAdapter>,
        tx_data: &SenderSignedData,
        do_authority_overload_check: bool,
    ) -> IotaResult {
        if do_authority_overload_check {
            self.check_authority_overload(tx_data).tap_err(|_| {
                self.update_overload_metrics("execution_queue");
            })?;
        }
        self.transaction_manager
            .check_execution_overload(self.overload_config(), tx_data)
            .tap_err(|_| {
                self.update_overload_metrics("execution_pending");
            })?;
        consensus_adapter.check_consensus_overload().tap_err(|_| {
            self.update_overload_metrics("consensus");
        })?;

        // Backpressure: if too many transactions are pending commit in the
        // writeback cache, ask the client to retry later.
        let pending_tx_count = self
            .get_cache_commit()
            .approximate_pending_transaction_count();
        if pending_tx_count
            > self
                .config
                .execution_cache_config
                .writeback_cache
                .backpressure_threshold_for_rpc()
        {
            return Err(IotaError::ValidatorOverloadedRetryAfter {
                retry_after_secs: 10,
            });
        }

        Ok(())
    }
1138
1139    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1140        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1141            return Ok(());
1142        }
1143
1144        let load_shedding_percentage = self
1145            .overload_info
1146            .load_shedding_percentage
1147            .load(Ordering::Relaxed);
1148        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1149    }
1150
1151    fn update_overload_metrics(&self, source: &str) {
1152        self.metrics
1153            .transaction_overload_sources
1154            .with_label_values(&[source])
1155            .inc();
1156    }
1157
    /// Executes a certificate and waits for its effects.
    ///
    /// Owned-object-only certificates are enqueued for execution directly;
    /// shared-object certificates must first be sequenced by consensus and
    /// are therefore not enqueued here. Fails with `EpochEnded` if the epoch
    /// ends before the effects become available.
    #[instrument(level = "trace", skip_all)]
    pub async fn execute_certificate(
        &self,
        certificate: &VerifiedCertificate,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<TransactionEffects> {
        // Separate latency histograms for shared-object vs single-writer txs.
        let _metrics_guard = if certificate.contains_shared_object() {
            self.metrics
                .execute_certificate_latency_shared_object
                .start_timer()
        } else {
            self.metrics
                .execute_certificate_latency_single_writer
                .start_timer()
        };
        trace!("execute_certificate");

        self.metrics.total_cert_attempts.inc();

        if !certificate.contains_shared_object() {
            // Shared object transactions need to be sequenced by the consensus before
            // enqueueing for execution, done in
            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
            // object transactions, they can be enqueued for execution immediately.
            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
        }

        // tx could be reverted when epoch ends, so we must be careful not to return a
        // result here after the epoch ends.
        epoch_store
            .within_alive_epoch(self.notify_read_effects(certificate))
            .await
            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
            .and_then(|r| r)
    }
1194
    /// Internal logic to execute a certificate.
    ///
    /// Guarantees that
    /// - If input objects are available, return no permanent failure.
    /// - Execution and output commit are atomic. i.e. outputs are only written
    ///   to storage,
    /// on successful execution; crashed execution has no observable effect and
    /// can be retried.
    ///
    /// It is caller's responsibility to ensure input objects are available and
    /// locks are set. If this cannot be satisfied by the caller,
    /// execute_certificate() should be called instead.
    ///
    /// Returns the transaction effects together with the execution error, if
    /// any (a failed transaction still produces effects).
    ///
    /// Should only be called within iota-core.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
    pub fn try_execute_immediately(
        &self,
        certificate: &VerifiedExecutableTransaction,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
        let _scope = monitored_scope("Execution::try_execute_immediately");
        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();

        let tx_digest = certificate.digest();

        // Acquire a lock to prevent concurrent executions of the same transaction.
        let tx_guard = epoch_store.acquire_tx_guard(certificate)?;

        // The cert could have been processed by a concurrent attempt of the same cert,
        // so check if the effects have already been written.
        if let Some(effects) = self
            .get_transaction_cache_reader()
            .try_get_executed_effects(tx_digest)?
        {
            // If the caller expected specific effects, any mismatch with what
            // is already committed would be a fork — assert, don't continue.
            if let Some(expected_effects_digest_inner) = expected_effects_digest {
                assert_eq!(
                    effects.digest(),
                    expected_effects_digest_inner,
                    "Unexpected effects digest for transaction {tx_digest:?}"
                );
            }
            tx_guard.release();
            return Ok((effects, None));
        }

        let (tx_input_objects, per_authenticator_inputs) =
            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;

        // If no expected_effects_digest was provided, try to get it from storage.
        // We could be re-executing a previously executed but uncommitted transaction,
        // perhaps after restarting with a new binary. In this situation, if
        // we have published an effects signature, we must be sure not to
        // equivocate. TODO: read from cache instead of DB
        let expected_effects_digest =
            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);

        self.process_certificate(
            tx_guard,
            certificate,
            tx_input_objects,
            per_authenticator_inputs,
            expected_effects_digest,
            epoch_store,
        )
        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
        .tap_ok(
            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
        )
    }
1265
1266    pub fn read_objects_for_execution(
1267        &self,
1268        tx_lock: &CertLockGuard,
1269        certificate: &VerifiedExecutableTransaction,
1270        epoch_store: &Arc<AuthorityPerEpochStore>,
1271    ) -> IotaResult<(InputObjects, Vec<(InputObjects, ObjectReadResult)>)> {
1272        let _scope = monitored_scope("Execution::load_input_objects");
1273        let _metrics_guard = self
1274            .metrics
1275            .execution_load_input_objects_latency
1276            .start_timer();
1277
1278        let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1279
1280        let input_objects = self.input_loader.read_objects_for_execution(
1281            epoch_store,
1282            &certificate.key(),
1283            tx_lock,
1284            &input_objects,
1285            epoch_store.epoch(),
1286        )?;
1287
1288        certificate.split_input_objects_into_groups_for_reading(input_objects)
1289    }
1290
1291    /// Test only wrapper for `try_execute_immediately()` above, useful for
1292    /// checking errors if the pre-conditions are not satisfied, and
1293    /// executing change epoch transactions.
1294    pub fn try_execute_for_test(
1295        &self,
1296        certificate: &VerifiedCertificate,
1297    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1298        let epoch_store = self.epoch_store_for_testing();
1299        let (effects, execution_error_opt) = self.try_execute_immediately(
1300            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1301            None,
1302            &epoch_store,
1303        )?;
1304        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1305        Ok((signed_effects, execution_error_opt))
1306    }
1307
1308    /// Non-fallible version of `try_execute_for_test()`.
1309    pub fn execute_for_test(
1310        &self,
1311        certificate: &VerifiedCertificate,
1312    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1313        self.try_execute_for_test(certificate)
1314            .expect("try_execute_for_test should not fail")
1315    }
1316
1317    pub async fn notify_read_effects(
1318        &self,
1319        certificate: &VerifiedCertificate,
1320    ) -> IotaResult<TransactionEffects> {
1321        self.get_transaction_cache_reader()
1322            .try_notify_read_executed_effects(&[*certificate.digest()])
1323            .await
1324            .map(|mut r| r.pop().expect("must return correct number of effects"))
1325    }
1326
    /// Checks that the given owned object references are still live,
    /// delegating to the object cache reader.
    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
        self.get_object_cache_reader()
            .try_check_owned_objects_are_live(owned_object_refs)
    }
1331
1332    /// This function captures the required state to debug a forked transaction.
1333    /// The dump is written to a file in dir `path`, with name prefixed by the
1334    /// transaction digest. NOTE: Since this info escapes the validator
1335    /// context, make sure not to leak any private info here
1336    pub(crate) fn debug_dump_transaction_state(
1337        &self,
1338        tx_digest: &TransactionDigest,
1339        effects: &TransactionEffects,
1340        expected_effects_digest: TransactionEffectsDigest,
1341        inner_temporary_store: &InnerTemporaryStore,
1342        certificate: &VerifiedExecutableTransaction,
1343        debug_dump_config: &StateDebugDumpConfig,
1344    ) -> IotaResult<PathBuf> {
1345        let dump_dir = debug_dump_config
1346            .dump_file_directory
1347            .as_ref()
1348            .cloned()
1349            .unwrap_or(std::env::temp_dir());
1350        let epoch_store = self.load_epoch_store_one_call_per_task();
1351
1352        NodeStateDump::new(
1353            tx_digest,
1354            effects,
1355            expected_effects_digest,
1356            self.get_object_store().as_ref(),
1357            &epoch_store,
1358            inner_temporary_store,
1359            certificate,
1360        )?
1361        .write_to_file(&dump_dir)
1362        .map_err(|e| IotaError::FileIO(e.to_string()))
1363    }
1364
1365    #[instrument(name = "process_certificate", level = "trace", skip_all, fields(tx_digest = ?certificate.digest(), sender = ?certificate.data().transaction_data().gas_owner().to_string()))]
1366    pub(crate) fn process_certificate(
1367        &self,
1368        tx_guard: CertTxGuard,
1369        certificate: &VerifiedExecutableTransaction,
1370        tx_input_objects: InputObjects,
1371        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1372        expected_effects_digest: Option<TransactionEffectsDigest>,
1373        epoch_store: &Arc<AuthorityPerEpochStore>,
1374    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1375        let process_certificate_start_time = tokio::time::Instant::now();
1376        let digest = *certificate.digest();
1377
1378        let _scope = monitored_scope("Execution::process_certificate");
1379
1380        fail_point_if!("correlated-crash-process-certificate", || {
1381            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1382                iota_simulator::task::kill_current_node(None);
1383            }
1384        });
1385
1386        let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1387        // Any caller that verifies the signatures on the certificate will have already
1388        // checked the epoch. But paths that don't verify sigs (e.g. execution
1389        // from checkpoint, reading from db) present the possibility of an epoch
1390        // mismatch. If this cert is not finalzied in previous epoch, then it's
1391        // invalid.
1392        let execution_guard = match execution_guard {
1393            Ok(execution_guard) => execution_guard,
1394            Err(err) => {
1395                tx_guard.release();
1396                return Err(err);
1397            }
1398        };
1399        // Since we obtain a reference to the epoch store before taking the execution
1400        // lock, it's possible that reconfiguration has happened and they no
1401        // longer match.
1402        if *execution_guard != epoch_store.epoch() {
1403            tx_guard.release();
1404            info!("The epoch of the execution_guard doesn't match the epoch store");
1405            return Err(IotaError::WrongEpoch {
1406                expected_epoch: epoch_store.epoch(),
1407                actual_epoch: *execution_guard,
1408            });
1409        }
1410
1411        // Errors originating from prepare_certificate may be transient (failure to read
1412        // locks) or non-transient (transaction input is invalid, move vm
1413        // errors). However, all errors from this function occur before we have
1414        // written anything to the db, so we commit the tx guard and rely on the
1415        // client to retry the tx (if it was transient).
1416        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1417            &execution_guard,
1418            certificate,
1419            tx_input_objects,
1420            per_authenticator_inputs,
1421            epoch_store,
1422        ) {
1423            Err(e) => {
1424                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1425                tx_guard.release();
1426                return Err(e);
1427            }
1428            Ok(res) => res,
1429        };
1430
1431        if let Some(expected_effects_digest) = expected_effects_digest {
1432            if effects.digest() != expected_effects_digest {
                // We don't want to mask the original error, so we log it and continue.
1434                match self.debug_dump_transaction_state(
1435                    &digest,
1436                    &effects,
1437                    expected_effects_digest,
1438                    &inner_temporary_store,
1439                    certificate,
1440                    &self.config.state_debug_dump_config,
1441                ) {
1442                    Ok(out_path) => {
1443                        info!(
1444                            "Dumped node state for transaction {} to {}",
1445                            digest,
1446                            out_path.as_path().display().to_string()
1447                        );
1448                    }
1449                    Err(e) => {
1450                        error!("Error dumping state for transaction {}: {e}", digest);
1451                    }
1452                }
1453                error!(
1454                    tx_digest = ?digest,
1455                    ?expected_effects_digest,
1456                    actual_effects = ?effects,
1457                    "fork detected!"
1458                );
1459                panic!(
1460                    "Transaction {} is expected to have effects digest {}, but got {}!",
1461                    digest,
1462                    expected_effects_digest,
1463                    effects.digest(),
1464                );
1465            }
1466        }
1467
1468        fail_point!("crash");
1469
1470        self.commit_certificate(
1471            certificate,
1472            inner_temporary_store,
1473            &effects,
1474            tx_guard,
1475            execution_guard,
1476            epoch_store,
1477        )?;
1478
1479        if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1480            certificate.data().transaction_data().kind()
1481        {
1482            if let Some(err) = &execution_error_opt {
1483                debug_fatal!("Authenticator state update failed: {:?}", err);
1484            }
1485            epoch_store.update_authenticator_state(auth_state);
1486
1487            // double check that the signature verifier always matches the authenticator
1488            // state
1489            if cfg!(debug_assertions) {
1490                let authenticator_state = get_authenticator_state(self.get_object_store())
1491                    .expect("Read cannot fail")
1492                    .expect("Authenticator state must exist");
1493
1494                let mut sys_jwks: Vec<_> = authenticator_state
1495                    .active_jwks
1496                    .into_iter()
1497                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
1498                    .collect();
1499                let mut active_jwks: Vec<_> = epoch_store
1500                    .signature_verifier
1501                    .get_jwks()
1502                    .into_iter()
1503                    .collect();
1504                sys_jwks.sort();
1505                active_jwks.sort();
1506
1507                assert_eq!(sys_jwks, active_jwks);
1508            }
1509        }
1510
1511        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1512        if elapsed > 0.0 {
1513            self.metrics
1514                .execution_gas_latency_ratio
1515                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1516        };
1517        Ok((effects, execution_error_opt))
1518    }
1519
    /// Writes an executed certificate's outputs to the stores and notifies
    /// the transaction manager so that dependent transactions become ready.
    ///
    /// Ordering is deliberate: the tx key -> digest mapping is inserted into
    /// the epoch store before the perpetual-store write (lookups always check
    /// the perpetual store first), and `tx_guard.commit_tx` runs only after
    /// the outputs have been handed to the cache writer.
    #[instrument(level = "trace", skip_all)]
    fn commit_certificate(
        &self,
        certificate: &VerifiedExecutableTransaction,
        inner_temporary_store: InnerTemporaryStore,
        effects: &TransactionEffects,
        tx_guard: CertTxGuard,
        _execution_guard: ExecutionLockReadGuard<'_>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        let tx_key = certificate.key();
        let tx_digest = certificate.digest();
        // Captured now because `inner_temporary_store` is consumed below.
        let input_object_count = inner_temporary_store.input_objects.len();
        let shared_object_count = effects.input_shared_objects().len();

        // Output object keys are needed later to notify the transaction manager.
        let output_keys = inner_temporary_store.get_output_keys(effects);

        // index certificate
        // Post-processing failures are counted and logged but do not fail the
        // commit.
        let _ = self
            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
            .tap_err(|e| {
                self.metrics.post_processing_total_failures.inc();
                error!(?tx_digest, "tx post processing failed: {e}");
            });

        // The insertion to epoch_store is not atomic with the insertion to the
        // perpetual store. This is OK because we insert to the epoch store
        // first. And during lookups we always look up in the perpetual store first.
        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;

        // Allow testing what happens if we crash here.
        fail_point!("crash");

        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
            certificate.clone().into_unsigned(),
            effects.clone(),
            inner_temporary_store,
        );
        self.get_cache_writer()
            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;

        if certificate.transaction_data().is_end_of_epoch_tx() {
            // At the end of epoch, since system packages may have been upgraded, force
            // reload them in the cache.
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        // commit_certificate finished, the tx is fully committed to the store.
        tx_guard.commit_tx();

        // Notifies transaction manager about transaction and output objects committed.
        // This provides necessary information to transaction manager to start executing
        // additional ready transactions.
        self.transaction_manager
            .notify_commit(tx_digest, output_keys, epoch_store);

        self.update_metrics(certificate, input_object_count, shared_object_count);

        Ok(())
    }
1585
1586    fn update_metrics(
1587        &self,
1588        certificate: &VerifiedExecutableTransaction,
1589        input_object_count: usize,
1590        shared_object_count: usize,
1591    ) {
1592        // count signature by scheme, for zklogin and multisig
1593        if certificate.has_zklogin_sig() {
1594            self.metrics.zklogin_sig_count.inc();
1595        } else if certificate.has_upgraded_multisig() {
1596            self.metrics.multisig_sig_count.inc();
1597        }
1598
1599        self.metrics.total_effects.inc();
1600        self.metrics.total_certs.inc();
1601
1602        if shared_object_count > 0 {
1603            self.metrics.shared_obj_tx.inc();
1604        }
1605
1606        if certificate.is_sponsored_tx() {
1607            self.metrics.sponsored_tx.inc();
1608        }
1609
1610        self.metrics
1611            .num_input_objs
1612            .observe(input_object_count as f64);
1613        self.metrics
1614            .num_shared_objects
1615            .observe(shared_object_count as f64);
1616        self.metrics.batch_size.observe(
1617            certificate
1618                .data()
1619                .intent_message()
1620                .value
1621                .kind()
1622                .num_commands() as f64,
1623        );
1624    }
1625
    /// prepare_certificate validates the transaction input, and executes the
    /// certificate, returning effects, output objects, events, etc.
    ///
    /// It reads state from the db (both owned and shared locks), but it has no
    /// side effects.
    ///
    /// It can be generally understood that a failure of prepare_certificate
    /// indicates a non-transient error, e.g. the transaction input is
    /// somehow invalid, the correct locks are not held, etc. However, this
    /// is not entirely true, as a transient db read error may also cause
    /// this function to fail.
    ///
    /// `_execution_guard` is unused here but proves the caller holds the epoch
    /// execution lock. `per_authenticator_inputs` carries, per Move
    /// authenticator, the authenticator's input objects and the account
    /// object to authenticate against; it is ignored when the certificate has
    /// no Move authenticators.
    #[instrument(level = "trace", skip_all)]
    fn prepare_certificate(
        &self,
        _execution_guard: &ExecutionLockReadGuard<'_>,
        certificate: &VerifiedExecutableTransaction,
        tx_input_objects: InputObjects,
        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(
        InnerTemporaryStore,
        TransactionEffects,
        Option<ExecutionError>,
    )> {
        let _scope = monitored_scope("Execution::prepare_certificate");
        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
        let prepare_certificate_start_time = tokio::time::Instant::now();

        // Epoch-wide execution parameters.
        let protocol_config = epoch_store.protocol_config();

        let reference_gas_price = epoch_store.reference_gas_price();

        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
        let epoch_start_timestamp = epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp();

        let backing_store = self.get_backing_store().as_ref();

        let tx_digest = *certificate.digest();

        // TODO: We need to move this to a more appropriate place to avoid redundant
        // checks.
        let tx_data = certificate.data().transaction_data();
        tx_data.validity_check(protocol_config)?;

        let (kind, signer, gas_data) = tx_data.execution_parts();

        let move_authenticators = certificate.move_authenticators();

        // `effects` only needs to be mutable for the simulator fail point below.
        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
        let (inner_temp_store, _, mut effects, execution_error_opt) = if move_authenticators
            .is_empty()
        {
            // No Move authentication required, proceed to execute the transaction directly.

            // The cost of partially re-auditing a transaction before execution is
            // tolerated.
            let (tx_gas_status, tx_checked_input_objects) =
                iota_transaction_checks::check_certificate_input(
                    certificate,
                    tx_input_objects,
                    protocol_config,
                    reference_gas_price,
                )?;

            // Verify locks for the owned input objects.
            let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
            self.check_owned_locks(&owned_object_refs)?;
            epoch_store.executor().execute_transaction_to_effects(
                backing_store,
                protocol_config,
                self.metrics.limits_metrics.clone(),
                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
                // cyclic dependency w/ iota-adapter
                self.config
                    .expensive_safety_check_config
                    .enable_deep_per_tx_iota_conservation_check(),
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_id,
                epoch_start_timestamp,
                tx_checked_input_objects,
                gas_data,
                tx_gas_status,
                kind,
                signer,
                tx_digest,
                &mut None,
            )
        } else {
            // One or more `MoveAuthenticator` signatures present — authenticate each and
            // then execute the transaction.
            // It is supposed that `MoveAuthenticator` availability is checked in
            // `SenderSignedData::validity_check`.

            debug_assert_eq!(
                move_authenticators.len(),
                per_authenticator_inputs.len(),
                "Move authenticators amount must match the number of authenticator inputs"
            );

            // For each authenticator, validate its account object against the
            // provided read result and resolve the authenticator function to
            // run during execution.
            let per_authenticator_inputs = move_authenticators
                .iter()
                .zip(per_authenticator_inputs)
                .map(
                    |(move_authenticator, (authenticator_input_objects, account_object))| {
                        // Check basic `object_to_authenticate` preconditions and get its
                        // components.
                        let (
                            auth_account_object_id,
                            auth_account_object_seq_number,
                            auth_account_object_digest,
                        ) = move_authenticator.object_to_authenticate_components()?;

                        let signer = move_authenticator.address()?;

                        let authenticator_function_ref_for_execution = self.check_move_account(
                            auth_account_object_id,
                            auth_account_object_seq_number,
                            auth_account_object_digest,
                            account_object,
                            &signer,
                        )?;

                        Ok((
                            authenticator_input_objects,
                            authenticator_function_ref_for_execution,
                        ))
                    },
                )
                .collect::<IotaResult<Vec<_>>>()?;

            let per_authenticator_input_objects = per_authenticator_inputs
                .iter()
                .map(|(authenticator_input_objects, _)| authenticator_input_objects.clone())
                .collect::<Vec<_>>();

            // Serialize the TransactionData for the auth context.
            let tx_data_bytes =
                bcs::to_bytes(tx_data).expect("TransactionData serialization cannot fail");

            // Check the `MoveAuthenticator` input objects.
            // The `MoveAuthenticator` receiving objects are checked on the signing step.
            // `max_auth_gas` serves as the Move authenticator gas budget for as long as
            // an explicit budget is not part of the transaction data.
            let authenticator_gas_budget = protocol_config.max_auth_gas();
            let (
                gas_status,
                per_authenticator_checked_input_objects,
                authenticator_and_tx_checked_input_objects,
            ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
                certificate,
                tx_input_objects,
                per_authenticator_input_objects,
                authenticator_gas_budget,
                protocol_config,
                reference_gas_price,
            )?;

            debug_assert_eq!(
                move_authenticators.len(),
                per_authenticator_checked_input_objects.len(),
                "Move authenticators amount must match the number of checked authenticator inputs"
            );

            // Pair each authenticator with its resolved function ref and its
            // checked input objects, in the shape the executor expects.
            let move_authenticators = move_authenticators
                .into_iter()
                .zip(per_authenticator_inputs)
                .zip(per_authenticator_checked_input_objects)
                .map(
                    |(
                        (move_authenticator, (_, authenticator_function_ref_for_execution)),
                        authenticator_checked_input_objects,
                    )| {
                        (
                            move_authenticator.to_owned(),
                            authenticator_function_ref_for_execution,
                            authenticator_checked_input_objects,
                        )
                    },
                )
                .collect::<Vec<_>>();

            // Verify locks for the owned input objects (authenticator and
            // transaction inputs combined).
            let owned_object_refs = authenticator_and_tx_checked_input_objects
                .inner()
                .filter_owned_objects();
            self.check_owned_locks(&owned_object_refs)?;

            epoch_store
                .executor()
                .authenticate_then_execute_transaction_to_effects(
                    backing_store,
                    protocol_config,
                    self.metrics.limits_metrics.clone(),
                    self.config
                        .expensive_safety_check_config
                        .enable_deep_per_tx_iota_conservation_check(),
                    self.config.certificate_deny_config.certificate_deny_set(),
                    &epoch_id,
                    epoch_start_timestamp,
                    gas_data,
                    gas_status,
                    move_authenticators,
                    authenticator_and_tx_checked_input_objects,
                    kind,
                    signer,
                    tx_digest,
                    tx_data_bytes,
                    &mut None,
                )
        };

        fail_point_if!("cp_execution_nondeterminism", || {
            #[cfg(msim)]
            self.create_fail_state(certificate, epoch_store, &mut effects);
        });

        // Record computation cost per microsecond of preparation time.
        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics
                .prepare_cert_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
        }

        Ok((inner_temp_store, effects, execution_error_opt.err()))
    }
1852
1853    pub fn prepare_certificate_for_benchmark(
1854        &self,
1855        certificate: &VerifiedExecutableTransaction,
1856        input_objects: InputObjects,
1857        epoch_store: &Arc<AuthorityPerEpochStore>,
1858    ) -> IotaResult<(
1859        InnerTemporaryStore,
1860        TransactionEffects,
1861        Option<ExecutionError>,
1862    )> {
1863        let lock = RwLock::new(epoch_store.epoch());
1864        let execution_guard = lock.try_read().unwrap();
1865
1866        self.prepare_certificate(
1867            &execution_guard,
1868            certificate,
1869            input_objects,
1870            vec![],
1871            epoch_store,
1872        )
1873    }
1874
1875    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
1876    /// `VmChecks::Enabled` instead.
1877    #[instrument("dry_exec_tx", level = "trace", skip_all)]
1878    #[allow(clippy::type_complexity)]
1879    pub fn dry_exec_transaction(
1880        &self,
1881        transaction: TransactionData,
1882        transaction_digest: TransactionDigest,
1883    ) -> IotaResult<(
1884        DryRunTransactionBlockResponse,
1885        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1886        TransactionEffects,
1887        Option<ObjectID>,
1888    )> {
1889        let epoch_store = self.load_epoch_store_one_call_per_task();
1890        if !self.is_fullnode(&epoch_store) {
1891            return Err(IotaError::UnsupportedFeature {
1892                error: "dry-exec is only supported on fullnodes".to_string(),
1893            });
1894        }
1895
1896        if transaction.kind().is_system_tx() {
1897            return Err(IotaError::UnsupportedFeature {
1898                error: "dry-exec does not support system transactions".to_string(),
1899            });
1900        }
1901
1902        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1903    }
1904
1905    #[allow(clippy::type_complexity)]
1906    pub fn dry_exec_transaction_for_benchmark(
1907        &self,
1908        transaction: TransactionData,
1909        transaction_digest: TransactionDigest,
1910    ) -> IotaResult<(
1911        DryRunTransactionBlockResponse,
1912        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1913        TransactionEffects,
1914        Option<ObjectID>,
1915    )> {
1916        let epoch_store = self.load_epoch_store_one_call_per_task();
1917        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1918    }
1919
    /// Shared dry-execution implementation used by `dry_exec_transaction` and
    /// its benchmark variant.
    ///
    /// Returns the dry-run RPC response, the written objects keyed by
    /// `ObjectID` (each with its `ObjectRef` and `WriteKind`), the raw
    /// `TransactionEffects`, and `Some(gas_object_id)` when a mock gas coin
    /// was synthesized because the transaction supplied no gas payment.
    #[instrument(level = "trace", skip_all)]
    #[allow(clippy::type_complexity)]
    fn dry_exec_transaction_impl(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        transaction: TransactionData,
        transaction_digest: TransactionDigest,
    ) -> IotaResult<(
        DryRunTransactionBlockResponse,
        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
        TransactionEffects,
        Option<ObjectID>,
    )> {
        // Cheap validity checks for a transaction, including input size limits.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Apply the same deny checks that are applied when signing.
        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dry run.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // make a gas object if one was not provided
        let mut transaction = transaction;
        let reference_gas_price = epoch_store.reference_gas_price();
        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
            // Synthesize a fresh gas coin owned by the gas owner so the dry
            // run can proceed without a real payment object.
            let sender = transaction.gas_owner();
            let gas_object_id = ObjectID::random();
            let gas_object = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    gas_object_id,
                    SIMULATION_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(sender),
                TransactionDigest::genesis_marker(),
            );
            let gas_object_ref = gas_object.compute_object_reference();
            // Add gas object to transaction gas payment
            transaction.gas_data_mut().payment = vec![gas_object_ref];
            (
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    receiving_objects,
                    gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                Some(gas_object_id),
            )
        } else {
            // `MoveAuthenticator`s are not supported in dry runs, so we set the
            // `authenticator_gas_budget` to 0.
            let authenticator_gas_budget = 0;

            (
                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                    authenticator_gas_budget,
                )?,
                None,
            )
        };

        let protocol_config = epoch_store.protocol_config();
        let (kind, signer, gas_data) = transaction.execution_parts();

        let silent = true;
        let executor = iota_execution::executor(protocol_config, silent, None)
            .expect("Creating an executor should not fail here");

        // Expensive (deep conservation) safety checks are disabled for dry runs.
        let expensive_checks = false;
        let (inner_temp_store, _, effects, execution_error) = executor
            .execute_transaction_to_effects(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                expensive_checks,
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                checked_input_objects,
                gas_data,
                gas_status,
                kind,
                signer,
                transaction_digest,
                &mut None,
            );
        let tx_digest = *effects.transaction_digest();

        // Resolvers overlay the execution's temporary store on the regular
        // stores so objects/packages written by this dry run are visible.
        let module_cache =
            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());

        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));
        // Returning empty vector here because we recalculate changes in the rpc layer.
        let object_changes = Vec::new();

        // Returning empty vector here because we recalculate changes in the rpc layer.
        let balance_changes = Vec::new();

        // Collect every written object (created / unwrapped / mutated) keyed
        // by id and tagged with the corresponding write kind.
        let written_with_kind = effects
            .created()
            .into_iter()
            .map(|(oref, _)| (oref, WriteKind::Create))
            .chain(
                effects
                    .unwrapped()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
            )
            .chain(
                effects
                    .mutated()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
            )
            .map(|(oref, kind)| {
                let obj = inner_temp_store.written.get(&oref.0).unwrap();
                // TODO: Avoid clones.
                (oref.0, (oref, obj.clone(), kind))
            })
            .collect();

        let execution_error_source = execution_error
            .as_ref()
            .err()
            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));

        Ok((
            DryRunTransactionBlockResponse {
                // to avoid cloning `transaction`, fields are populated in this order
                suggested_gas_price: self
                    .congestion_tracker
                    .get_prediction_suggested_gas_price(&transaction),
                input: IotaTransactionBlockData::try_from_with_module_cache(
                    transaction,
                    &module_cache,
                    tx_digest,
                )
                .map_err(|e| IotaError::TransactionSerialization {
                    error: format!(
                        "Failed to convert transaction to IotaTransactionBlockData: {e}",
                    ),
                })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
                effects: effects.clone().try_into()?,
                events: IotaTransactionBlockEvents::try_from(
                    inner_temp_store.events.clone(),
                    tx_digest,
                    None,
                    layout_resolver.as_mut(),
                )?,
                object_changes,
                balance_changes,
                execution_error_source,
            },
            written_with_kind,
            effects,
            mock_gas,
        ))
    }
2113
    /// Simulates executing `transaction` without committing any state changes,
    /// returning the resulting effects, events, output objects, and execution
    /// outcome.
    ///
    /// Only supported on fullnodes; system transactions are rejected. If the
    /// transaction carries no gas payment, a mock gas coin is injected so
    /// execution can still be charged, and its id is reported back in
    /// `SimulateTransactionResult::mock_gas_id`.
    ///
    /// `checks` selects the validation mode: enabled applies the full
    /// transaction-input checks (dry-run semantics), disabled applies the more
    /// permissive dev-inspect checks.
    pub fn simulate_transaction(
        &self,
        mut transaction: TransactionData,
        checks: VmChecks,
    ) -> IotaResult<SimulateTransactionResult> {
        if transaction.kind().is_system_tx() {
            return Err(IotaError::UnsupportedFeature {
                error: "simulate does not support system transactions".to_string(),
            });
        }

        let epoch_store = self.load_epoch_store_one_call_per_task();
        if !self.is_fullnode(&epoch_store) {
            return Err(IotaError::UnsupportedFeature {
                error: "simulate is only supported on fullnodes".to_string(),
            });
        }

        // Cheap validity checks for a transaction, including input size limits.
        // This does not check if gas objects are missing since we may create a
        // mock gas object. It checks for other transaction input validity.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Since we need to simulate a validator signing the transaction, the first step
        // is to check if some transaction elements are denied.
        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        // Load input and receiving objects
        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a simulation.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // Create a mock gas object if one was not provided, and point the
        // transaction's gas payment at it so the checks below see a valid coin.
        let mock_gas_id = if transaction.gas().is_empty() {
            let mock_gas_object = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    ObjectID::MAX,
                    SIMULATION_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(transaction.gas_data().owner),
                TransactionDigest::genesis_marker(),
            );
            let mock_gas_object_ref = mock_gas_object.compute_object_reference();
            transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
            Some(mock_gas_object.id())
        } else {
            None
        };

        let protocol_config = epoch_store.protocol_config();

        // `MoveAuthenticator`s are not supported in simulation, so we set the
        // `authenticator_gas_budget` to 0.
        let authenticator_gas_budget = 0;

        // Checks enabled -> DRY-RUN, it means we are simulating a real TX
        // Checks disabled -> DEV-INSPECT, more relaxed Move VM checks
        let (gas_status, checked_input_objects) = if checks.enabled() {
            iota_transaction_checks::check_transaction_input(
                protocol_config,
                epoch_store.reference_gas_price(),
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
                authenticator_gas_budget,
            )?
        } else {
            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
                protocol_config,
                transaction.kind(),
                input_objects,
                receiving_objects,
            )?;
            let gas_status = IotaGasStatus::new(
                transaction.gas_budget(),
                transaction.gas_price(),
                epoch_store.reference_gas_price(),
                protocol_config,
            )?;

            (gas_status, checked_input_objects)
        };

        // Create a new executor for the simulation
        let executor = iota_execution::executor(
            protocol_config,
            true, // silent
            None,
        )
        .expect("Creating an executor should not fail here");

        // Execute the simulation
        let (kind, signer, gas_data) = transaction.execution_parts();
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            false, // expensive_checks
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_data,
            gas_status,
            kind,
            signer,
            transaction.digest(),
            checks.disabled(),
        );

        // In the case of a dev inspect, the execution_result could be filled with some
        // values. Else, execution_result is empty in the case of a dry run.
        Ok(SimulateTransactionResult {
            input_objects: inner_temp_store.input_objects,
            output_objects: inner_temp_store.written,
            events: effects.events_digest().map(|_| inner_temp_store.events),
            effects,
            execution_result,
            suggested_gas_price: self
                .congestion_tracker
                .get_prediction_suggested_gas_price(&transaction),
            mock_gas_id,
        })
    }
2259
    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
    /// `VmChecks::DISABLED` instead.
    ///
    /// Speculatively executes `transaction_kind` on behalf of `sender` without
    /// committing any state, returning `DevInspectResults`.
    ///
    /// Gas parameters are optional: `gas_price` defaults to the reference gas
    /// price, `gas_budget` to the protocol's max tx gas, `gas_sponsor` to the
    /// sender, and if no `gas_objects` are supplied a dummy gas coin is
    /// created. The object ID for gas can be any
    /// object ID, even for an uncreated object.
    /// `skip_checks` (default `true`) selects the relaxed dev-inspect input
    /// checks instead of full transaction-input validation. Only supported on
    /// fullnodes; system transactions are rejected.
    #[instrument("dev_inspect_tx", level = "trace", skip_all)]
    pub async fn dev_inspect_transaction_block(
        &self,
        sender: IotaAddress,
        transaction_kind: TransactionKind,
        gas_price: Option<u64>,
        gas_budget: Option<u64>,
        gas_sponsor: Option<IotaAddress>,
        gas_objects: Option<Vec<ObjectRef>>,
        show_raw_txn_data_and_effects: Option<bool>,
        skip_checks: Option<bool>,
    ) -> IotaResult<DevInspectResults> {
        let epoch_store = self.load_epoch_store_one_call_per_task();

        if !self.is_fullnode(&epoch_store) {
            return Err(IotaError::UnsupportedFeature {
                error: "dev-inspect is only supported on fullnodes".to_string(),
            });
        }

        if transaction_kind.is_system_tx() {
            return Err(IotaError::UnsupportedFeature {
                error: "system transactions are not supported".to_string(),
            });
        }

        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
        let skip_checks = skip_checks.unwrap_or(true);
        let reference_gas_price = epoch_store.reference_gas_price();
        let protocol_config = epoch_store.protocol_config();
        let max_tx_gas = protocol_config.max_tx_gas();

        let price = gas_price.unwrap_or(reference_gas_price);
        let budget = gas_budget.unwrap_or(max_tx_gas);
        let owner = gas_sponsor.unwrap_or(sender);
        // Payment might be empty here, but it's fine we'll have to deal with it later
        // after reading all the input objects.
        let payment = gas_objects.unwrap_or_default();
        let mut transaction = TransactionData::V1(TransactionDataV1 {
            kind: transaction_kind.clone(),
            sender,
            gas_data: GasData {
                payment,
                owner,
                price,
                budget,
            },
            expiration: TransactionExpiration::None,
        });

        // Serialize the raw transaction bytes up front (before the gas payment
        // is potentially rewritten below) if the caller asked for them.
        let raw_txn_data = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        transaction.validity_check_no_gas_check(protocol_config)?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dev inspect.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        let (gas_status, checked_input_objects) = if skip_checks {
            // If we are skipping checks, then we call the check_dev_inspect_input function
            // which will perform only lightweight checks on the transaction
            // input. And if the gas field is empty, that means we will
            // use the dummy gas object so we need to add it to the input objects vector.
            if transaction.gas().is_empty() {
                // Create and use a dummy gas object if there is no gas object provided.
                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
                    SIMULATION_GAS_COIN_VALUE,
                    transaction.gas_owner(),
                );
                let gas_object_ref = dummy_gas_object.compute_object_reference();
                transaction.gas_data_mut().payment = vec![gas_object_ref];
                input_objects.push(ObjectReadResult::new(
                    InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
                    dummy_gas_object.into(),
                ));
            }
            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
                protocol_config,
                &transaction_kind,
                input_objects,
                receiving_objects,
            )?;
            // Note: the gas status is created with the protocol max budget, not
            // the transaction's budget, so dev inspect is not gas-limited.
            let gas_status = IotaGasStatus::new(
                max_tx_gas,
                transaction.gas_price(),
                reference_gas_price,
                protocol_config,
            )?;

            (gas_status, checked_input_objects)
        } else {
            // If we are not skipping checks, then we call the check_transaction_input
            // function and its dummy gas variant which will perform full
            // fledged checks just like a real transaction execution.
            if transaction.gas().is_empty() {
                // Create and use a dummy gas object if there is no gas object provided.
                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
                    SIMULATION_GAS_COIN_VALUE,
                    transaction.gas_owner(),
                );
                let gas_object_ref = dummy_gas_object.compute_object_reference();
                transaction.gas_data_mut().payment = vec![gas_object_ref];
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    receiving_objects,
                    dummy_gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?
            } else {
                // `MoveAuthenticator`s are not supported in dev inspects, so we set the
                // `authenticator_gas_budget` to 0.
                let authenticator_gas_budget = 0;

                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                    authenticator_gas_budget,
                )?
            }
        };

        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
            .expect("Creating an executor should not fail here");
        let gas_data = transaction.gas_data().clone();
        // Compute the digest the same way a signed transaction would: hash the
        // intent message wrapping the (possibly gas-rewritten) transaction data.
        let intent_msg = IntentMessage::new(
            Intent {
                version: IntentVersion::V0,
                scope: IntentScope::TransactionData,
                app_id: IntentAppId::Iota,
            },
            transaction,
        );
        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            // expensive checks
            false,
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_data,
            gas_status,
            transaction_kind,
            sender,
            transaction_digest,
            skip_checks,
        );

        let raw_effects = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction effects during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        // Resolve Move type layouts against the transaction's own outputs
        // first, falling back to the backing package store.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));

        DevInspectResults::new(
            effects,
            inner_temp_store.events.clone(),
            execution_result,
            raw_txn_data,
            raw_effects,
            layout_resolver.as_mut(),
        )
    }
2474
2475    // Only used for testing because of how epoch store is loaded.
2476    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2477        let epoch_store = self.epoch_store_for_testing();
2478        Ok(epoch_store.reference_gas_price())
2479    }
2480
2481    #[instrument(level = "trace", skip_all)]
2482    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2483        self.get_transaction_cache_reader()
2484            .try_is_tx_already_executed(digest)
2485    }
2486
2487    /// Non-fallible version of `try_is_tx_already_executed`.
2488    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2489        self.try_is_tx_already_executed(digest)
2490            .expect("storage access failed")
2491    }
2492
2493    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2494    #[instrument(level = "debug", skip_all, err)]
2495    fn index_tx(
2496        &self,
2497        indexes: &IndexStore,
2498        digest: &TransactionDigest,
2499        // TODO: index_tx really just need the transaction data here.
2500        cert: &VerifiedExecutableTransaction,
2501        effects: &TransactionEffects,
2502        events: &TransactionEvents,
2503        timestamp_ms: u64,
2504        tx_coins: Option<TxCoins>,
2505        written: &WrittenObjects,
2506        inner_temporary_store: &InnerTemporaryStore,
2507    ) -> IotaResult<u64> {
2508        let changes = self
2509            .process_object_index(effects, written, inner_temporary_store)
2510            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2511
2512        indexes.index_tx(
2513            cert.data().intent_message().value.sender(),
2514            cert.data()
2515                .intent_message()
2516                .value
2517                .input_objects()?
2518                .iter()
2519                .map(|o| o.object_id()),
2520            effects
2521                .all_changed_objects()
2522                .into_iter()
2523                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2524            cert.data()
2525                .intent_message()
2526                .value
2527                .move_calls()
2528                .into_iter()
2529                .map(|(package, module, function)| {
2530                    (*package, module.to_owned(), function.to_owned())
2531                }),
2532            events,
2533            changes,
2534            digest,
2535            timestamp_ms,
2536            tx_coins,
2537        )
2538    }
2539
    /// Simulation-only fault injection: deterministically makes a subset of
    /// validators — accumulating stake up to (but below) the committee's
    /// validity threshold — produce divergent effects for non-system
    /// transactions by bumping the computation cost, so tests can exercise
    /// effects-disagreement handling.
    #[cfg(msim)]
    fn create_fail_state(
        &self,
        certificate: &VerifiedExecutableTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        effects: &mut TransactionEffects,
    ) {
        use std::cell::RefCell;
        thread_local! {
            // (accumulated failing stake, names of validators chosen to fail)
            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
        }
        if !certificate.data().intent_message().value.is_system_tx() {
            let committee = epoch_store.committee();
            let cur_stake = (**committee).weight(&self.name);
            if cur_stake > 0 {
                FAIL_STATE.with_borrow_mut(|fail_state| {
                    // Enroll this validator into the failing set while the
                    // accumulated failing stake is still below the threshold.
                    if fail_state.0 < committee.validity_threshold() {
                        fail_state.0 += cur_stake;
                        fail_state.1.insert(self.name);
                    }

                    if fail_state.1.contains(&self.name) {
                        info!("cp_exec failing tx");
                        // Corrupt the effects so this validator's result
                        // diverges from the honest validators'.
                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
                    }
                });
            }
        }
    }
2570
    /// Derives the owner-index and dynamic-field-index changes produced by a
    /// transaction's effects.
    ///
    /// Walks deleted/wrapped objects and all changed objects to compute which
    /// `(owner, object)` and `(parent, dynamic field)` entries must be removed
    /// or added. Runs before the transaction's outputs are written, so the
    /// previous object versions referenced by the effects must still be
    /// readable; panics (with the tx digest) if they are not.
    fn process_object_index(
        &self,
        effects: &TransactionEffects,
        written: &WrittenObjects,
        inner_temporary_store: &InnerTemporaryStore,
    ) -> IotaResult<ObjectIndexChanges> {
        let epoch_store = self.load_epoch_store_one_call_per_task();
        // Layout resolution consults the transaction's own outputs first and
        // falls back to the backing package store.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    inner_temporary_store,
                    self.get_backing_package_store(),
                )));

        let modified_at_version = effects
            .modified_at_versions()
            .into_iter()
            .collect::<HashMap<_, _>>();

        let tx_digest = effects.transaction_digest();
        let mut deleted_owners = vec![];
        let mut deleted_dynamic_fields = vec![];
        // Deleted and wrapped objects drop out of whichever index (owner or
        // dynamic field) their previous owner placed them in.
        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
            let old_version = modified_at_version.get(&id).unwrap();
            // When we process the index, the latest object hasn't been written yet so
            // the old object must be present.
            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
                |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
            ) {
                Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
                Owner::ObjectOwner(object_id) => {
                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
                }
                _ => {}
            }
        }

        let mut new_owners = vec![];
        let mut new_dynamic_fields = vec![];

        for (oref, owner, kind) in effects.all_changed_objects() {
            let id = &oref.0;
            // For mutated objects, retrieve old owner and delete old index if there is a
            // owner change.
            if let WriteKind::Mutate = kind {
                let Some(old_version) = modified_at_version.get(id) else {
                    panic!(
                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
                    );
                };
                // When we process the index, the latest object hasn't been written yet so
                // the old object must be present.
                let Some(old_object) = self
                    .get_object_store()
                    .try_get_object_by_key(id, *old_version)?
                else {
                    panic!(
                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
                    );
                };
                if old_object.owner != owner {
                    match old_object.owner {
                        Owner::AddressOwner(addr) => {
                            deleted_owners.push((addr, *id));
                        }
                        Owner::ObjectOwner(object_id) => {
                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
                        }
                        _ => {}
                    }
                }
            }

            // Record the object under its new owner: address owners go into the
            // owner index, object owners into the dynamic field index.
            match owner {
                Owner::AddressOwner(addr) => {
                    // TODO: We can remove the object fetching after we added ObjectType to
                    // TransactionEffects
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
                    );
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    let type_ = new_object
                        .type_()
                        .map(|type_| ObjectType::Struct(type_.clone()))
                        .unwrap_or(ObjectType::Package);

                    new_owners.push((
                        (addr, *id),
                        ObjectInfo {
                            object_id: *id,
                            version: oref.1,
                            digest: oref.2,
                            type_,
                            owner,
                            previous_transaction: *effects.transaction_digest(),
                        },
                    ));
                }
                Owner::ObjectOwner(owner) => {
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
                    );
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    let Some(df_info) = self
                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
                        .unwrap_or_else(|e| {
                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
                            None
                        }
                    )
                        else {
                            // Skip indexing for non dynamic field objects.
                            continue;
                        };
                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
                }
                _ => {}
            }
        }

        Ok(ObjectIndexChanges {
            deleted_owners,
            deleted_dynamic_fields,
            new_owners,
            new_dynamic_fields,
        })
    }
2717
    /// Builds a `DynamicFieldInfo` index entry for `o` if it is a dynamic
    /// field object; returns `Ok(None)` for any other object.
    ///
    /// The field's name is deserialized via the resolved Move type layout. For
    /// dynamic *object* fields, the wrapped object is looked up first in
    /// `written` (this transaction's outputs) and then in the object store to
    /// recover its version, digest, and type.
    fn try_create_dynamic_field_info(
        &self,
        o: &Object,
        written: &WrittenObjects,
        resolver: &mut dyn LayoutResolver,
    ) -> IotaResult<Option<DynamicFieldInfo>> {
        // Skip if not a move object
        let Some(move_object) = o.data.try_as_move().cloned() else {
            return Ok(None);
        };

        // We only index dynamic field objects
        if !move_object.type_().is_dynamic_field() {
            return Ok(None);
        }

        let layout = resolver
            .get_annotated_layout(&move_object.type_().clone().into())?
            .into_layout();

        // Visit the raw field bytes to pull out the name and value metadata
        // without fully deserializing the value.
        let field =
            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
                IotaError::ObjectDeserialization {
                    error: e.to_string(),
                }
            })?;

        let type_ = field.kind;
        let name_type: TypeTag = field.name_layout.into();
        let bcs_name = field.name_bytes.to_owned();

        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
            .map_err(|e| {
                warn!("{e}");
                IotaError::ObjectDeserialization {
                    error: e.to_string(),
                }
            })?;

        let name = DynamicFieldName {
            type_: name_type,
            value: IotaMoveValue::from(name_value).to_json_value(),
        };

        let value_metadata = field.value_metadata().map_err(|e| {
            warn!("{e}");
            IotaError::ObjectDeserialization {
                error: e.to_string(),
            }
        })?;

        Ok(Some(match value_metadata {
            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
                name,
                bcs_name,
                type_,
                object_type: object_type.to_canonical_string(/* with_prefix */ true),
                object_id: o.id(),
                version: o.version(),
                digest: o.digest(),
            },

            DFV::ValueMetadata::DynamicObjectField(object_id) => {
                // Find the actual object from storage using the object id obtained from the
                // wrapper.

                // Try to find the object in the written objects first.
                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
                    (
                        object.version(),
                        object.digest(),
                        object.data.type_().unwrap().clone(),
                    )
                } else {
                    // If not found, try to find it in the database.
                    let object = self
                        .get_object_store()
                        .try_get_object_by_key(&object_id, o.version())?
                        .ok_or_else(|| UserInputError::ObjectNotFound {
                            object_id,
                            version: Some(o.version()),
                        })?;
                    let version = object.version();
                    let digest = object.digest();
                    let object_type = object.data.type_().unwrap().clone();
                    (version, digest, object_type)
                };

                DynamicFieldInfo {
                    name,
                    bcs_name,
                    type_,
                    object_type: object_type.to_string(),
                    object_id,
                    version,
                    digest,
                }
            }
        }))
    }
2818
    /// Post-execution hook for a single transaction: updates the `IndexStore`
    /// and publishes effects/events to subscription handlers.
    ///
    /// No-op when this node keeps no indexes. Indexing failures are fatal
    /// (`expect`), while event-processing failures are propagated as errors.
    #[instrument(level = "trace", skip_all, err)]
    fn post_process_one_tx(
        &self,
        certificate: &VerifiedExecutableTransaction,
        effects: &TransactionEffects,
        inner_temporary_store: &InnerTemporaryStore,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        if self.indexes.is_none() {
            return Ok(());
        }

        let _scope = monitored_scope("Execution::post_process_one_tx");

        let tx_digest = certificate.digest();
        let timestamp_ms = Self::unixtime_now_ms();
        let events = &inner_temporary_store.events;
        let written = &inner_temporary_store.written;
        let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
            effects,
            inner_temporary_store,
            epoch_store,
        );

        // Index tx
        if let Some(indexes) = &self.indexes {
            let _ = self
                .index_tx(
                    indexes.as_ref(),
                    tx_digest,
                    certificate,
                    effects,
                    events,
                    timestamp_ms,
                    tx_coins,
                    written,
                    inner_temporary_store,
                )
                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
                .expect("Indexing tx should not fail");

            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
            let events = self.make_transaction_block_events(
                events.clone(),
                *tx_digest,
                timestamp_ms,
                epoch_store,
                inner_temporary_store,
            )?;
            // Emit events
            self.subscription_handler
                .process_tx(certificate.data().transaction_data(), &effects, &events)
                .tap_ok(|_| {
                    self.metrics
                        .post_processing_total_tx_had_event_processed
                        .inc()
                })
                .tap_err(|e| {
                    warn!(
                        ?tx_digest,
                        "Post processing - Couldn't process events for tx: {}", e
                    )
                })?;

            self.metrics
                .post_processing_total_events_emitted
                .inc_by(events.data.len() as u64);
        };
        Ok(())
    }
2890
2891    fn make_transaction_block_events(
2892        &self,
2893        transaction_events: TransactionEvents,
2894        digest: TransactionDigest,
2895        timestamp_ms: u64,
2896        epoch_store: &Arc<AuthorityPerEpochStore>,
2897        inner_temporary_store: &InnerTemporaryStore,
2898    ) -> IotaResult<IotaTransactionBlockEvents> {
2899        let mut layout_resolver =
2900            epoch_store
2901                .executor()
2902                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2903                    inner_temporary_store,
2904                    self.get_backing_package_store(),
2905                )));
2906        IotaTransactionBlockEvents::try_from(
2907            transaction_events,
2908            digest,
2909            Some(timestamp_ms),
2910            layout_resolver.as_mut(),
2911        )
2912    }
2913
2914    pub fn unixtime_now_ms() -> u64 {
2915        let now = SystemTime::now()
2916            .duration_since(UNIX_EPOCH)
2917            .expect("Time went backwards")
2918            .as_millis();
2919        u64::try_from(now).expect("Travelling in time machine")
2920    }
2921
2922    #[instrument(level = "trace", skip_all)]
2923    pub async fn handle_transaction_info_request(
2924        &self,
2925        request: TransactionInfoRequest,
2926    ) -> IotaResult<TransactionInfoResponse> {
2927        let epoch_store = self.load_epoch_store_one_call_per_task();
2928        let (transaction, status) = self
2929            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2930            .ok_or(IotaError::TransactionNotFound {
2931                digest: request.transaction_digest,
2932            })?;
2933        Ok(TransactionInfoResponse {
2934            transaction,
2935            status,
2936        })
2937    }
2938
2939    #[instrument(level = "trace", skip_all)]
2940    pub async fn handle_object_info_request(
2941        &self,
2942        request: ObjectInfoRequest,
2943    ) -> IotaResult<ObjectInfoResponse> {
2944        let epoch_store = self.load_epoch_store_one_call_per_task();
2945
2946        let requested_object_seq = match request.request_kind {
2947            ObjectInfoRequestKind::LatestObjectInfo => {
2948                let (_, seq, _) = self
2949                    .try_get_object_or_tombstone(request.object_id)
2950                    .await?
2951                    .ok_or_else(|| {
2952                        IotaError::from(UserInputError::ObjectNotFound {
2953                            object_id: request.object_id,
2954                            version: None,
2955                        })
2956                    })?;
2957                seq
2958            }
2959            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2960        };
2961
2962        let object = self
2963            .get_object_store()
2964            .try_get_object_by_key(&request.object_id, requested_object_seq)?
2965            .ok_or_else(|| {
2966                IotaError::from(UserInputError::ObjectNotFound {
2967                    object_id: request.object_id,
2968                    version: Some(requested_object_seq),
2969                })
2970            })?;
2971
2972        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2973            (request.generate_layout, object.data.try_as_move())
2974        {
2975            Some(into_struct_layout(
2976                epoch_store
2977                    .executor()
2978                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2979                    .get_annotated_layout(&move_obj.type_().clone().into())?,
2980            )?)
2981        } else {
2982            None
2983        };
2984
2985        let lock = if !object.is_address_owned() {
2986            // Only address owned objects have locks.
2987            None
2988        } else {
2989            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2990                .await?
2991                .map(|s| s.into_inner())
2992        };
2993
2994        Ok(ObjectInfoResponse {
2995            object,
2996            layout,
2997            lock_for_debugging: lock,
2998        })
2999    }
3000
3001    #[instrument(level = "trace", skip_all)]
3002    pub fn handle_checkpoint_request(
3003        &self,
3004        request: &CheckpointRequest,
3005    ) -> IotaResult<CheckpointResponse> {
3006        let summary = if request.certified {
3007            let summary = match request.sequence_number {
3008                Some(seq) => self
3009                    .checkpoint_store
3010                    .get_checkpoint_by_sequence_number(seq)?,
3011                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3012            }
3013            .map(|v| v.into_inner());
3014            summary.map(CheckpointSummaryResponse::Certified)
3015        } else {
3016            let summary = match request.sequence_number {
3017                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3018                None => self
3019                    .checkpoint_store
3020                    .get_latest_locally_computed_checkpoint()?,
3021            };
3022            summary.map(CheckpointSummaryResponse::Pending)
3023        };
3024        let contents = match &summary {
3025            Some(s) => self
3026                .checkpoint_store
3027                .get_checkpoint_contents(&s.content_digest())?,
3028            None => None,
3029        };
3030        Ok(CheckpointResponse {
3031            checkpoint: summary,
3032            contents,
3033        })
3034    }
3035
3036    fn check_protocol_version(
3037        supported_protocol_versions: SupportedProtocolVersions,
3038        current_version: ProtocolVersion,
3039    ) {
3040        info!("current protocol version is now {:?}", current_version);
3041        info!("supported versions are: {:?}", supported_protocol_versions);
3042        if !supported_protocol_versions.is_version_supported(current_version) {
3043            let msg = format!(
3044                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
3045            );
3046
3047            error!("{}", msg);
3048            eprintln!("{msg}");
3049
3050            #[cfg(not(msim))]
3051            std::process::exit(1);
3052
3053            #[cfg(msim)]
3054            iota_simulator::task::shutdown_current_node();
3055        }
3056    }
3057
    /// Constructs a new `AuthorityState` and starts its background machinery.
    ///
    /// Wires together the epoch store, execution caches, pruners and the
    /// transaction manager, spawns the certificate-execution loop and the
    /// event-migration task, and (on a fresh database) builds the owner index
    /// from the genesis objects. Returns the shared state handle.
    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
    pub async fn new(
        name: AuthorityName,
        secret: StableSyncAuthoritySigner,
        supported_protocol_versions: SupportedProtocolVersions,
        store: Arc<AuthorityStore>,
        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
        epoch_store: Arc<AuthorityPerEpochStore>,
        committee_store: Arc<CommitteeStore>,
        indexes: Option<Arc<IndexStore>>,
        grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
        checkpoint_store: Arc<CheckpointStore>,
        prometheus_registry: &Registry,
        genesis_objects: &[Object],
        db_checkpoint_config: &DBCheckpointConfig,
        config: NodeConfig,
        archive_readers: ArchiveReaderBalancer,
        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
        chain_identifier: ChainIdentifier,
        pruner_db: Option<Arc<AuthorityPrunerTables>>,
    ) -> Arc<Self> {
        // Abort startup early if this binary cannot run the current protocol.
        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
        // Channel over which the transaction manager hands ready certificates
        // to the execution task spawned below.
        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
        let transaction_manager = Arc::new(TransactionManager::new(
            execution_cache_trait_pointers.object_cache_reader.clone(),
            execution_cache_trait_pointers
                .transaction_cache_reader
                .clone(),
            &epoch_store,
            tx_ready_certificates,
            metrics.clone(),
        ));
        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

        // Background pruners; they stay alive as long as the returned state
        // holds them.
        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
            epoch_store.get_parent_path(),
            &config.authority_store_pruning_config,
        );
        let _pruner = AuthorityStorePruner::new(
            store.perpetual_tables.clone(),
            checkpoint_store.clone(),
            grpc_indexes_store.clone(),
            indexes.clone(),
            config.authority_store_pruning_config.clone(),
            epoch_store.committee().authority_exists(&name),
            epoch_store.epoch_start_state().epoch_duration_ms(),
            prometheus_registry,
            archive_readers,
            pruner_db,
        );
        let input_loader =
            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
        let epoch = epoch_store.epoch();
        let rgp = epoch_store.reference_gas_price();
        let state = Arc::new(AuthorityState {
            name,
            secret,
            execution_lock: RwLock::new(epoch),
            epoch_store: ArcSwap::new(epoch_store.clone()),
            input_loader,
            execution_cache_trait_pointers,
            indexes,
            grpc_indexes_store,
            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
            checkpoint_store,
            committee_store,
            transaction_manager,
            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
            metrics,
            _pruner,
            _authority_per_epoch_pruner,
            db_checkpoint_config: db_checkpoint_config.clone(),
            config,
            overload_info: AuthorityOverloadInfo::default(),
            validator_tx_finalizer,
            chain_identifier,
            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
        });

        // Start a task to execute ready certificates.
        // A weak reference is used so the task does not keep the state alive.
        let authority_state = Arc::downgrade(&state);
        spawn_monitored_task!(execution_process(
            authority_state,
            rx_ready_certificates,
            rx_execution_shutdown,
        ));
        spawn_monitored_task!(authority_store_migrations::migrate_events(store));

        // TODO: This doesn't belong to the constructor of AuthorityState.
        state
            .create_owner_index_if_empty(genesis_objects, &epoch_store)
            .expect("Error indexing genesis objects.");

        state
    }
3155
3156    // TODO: Consolidate our traits to reduce the number of methods here.
    /// Returns the shared object cache reader.
    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
        &self.execution_cache_trait_pointers.object_cache_reader
    }
3160
    /// Returns the shared transaction cache reader.
    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
        &self.execution_cache_trait_pointers.transaction_cache_reader
    }
3164
    /// Returns the execution cache writer.
    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
        &self.execution_cache_trait_pointers.cache_writer
    }
3168
    /// Returns the backing store used for transaction execution.
    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_store
    }
3172
    /// Returns the backing package store used for Move package resolution.
    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_package_store
    }
3176
    /// Returns the object store.
    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
        &self.execution_cache_trait_pointers.object_store
    }
3180
    /// Returns the execution cache's reconfiguration API.
    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
        &self.execution_cache_trait_pointers.reconfig_api
    }
3184
    /// Returns the state accumulator store.
    pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
        &self.execution_cache_trait_pointers.accumulator_store
    }
3188
    /// Returns the checkpoint cache.
    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
        &self.execution_cache_trait_pointers.checkpoint_cache
    }
3192
    /// Returns the store used by state sync.
    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
        &self.execution_cache_trait_pointers.state_sync_store
    }
3196
    /// Returns the execution cache commit interface.
    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
        &self.execution_cache_trait_pointers.cache_commit
    }
3200
    /// Returns the underlying `AuthorityStore`; test-only accessor exposed
    /// through the cache's testing API.
    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
        self.execution_cache_trait_pointers
            .testing_api
            .database_for_testing()
    }
3206
    /// Runs a single checkpoint-pruning pass using the pruning configuration
    /// from `config`; intended for tests only.
    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
        &self,
        config: NodeConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        let archive_readers =
            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
            &self.database_for_testing().perpetual_tables,
            &self.checkpoint_store,
            self.grpc_indexes_store.as_deref(),
            None,
            config.authority_store_pruning_config,
            metrics,
            archive_readers,
            EPOCH_DURATION_MS_FOR_TESTING,
        )
        .await
    }
3226
    /// Returns the shared transaction manager.
    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
        &self.transaction_manager
    }
3230
    /// Adds executable transactions to the transaction manager for ordered
    /// execution.
    pub fn enqueue_transactions_for_execution(
        &self,
        txns: Vec<VerifiedExecutableTransaction>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        self.transaction_manager.enqueue(txns, epoch_store)
    }
3240
    /// Adds verified certificates to the transaction manager for ordered
    /// execution.
    pub fn enqueue_certificates_for_execution(
        &self,
        certs: Vec<VerifiedCertificate>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        self.transaction_manager
            .enqueue_certificates(certs, epoch_store)
    }
3250
    /// Enqueues transactions together with the effects digest each one is
    /// expected to produce.
    pub fn enqueue_with_expected_effects_digest(
        &self,
        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        self.transaction_manager
            .enqueue_with_expected_effects_digest(certs, epoch_store)
    }
3259
    /// Populates the owner and dynamic-field indexes from the genesis
    /// objects.
    ///
    /// No-op when indexing is disabled or the index store already has content
    /// (i.e. this is not a fresh database).
    fn create_owner_index_if_empty(
        &self,
        genesis_objects: &[Object],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        let Some(index_store) = &self.indexes else {
            return Ok(());
        };
        if !index_store.is_empty() {
            return Ok(());
        }

        let mut new_owners = vec![];
        let mut new_dynamic_fields = vec![];
        let mut layout_resolver = epoch_store
            .executor()
            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
        for o in genesis_objects.iter() {
            match o.owner {
                // Address-owned objects go into the owner index.
                Owner::AddressOwner(addr) => new_owners.push((
                    (addr, o.id()),
                    ObjectInfo::new(&o.compute_object_reference(), o),
                )),
                // Object-owned objects are indexed as dynamic fields of their
                // parent object.
                Owner::ObjectOwner(object_id) => {
                    let id = o.id();
                    let info = match self.try_create_dynamic_field_info(
                        o,
                        &BTreeMap::new(),
                        layout_resolver.as_mut(),
                    ) {
                        Ok(Some(info)) => info,
                        Ok(None) => continue,
                        // A field whose referenced object is missing from
                        // genesis is skipped rather than failing startup.
                        Err(IotaError::UserInput {
                            error:
                                UserInputError::ObjectNotFound {
                                    object_id: not_found_id,
                                    version,
                                },
                        }) => {
                            warn!(
                                ?not_found_id,
                                ?version,
                                object_owner=?object_id,
                                field=?id,
                                "Skipping dynamic field: referenced genesis object not found"
                            );
                            continue;
                        }
                        Err(e) => return Err(e),
                    };
                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
                }
                // Other ownership kinds are not indexed here.
                _ => {}
            }
        }

        index_store.insert_genesis_objects(ObjectIndexChanges {
            deleted_owners: vec![],
            deleted_dynamic_fields: vec![],
            new_owners,
            new_dynamic_fields,
        })
    }
3323
3324    /// Attempts to acquire execution lock for an executable transaction.
3325    /// Returns the lock if the transaction is matching current executed epoch
3326    /// Returns None otherwise
3327    pub fn execution_lock_for_executable_transaction(
3328        &self,
3329        transaction: &VerifiedExecutableTransaction,
3330    ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3331        let lock = self
3332            .execution_lock
3333            .try_read()
3334            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3335        if *lock == transaction.auth_sig().epoch() {
3336            Ok(lock)
3337        } else {
3338            Err(IotaError::WrongEpoch {
3339                expected_epoch: *lock,
3340                actual_epoch: transaction.auth_sig().epoch(),
3341            })
3342        }
3343    }
3344
    /// Acquires the execution lock for the duration of a transaction signing
    /// request. This prevents reconfiguration from starting until we are
    /// finished handling the signing request. Otherwise, in-memory lock
    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
    /// while we are attempting to acquire locks for the transaction.
    ///
    /// Returns `ValidatorHaltedAtEpochEnd` when reconfiguration already holds
    /// the write lock.
    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
        self.execution_lock
            .try_read()
            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
    }
3355
    /// Acquires the execution write lock, waiting until all read-lock holders
    /// (in-flight executions and signing requests) have finished.
    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
        self.execution_lock.write().await
    }
3359
    /// Transitions the authority into the next epoch.
    ///
    /// Waits for in-flight executions to finish, terminates epoch-scoped
    /// tasks, reverts uncommitted transactions, verifies system consistency,
    /// optionally snapshots the databases, and reopens the per-epoch store
    /// for `new_committee`'s epoch, which is returned on success.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        accumulator: Arc<StateAccumulator>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_supply_change: i64,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        // Shut down instead of entering an epoch whose protocol version this
        // binary does not support.
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.metrics.reset_on_reconfigure();
        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        let highest_locally_built_checkpoint_seq = self
            .checkpoint_store
            .get_latest_locally_computed_checkpoint()?
            .map(|c| *c.sequence_number())
            .unwrap_or(0);

        assert!(
            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
        );
        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
            // if we built the last checkpoint locally (as opposed to receiving it from a
            // peer), then all shared_version_assignments except the one for the
            // ChangeEpoch transaction should have been removed
            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
            // Note that while 1 is the typical value, 0 is possible if the node restarts
            // after committing the last checkpoint but before reconfiguring.
            if num_shared_version_assignments > 1 {
                // If this happens in prod, we have a memory leak, but not a correctness issue.
                debug_fatal!(
                    "all shared_version_assignments should have been removed \
                    (num_shared_version_assignments: {num_shared_version_assignments})"
                );
            }
        }

        // Safe to reconfigure now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
            .await?;
        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(
            cur_epoch_store,
            accumulator,
            expensive_safety_check_config,
            epoch_supply_change,
        )?;
        self.get_reconfig_api()
            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
        // Optionally snapshot all databases at the epoch boundary.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
            if self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
            {
                let checkpoint_indexes = self
                    .db_checkpoint_config
                    .perform_index_db_checkpoints_at_epoch_end
                    .unwrap_or(false);
                let current_epoch = cur_epoch_store.epoch();
                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
                self.checkpoint_all_dbs(
                    &epoch_checkpoint_path,
                    cur_epoch_store,
                    checkpoint_indexes,
                )?;
            }
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.transaction_manager.reconfigure(new_epoch);
        *execution_lock = new_epoch;
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
3471
    /// Advance the epoch store to the next epoch for testing only.
    /// This only manually sets all the places where we have the epoch number.
    /// It doesn't properly reconfigure the node, hence should be only used for
    /// testing.
    pub async fn reconfigure_for_testing(&self) {
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
        let epoch_store = self.epoch_store_for_testing().clone();
        let protocol_config = epoch_store.protocol_config().clone();
        // The current protocol config used in the epoch store may have been overridden
        // and diverged from the protocol config definitions. That override may
        // have now been dropped when the initial guard was dropped. We reapply
        // the override before creating the new epoch store, to make sure that
        // the new epoch store has the same protocol config as the current one.
        // Since this is for testing only, we mostly like to keep the protocol config
        // the same across epochs.
        let _guard =
            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            &self.config.expensive_safety_check_config,
            // Seed the new store with the old epoch's last checkpoint
            // sequence number (defaults to 0 when none exists yet).
            self.checkpoint_store
                .get_epoch_last_checkpoint(epoch_store.epoch())
                .unwrap()
                .map(|c| *c.sequence_number())
                .unwrap_or_default(),
        );
        let new_epoch = new_epoch_store.epoch();
        self.transaction_manager.reconfigure(new_epoch);
        // Swap in the new epoch store before terminating the old epoch's
        // tasks and publishing the new epoch number via the execution lock.
        self.epoch_store.store(new_epoch_store);
        epoch_store.epoch_terminated().await;
        *execution_lock = new_epoch;
    }
3505
    /// Runs end-of-epoch consistency checks before reconfiguration.
    ///
    /// Always verifies IOTA conservation (adjusted by `epoch_supply_change`);
    /// the state-consistency and secondary-index checks run only when enabled
    /// in `expensive_safety_check_config`.
    #[instrument(level = "error", skip_all)]
    fn check_system_consistency(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        accumulator: Arc<StateAccumulator>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_supply_change: i64,
    ) -> IotaResult<()> {
        info!(
            "Performing iota conservation consistency check for epoch {}",
            cur_epoch_store.epoch()
        );

        // Debug builds additionally assert every executed transaction landed
        // in a checkpoint.
        if cfg!(debug_assertions) {
            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
        }

        self.get_reconfig_api()
            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;

        // check for root state hash consistency with live object set
        if expensive_safety_check_config.enable_state_consistency_check() {
            info!(
                "Performing state consistency check for epoch {}",
                cur_epoch_store.epoch()
            );
            self.expensive_check_is_consistent_state(
                accumulator,
                cur_epoch_store,
                cfg!(debug_assertions), // panic in debug mode only
            );
        }

        if expensive_safety_check_config.enable_secondary_index_checks() {
            if let Some(indexes) = self.indexes.clone() {
                verify_indexes(self.get_accumulator_store().as_ref(), indexes)
                    .expect("secondary indexes are inconsistent");
            }
        }

        Ok(())
    }
3548
3549    fn expensive_check_is_consistent_state(
3550        &self,
3551        accumulator: Arc<StateAccumulator>,
3552        cur_epoch_store: &AuthorityPerEpochStore,
3553        panic: bool,
3554    ) {
3555        let live_object_set_hash = accumulator.digest_live_object_set();
3556
3557        let root_state_hash: ECMHLiveObjectSetDigest = self
3558            .get_accumulator_store()
3559            .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3560            .expect("Retrieving root state hash cannot fail")
3561            .expect("Root state hash for epoch must exist")
3562            .1
3563            .digest()
3564            .into();
3565
3566        let is_inconsistent = root_state_hash != live_object_set_hash;
3567        if is_inconsistent {
3568            if panic {
3569                panic!(
3570                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3571                );
3572            } else {
3573                error!(
3574                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3575                    root_state_hash, live_object_set_hash
3576                );
3577            }
3578        } else {
3579            info!("State consistency check passed");
3580        }
3581
3582        if !panic {
3583            accumulator.set_inconsistent_state(is_inconsistent);
3584        }
3585    }
3586
    /// Returns the current epoch id; test-only accessor.
    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.epoch_store_for_testing().epoch()
    }
3590
    /// Takes an on-disk snapshot of all databases under `checkpoint_path` for
    /// the current epoch.
    ///
    /// Writes into a sibling `.tmp` directory first and renames it into place
    /// at the end, so an interrupted run never leaves a partial checkpoint at
    /// the final path. No-ops when the target path already exists. Index
    /// databases are included only when `checkpoint_indexes` is set.
    #[instrument(level = "error", skip_all)]
    pub fn checkpoint_all_dbs(
        &self,
        checkpoint_path: &Path,
        cur_epoch_store: &AuthorityPerEpochStore,
        checkpoint_indexes: bool,
    ) -> IotaResult {
        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
        let current_epoch = cur_epoch_store.epoch();

        if checkpoint_path.exists() {
            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
            return Ok(());
        }

        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");

        // Clear out leftovers from a previously interrupted attempt.
        if checkpoint_path_tmp.exists() {
            fs::remove_dir_all(&checkpoint_path_tmp)
                .map_err(|e| IotaError::FileIO(e.to_string()))?;
        }

        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;

        // NOTE: Do not change the order of invoking these checkpoint calls
        // We want to snapshot checkpoint db first to not race with state sync
        self.checkpoint_store
            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;

        self.get_reconfig_api()
            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;

        self.committee_store
            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;

        if checkpoint_indexes {
            if let Some(indexes) = self.indexes.as_ref() {
                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
            }
            if let Some(grpc_indexes_store) = self.grpc_indexes_store.as_ref() {
                grpc_indexes_store.checkpoint_db(&checkpoint_path_tmp.join(GRPC_INDEXES_DIR))?;
            }
        }

        // Publish the completed checkpoint by renaming it into place.
        fs::rename(checkpoint_path_tmp, checkpoint_path)
            .map_err(|e| IotaError::FileIO(e.to_string()))?;
        Ok(())
    }
3641
    /// Load the current epoch store. This can change during reconfiguration.
    ///
    /// To ensure that we never end up accessing different epoch stores in a
    /// single task, this must be called at most once per task, and the
    /// resulting guard passed down. Each call-site needs to be carefully
    /// audited to ensure this is the case. This also means we should minimize
    /// the number of call-sites. Only call it when there is no way to obtain
    /// the epoch store from somewhere else.
    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.epoch_store.load()
    }
3651
3652    // Load the epoch store, should be used in tests only.
3653    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3654        self.load_epoch_store_one_call_per_task()
3655    }
3656
3657    pub fn clone_committee_for_testing(&self) -> Committee {
3658        Committee::clone(self.epoch_store_for_testing().committee())
3659    }
3660
3661    #[instrument(level = "trace", skip_all)]
3662    pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3663        self.get_object_store()
3664            .try_get_object(object_id)
3665            .map_err(Into::into)
3666    }
3667
3668    /// Non-fallible version of `try_get_object`.
3669    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3670        self.try_get_object(object_id)
3671            .await
3672            .expect("storage access failed")
3673    }
3674
3675    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3676        Ok(self
3677            .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3678            .await?
3679            .expect("framework object should always exist")
3680            .compute_object_reference())
3681    }
3682
3683    // This function is only used for testing.
3684    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3685        self.get_object_cache_reader()
3686            .try_get_iota_system_state_object_unsafe()
3687    }
3688
3689    #[instrument(level = "trace", skip_all)]
3690    pub fn get_checkpoint_by_sequence_number(
3691        &self,
3692        sequence_number: CheckpointSequenceNumber,
3693    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3694        Ok(self
3695            .checkpoint_store
3696            .get_checkpoint_by_sequence_number(sequence_number)?)
3697    }
3698
3699    /// Wait for the given transactions to be included in a checkpoint.
3700    ///
3701    /// Returns a mapping from transaction digest to
3702    /// `(checkpoint_sequence_number, checkpoint_timestamp_ms)`.
3703    /// On timeout, returns partial results for any transactions that were
3704    /// already checkpointed.
3705    pub async fn wait_for_checkpoint_inclusion(
3706        &self,
3707        digests: &[TransactionDigest],
3708        timeout: Duration,
3709    ) -> IotaResult<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>> {
3710        let epoch_store = self.load_epoch_store_one_call_per_task();
3711
3712        // Local cache so multiple transactions in the same checkpoint only
3713        // trigger a single checkpoint summary lookup.
3714        let mut checkpoint_timestamp_cache = HashMap::<CheckpointSequenceNumber, u64>::new();
3715
3716        let results = epoch_store
3717            .wait_for_transactions_in_checkpoint_with_timeout(digests, timeout, |seq| {
3718                *checkpoint_timestamp_cache.entry(seq).or_insert_with(|| {
3719                    self.get_checkpoint_by_sequence_number(seq)
3720                        .ok()
3721                        .flatten()
3722                        .map(|c| c.timestamp_ms)
3723                        .unwrap_or(0)
3724                })
3725            })
3726            .await?;
3727
3728        Ok(digests
3729            .iter()
3730            .copied()
3731            .zip(results)
3732            .filter_map(|(digest, opt)| opt.map(|seq_and_ts| (digest, seq_and_ts)))
3733            .collect())
3734    }
3735
3736    #[instrument(level = "trace", skip_all)]
3737    pub fn get_transaction_checkpoint_for_tests(
3738        &self,
3739        digest: &TransactionDigest,
3740        epoch_store: &AuthorityPerEpochStore,
3741    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3742        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3743        let Some(checkpoint) = checkpoint else {
3744            return Ok(None);
3745        };
3746        let checkpoint = self
3747            .checkpoint_store
3748            .get_checkpoint_by_sequence_number(checkpoint)?;
3749        Ok(checkpoint)
3750    }
3751
3752    #[instrument(level = "trace", skip_all)]
3753    pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3754        Ok(
3755            match self
3756                .get_object_cache_reader()
3757                .try_get_latest_object_or_tombstone(*object_id)?
3758            {
3759                Some((_, ObjectOrTombstone::Object(object))) => {
3760                    let layout = self.get_object_layout(&object)?;
3761                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
3762                }
3763                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3764                None => ObjectRead::NotExists(*object_id),
3765            },
3766        )
3767    }
3768
    /// Chain Identifier is the digest of the genesis checkpoint; it uniquely
    /// identifies the network this authority is part of.
    pub fn get_chain_identifier(&self) -> ChainIdentifier {
        self.chain_identifier
    }
3773
3774    #[instrument(level = "trace", skip_all)]
3775    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3776    where
3777        T: DeserializeOwned,
3778    {
3779        let o = self.get_object_read(object_id)?.into_object()?;
3780        if let Some(move_object) = o.data.try_as_move() {
3781            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3782                IotaError::ObjectDeserialization {
3783                    error: format!("{e}"),
3784                }
3785            })?)
3786        } else {
3787            Err(IotaError::ObjectDeserialization {
3788                error: format!("Provided object : [{object_id}] is not a Move object."),
3789            })
3790        }
3791    }
3792
    /// This function aims to serve rpc reads on past objects and
    /// we don't expect it to be called for other purposes.
    /// Depending on the object pruning policies that will be enforced in the
    /// future there is no software-level guarantee/SLA to retrieve an object
    /// with an old version even if it exists/existed.
    #[instrument(level = "trace", skip_all)]
    pub fn get_past_object_read(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<PastObjectRead> {
        // Firstly we see if the object ever existed by getting its latest data
        let Some(obj_ref) = self
            .get_object_cache_reader()
            .try_get_latest_object_ref_or_tombstone(*object_id)?
        else {
            return Ok(PastObjectRead::ObjectNotExists(*object_id));
        };

        // Requested version is newer than anything we have seen: report the
        // latest known version back to the caller.
        if version > obj_ref.1 {
            return Ok(PastObjectRead::VersionTooHigh {
                object_id: *object_id,
                asked_version: version,
                latest_version: obj_ref.1,
            });
        }

        if version < obj_ref.1 {
            // Read past objects
            // (a miss here may mean the version was pruned — see doc comment).
            return Ok(match self.read_object_at_version(object_id, version)? {
                Some((object, layout)) => {
                    let obj_ref = object.compute_object_reference();
                    PastObjectRead::VersionFound(obj_ref, object, layout)
                }

                None => PastObjectRead::VersionNotFound(*object_id, version),
            });
        }

        // From here on: version == latest. A dead latest ref is a tombstone
        // (deleted/wrapped object).
        if !obj_ref.2.is_alive() {
            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
        }

        // Latest version is alive, so the object body must be present; a miss
        // indicates store corruption rather than pruning.
        match self.read_object_at_version(object_id, obj_ref.1)? {
            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
            None => {
                error!(
                    "Object with in parent_entry is missing from object store, datastore is \
                     inconsistent",
                );
                Err(UserInputError::ObjectNotFound {
                    object_id: *object_id,
                    version: Some(obj_ref.1),
                }
                .into())
            }
        }
    }
3851
3852    #[instrument(level = "trace", skip_all)]
3853    fn read_object_at_version(
3854        &self,
3855        object_id: &ObjectID,
3856        version: SequenceNumber,
3857    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3858        let Some(object) = self
3859            .get_object_cache_reader()
3860            .try_get_object_by_key(object_id, version)?
3861        else {
3862            return Ok(None);
3863        };
3864
3865        let layout = self.get_object_layout(&object)?;
3866        Ok(Some((object, layout)))
3867    }
3868
3869    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3870        let layout = object
3871            .data
3872            .try_as_move()
3873            .map(|object| {
3874                into_struct_layout(
3875                    self.load_epoch_store_one_call_per_task()
3876                        .executor()
3877                        // TODO(cache) - must read through cache
3878                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3879                        .get_annotated_layout(&object.type_().clone().into())?,
3880                )
3881            })
3882            .transpose()?;
3883        Ok(layout)
3884    }
3885
3886    fn get_owner_at_version(
3887        &self,
3888        object_id: &ObjectID,
3889        version: SequenceNumber,
3890    ) -> IotaResult<Owner> {
3891        self.get_object_store()
3892            .try_get_object_by_key(object_id, version)?
3893            .ok_or_else(|| {
3894                IotaError::from(UserInputError::ObjectNotFound {
3895                    object_id: *object_id,
3896                    version: Some(version),
3897                })
3898            })
3899            .map(|o| o.owner)
3900    }
3901
3902    #[instrument(level = "trace", skip_all)]
3903    pub fn get_owner_objects(
3904        &self,
3905        owner: IotaAddress,
3906        // If `Some`, the query will start from the next item after the specified cursor
3907        cursor: Option<ObjectID>,
3908        limit: usize,
3909        filter: Option<IotaObjectDataFilter>,
3910    ) -> IotaResult<Vec<ObjectInfo>> {
3911        if let Some(indexes) = &self.indexes {
3912            indexes.get_owner_objects(owner, cursor, limit, filter)
3913        } else {
3914            Err(IotaError::IndexStoreNotAvailable)
3915        }
3916    }
3917
3918    #[instrument(level = "trace", skip_all)]
3919    pub fn get_owned_coins_iterator_with_cursor(
3920        &self,
3921        owner: IotaAddress,
3922        // If `Some`, the query will start from the next item after the specified cursor
3923        cursor: (String, ObjectID),
3924        limit: usize,
3925        one_coin_type_only: bool,
3926    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3927        if let Some(indexes) = &self.indexes {
3928            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3929        } else {
3930            Err(IotaError::IndexStoreNotAvailable)
3931        }
3932    }
3933
3934    #[instrument(level = "trace", skip_all)]
3935    pub fn get_owner_objects_iterator(
3936        &self,
3937        owner: IotaAddress,
3938        // If `Some`, the query will start from the next item after the specified cursor
3939        cursor: Option<ObjectID>,
3940        filter: Option<IotaObjectDataFilter>,
3941    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3942        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3943        if let Some(indexes) = &self.indexes {
3944            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3945        } else {
3946            Err(IotaError::IndexStoreNotAvailable)
3947        }
3948    }
3949
3950    #[instrument(level = "trace", skip_all)]
3951    pub async fn get_move_objects<T>(
3952        &self,
3953        owner: IotaAddress,
3954        type_: MoveObjectType,
3955    ) -> IotaResult<Vec<T>>
3956    where
3957        T: DeserializeOwned,
3958    {
3959        let object_ids = self
3960            .get_owner_objects_iterator(owner, None, None)?
3961            .filter(|o| match &o.type_ {
3962                ObjectType::Struct(s) => &type_ == s,
3963                ObjectType::Package => false,
3964            })
3965            .map(|info| ObjectKey(info.object_id, info.version))
3966            .collect::<Vec<_>>();
3967        let mut move_objects = vec![];
3968
3969        let objects = self
3970            .get_object_store()
3971            .try_multi_get_objects_by_key(&object_ids)?;
3972
3973        for (o, id) in objects.into_iter().zip(object_ids) {
3974            let object = o.ok_or_else(|| {
3975                IotaError::from(UserInputError::ObjectNotFound {
3976                    object_id: id.0,
3977                    version: Some(id.1),
3978                })
3979            })?;
3980            let move_object = object.data.try_as_move().ok_or_else(|| {
3981                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3982            })?;
3983            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3984                IotaError::ObjectDeserialization {
3985                    error: format!("{e}"),
3986                }
3987            })?);
3988        }
3989        Ok(move_objects)
3990    }
3991
3992    #[instrument(level = "trace", skip_all)]
3993    pub fn get_dynamic_fields(
3994        &self,
3995        owner: ObjectID,
3996        // If `Some`, the query will start from the next item after the specified cursor
3997        cursor: Option<ObjectID>,
3998        limit: usize,
3999    ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4000        Ok(self
4001            .get_dynamic_fields_iterator(owner, cursor)?
4002            .take(limit)
4003            .collect::<Result<Vec<_>, _>>()?)
4004    }
4005
4006    fn get_dynamic_fields_iterator(
4007        &self,
4008        owner: ObjectID,
4009        // If `Some`, the query will start from the next item after the specified cursor
4010        cursor: Option<ObjectID>,
4011    ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4012    {
4013        if let Some(indexes) = &self.indexes {
4014            indexes.get_dynamic_fields_iterator(owner, cursor)
4015        } else {
4016            Err(IotaError::IndexStoreNotAvailable)
4017        }
4018    }
4019
4020    #[instrument(level = "trace", skip_all)]
4021    pub fn get_dynamic_field_object_id(
4022        &self,
4023        owner: ObjectID,
4024        name_type: TypeTag,
4025        name_bcs_bytes: &[u8],
4026    ) -> IotaResult<Option<ObjectID>> {
4027        if let Some(indexes) = &self.indexes {
4028            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4029        } else {
4030            Err(IotaError::IndexStoreNotAvailable)
4031        }
4032    }
4033
4034    #[instrument(level = "trace", skip_all)]
4035    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
4036        Ok(self.get_indexes()?.next_sequence_number())
4037    }
4038
4039    #[instrument(level = "trace", skip_all)]
4040    pub async fn get_executed_transaction_and_effects(
4041        &self,
4042        digest: TransactionDigest,
4043        kv_store: Arc<TransactionKeyValueStore>,
4044    ) -> IotaResult<(Transaction, TransactionEffects)> {
4045        let transaction = kv_store.get_tx(digest).await?;
4046        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4047        Ok((transaction, effects))
4048    }
4049
4050    #[instrument(level = "trace", skip_all)]
4051    pub fn multi_get_checkpoint_by_sequence_number(
4052        &self,
4053        sequence_numbers: &[CheckpointSequenceNumber],
4054    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
4055        Ok(self
4056            .checkpoint_store
4057            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4058    }
4059
4060    #[instrument(level = "trace", skip_all)]
4061    pub fn get_transaction_events(
4062        &self,
4063        digest: &TransactionDigest,
4064    ) -> IotaResult<TransactionEvents> {
4065        self.get_transaction_cache_reader()
4066            .try_get_events(digest)?
4067            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
4068    }
4069
4070    pub fn get_transaction_input_objects(
4071        &self,
4072        effects: &TransactionEffects,
4073    ) -> anyhow::Result<Vec<Object>> {
4074        iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4075            .map_err(Into::into)
4076    }
4077
4078    pub fn get_transaction_output_objects(
4079        &self,
4080        effects: &TransactionEffects,
4081    ) -> anyhow::Result<Vec<Object>> {
4082        iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4083            .map_err(Into::into)
4084    }
4085
4086    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
4087        match &self.indexes {
4088            Some(i) => Ok(i.clone()),
4089            None => Err(IotaError::UnsupportedFeature {
4090                error: "extended object indexing is not enabled on this server".into(),
4091            }),
4092        }
4093    }
4094
4095    pub async fn get_transactions_for_tests(
4096        self: &Arc<Self>,
4097        filter: Option<TransactionFilter>,
4098        cursor: Option<TransactionDigest>,
4099        limit: Option<usize>,
4100        reverse: bool,
4101    ) -> IotaResult<Vec<TransactionDigest>> {
4102        let metrics = KeyValueStoreMetrics::new_for_tests();
4103        let kv_store = Arc::new(TransactionKeyValueStore::new(
4104            "rocksdb",
4105            metrics,
4106            self.clone(),
4107        ));
4108        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4109            .await
4110    }
4111
    /// Query transaction digests matching `filter`, paged by `cursor` and
    /// `limit`, in forward or `reverse` order.
    ///
    /// `Checkpoint` filters are answered directly from the checkpoint
    /// contents in the KV store; all other filters are delegated to the
    /// index store.
    #[instrument(level = "trace", skip_all)]
    pub async fn get_transactions(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        filter: Option<TransactionFilter>,
        // If `Some`, the query will start from the next item after the specified cursor
        cursor: Option<TransactionDigest>,
        limit: Option<usize>,
        reverse: bool,
    ) -> IotaResult<Vec<TransactionDigest>> {
        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
            let iter = checkpoint_contents.iter().map(|c| c.transaction);
            // The two branches below are identical except for the `rev()`;
            // they cannot be unified because `rev()` changes the iterator
            // type. The `skip_while` drops everything up to and including
            // the cursor (the trailing `skip(1)` removes the cursor itself,
            // making the cursor exclusive); if the cursor is not present in
            // this checkpoint, everything is skipped and the result is empty.
            if reverse {
                let iter = iter
                    .rev()
                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
                    .skip(usize::from(cursor.is_some()));
                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
            } else {
                let iter = iter
                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
                    .skip(usize::from(cursor.is_some()));
                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
            }
        }
        self.get_indexes()?
            .get_transactions(filter, cursor, limit, reverse)
    }
4141
    /// Shared handle to the checkpoint store.
    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
        &self.checkpoint_store
    }
4145
4146    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4147        self.get_checkpoint_store()
4148            .get_highest_executed_checkpoint_seq_number()?
4149            .ok_or(IotaError::UserInput {
4150                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4151            })
4152    }
4153
4154    #[cfg(msim)]
4155    pub fn get_highest_pruned_checkpoint_for_testing(
4156        &self,
4157    ) -> IotaResult<CheckpointSequenceNumber> {
4158        self.database_for_testing()
4159            .perpetual_tables
4160            .get_highest_pruned_checkpoint()
4161            .map(|c| c.unwrap_or(0))
4162            .map_err(Into::into)
4163    }
4164
4165    #[instrument(level = "trace", skip_all)]
4166    pub fn get_checkpoint_summary_by_sequence_number(
4167        &self,
4168        sequence_number: CheckpointSequenceNumber,
4169    ) -> IotaResult<CheckpointSummary> {
4170        let verified_checkpoint = self
4171            .get_checkpoint_store()
4172            .get_checkpoint_by_sequence_number(sequence_number)?;
4173        match verified_checkpoint {
4174            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4175            None => Err(IotaError::UserInput {
4176                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4177            }),
4178        }
4179    }
4180
4181    #[instrument(level = "trace", skip_all)]
4182    pub fn get_checkpoint_summary_by_digest(
4183        &self,
4184        digest: CheckpointDigest,
4185    ) -> IotaResult<CheckpointSummary> {
4186        let verified_checkpoint = self
4187            .get_checkpoint_store()
4188            .get_checkpoint_by_digest(&digest)?;
4189        match verified_checkpoint {
4190            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4191            None => Err(IotaError::UserInput {
4192                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4193            }),
4194        }
4195    }
4196
4197    #[instrument(level = "trace", skip_all)]
4198    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
4199        if is_system_package(package_id) {
4200            return self.find_genesis_txn_digest();
4201        }
4202        Ok(self
4203            .get_object_read(&package_id)?
4204            .into_object()?
4205            .previous_transaction)
4206    }
4207
4208    #[instrument(level = "trace", skip_all)]
4209    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4210        let summary = self
4211            .get_verified_checkpoint_by_sequence_number(0)?
4212            .into_message();
4213        let content = self.get_checkpoint_contents(summary.content_digest)?;
4214        let genesis_transaction = content.enumerate_transactions(&summary).next();
4215        Ok(genesis_transaction
4216            .ok_or(IotaError::UserInput {
4217                error: UserInputError::GenesisTransactionNotFound,
4218            })?
4219            .1
4220            .transaction)
4221    }
4222
4223    #[instrument(level = "trace", skip_all)]
4224    pub fn get_verified_checkpoint_by_sequence_number(
4225        &self,
4226        sequence_number: CheckpointSequenceNumber,
4227    ) -> IotaResult<VerifiedCheckpoint> {
4228        let verified_checkpoint = self
4229            .get_checkpoint_store()
4230            .get_checkpoint_by_sequence_number(sequence_number)?;
4231        match verified_checkpoint {
4232            Some(verified_checkpoint) => Ok(verified_checkpoint),
4233            None => Err(IotaError::UserInput {
4234                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4235            }),
4236        }
4237    }
4238
4239    #[instrument(level = "trace", skip_all)]
4240    pub fn get_verified_checkpoint_summary_by_digest(
4241        &self,
4242        digest: CheckpointDigest,
4243    ) -> IotaResult<VerifiedCheckpoint> {
4244        let verified_checkpoint = self
4245            .get_checkpoint_store()
4246            .get_checkpoint_by_digest(&digest)?;
4247        match verified_checkpoint {
4248            Some(verified_checkpoint) => Ok(verified_checkpoint),
4249            None => Err(IotaError::UserInput {
4250                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4251            }),
4252        }
4253    }
4254
4255    #[instrument(level = "trace", skip_all)]
4256    pub fn get_checkpoint_contents(
4257        &self,
4258        digest: CheckpointContentsDigest,
4259    ) -> IotaResult<CheckpointContents> {
4260        self.get_checkpoint_store()
4261            .get_checkpoint_contents(&digest)?
4262            .ok_or(IotaError::UserInput {
4263                error: UserInputError::CheckpointContentsNotFound(digest),
4264            })
4265    }
4266
4267    #[instrument(level = "trace", skip_all)]
4268    pub fn get_checkpoint_contents_by_sequence_number(
4269        &self,
4270        sequence_number: CheckpointSequenceNumber,
4271    ) -> IotaResult<CheckpointContents> {
4272        let verified_checkpoint = self
4273            .get_checkpoint_store()
4274            .get_checkpoint_by_sequence_number(sequence_number)?;
4275        match verified_checkpoint {
4276            Some(verified_checkpoint) => {
4277                let content_digest = verified_checkpoint.into_inner().content_digest;
4278                self.get_checkpoint_contents(content_digest)
4279            }
4280            None => Err(IotaError::UserInput {
4281                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4282            }),
4283        }
4284    }
4285
    /// Query events matching `query`, paged by an exclusive `cursor` and
    /// `limit`, in ascending or `descending` order. Event keys are resolved
    /// through the index store, payloads fetched from the KV store, and the
    /// results converted to `IotaEvent` with annotated Move layouts.
    #[instrument(level = "trace", skip_all)]
    pub async fn query_events(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        query: EventFilter,
        // If `Some`, the query will start from the next item after the specified cursor
        cursor: Option<EventID>,
        limit: usize,
        descending: bool,
    ) -> IotaResult<Vec<IotaEvent>> {
        let index_store = self.get_indexes()?;

        // Get the tx_num from tx_digest
        // (no cursor: scan from the appropriate end for the chosen direction)
        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
                IotaError::TransactionNotFound {
                    digest: cursor.tx_digest,
                },
            )?;
            (tx_seq, cursor.event_seq as usize)
        } else if descending {
            (u64::MAX, usize::MAX)
        } else {
            (0, 0)
        };

        // Over-fetch by one: the extra slot holds either the (inclusive)
        // cursor entry that is stripped below, or a surplus entry removed by
        // the truncate in the no-cursor case.
        let limit = limit + 1;
        let mut event_keys = match query {
            EventFilter::All(filters) => {
                if filters.is_empty() {
                    index_store.all_events(tx_num, event_num, limit, descending)?
                } else {
                    return Err(IotaError::UserInput {
                        error: UserInputError::Unsupported(
                            "This query type does not currently support filter combinations"
                                .to_string(),
                        ),
                    });
                }
            }
            EventFilter::Transaction(digest) => {
                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveModule { package, module } => {
                let module_id = ModuleId::new(package.into(), module);
                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveEventType(struct_name) => index_store
                .events_by_move_event_struct_name(
                    &struct_name,
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            EventFilter::Sender(sender) => {
                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
            }
            EventFilter::TimeRange {
                start_time,
                end_time,
            } => index_store
                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
            EventFilter::MoveEventModule { package, module } => index_store
                .events_by_move_event_module(
                    &ModuleId::new(package.into(), module),
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            // not using "_ =>" because we want to make sure we remember to add new variants here
            EventFilter::Package(_)
            | EventFilter::MoveEventField { .. }
            | EventFilter::Any(_)
            | EventFilter::And(_, _)
            | EventFilter::Or(_, _) => {
                return Err(IotaError::UserInput {
                    error: UserInputError::Unsupported(
                        "This query type is not supported by the full node.".to_string(),
                    ),
                });
            }
        };

        // skip one event if exclusive cursor is provided,
        // otherwise truncate to the original limit.
        if cursor.is_some() {
            if !event_keys.is_empty() {
                event_keys.remove(0);
            }
        } else {
            event_keys.truncate(limit - 1);
        }

        // get the unique set of digests from the event_keys
        let transaction_digests = event_keys
            .iter()
            .map(|(_, digest, _, _)| *digest)
            .collect::<HashSet<_>>()
            .into_iter()
            .collect::<Vec<_>>();

        // Batch-fetch event payloads once per distinct transaction.
        let events = kv_store
            .multi_get_events_by_tx_digests(&transaction_digests)
            .await?;

        let events_map: HashMap<_, _> =
            transaction_digests.iter().zip(events.into_iter()).collect();

        // Re-associate each event key with its payload; keys are
        // (event_digest, tx_digest, event_seq, timestamp) tuples.
        let stored_events = event_keys
            .into_iter()
            .map(|k| {
                (
                    k,
                    events_map
                        .get(&k.1)
                        .expect("fetched digest is missing")
                        .clone()
                        .and_then(|e| e.data.get(k.2).cloned()),
                )
            })
            .map(
                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
                    event
                        .map(|e| (e, tx_digest, event_seq, timestamp))
                        .ok_or(IotaError::TransactionEventsNotFound { digest: tx_digest })
                },
            )
            .collect::<Result<Vec<_>, _>>()?;

        // Resolve annotated Move layouts so raw event bytes can be rendered
        // as structured `IotaEvent`s.
        let epoch_store = self.load_epoch_store_one_call_per_task();
        let backing_store = self.get_backing_package_store().as_ref();
        let mut layout_resolver = epoch_store
            .executor()
            .type_layout_resolver(Box::new(backing_store));
        let mut events = vec![];
        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
            events.push(IotaEvent::try_from(
                e.clone(),
                tx_digest,
                event_seq as u64,
                Some(timestamp),
                layout_resolver.get_annotated_layout(&e.type_)?,
            )?)
        }
        Ok(events)
    }
4434
4435    pub async fn insert_genesis_object(&self, object: Object) {
4436        self.get_reconfig_api()
4437            .try_insert_genesis_object(object)
4438            .expect("Cannot insert genesis object")
4439    }
4440
4441    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4442        futures::future::join_all(
4443            objects
4444                .iter()
4445                .map(|o| self.insert_genesis_object(o.clone())),
4446        )
4447        .await;
4448    }
4449
    /// Make a status response for a transaction.
    ///
    /// Returns `None` when nothing is known about `transaction_digest`.
    /// Otherwise returns the sender-signed transaction data together with a
    /// `TransactionStatus` that is either:
    /// - `Executed` (cert signature, effects, events) when executed effects
    ///   and the transaction block are both available, or
    /// - `Signed` when only this authority's signature on the transaction is
    ///   recorded in the current epoch store.
    #[instrument(level = "trace", skip_all)]
    pub fn get_transaction_status(
        &self,
        transaction_digest: &TransactionDigest,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
        // TODO: In the case of read path, we should not have to re-sign the effects.
        if let Some(effects) =
            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
        {
            if let Some(transaction) = self
                .get_transaction_cache_reader()
                .try_get_transaction_block(transaction_digest)?
            {
                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
                // Only fetch events when the effects actually reference any.
                let events = if effects.events_digest().is_some() {
                    self.get_transaction_events(effects.transaction_digest())?
                } else {
                    TransactionEvents::default()
                };
                return Ok(Some((
                    (*transaction).clone().into_message(),
                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
                )));
            } else {
                // The read of effects and read of transaction are not atomic. It's possible
                // that we reverted the transaction (during epoch change) in
                // between the above two reads, and we end up having effects but
                // not transaction. In this case, we just fall through.
                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
            }
        }
        // No executed effects (or the transaction was reverted in between the
        // reads above): fall back to this authority's own signature, if any.
        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
            self.metrics.tx_already_processed.inc();
            let (transaction, sig) = signed.into_inner().into_data_and_sig();
            Ok(Some((transaction, TransactionStatus::Signed(sig))))
        } else {
            Ok(None)
        }
    }
4491
4492    /// Get the signed effects of the given transaction. If the effects was
4493    /// signed in a previous epoch, re-sign it so that the caller is able to
4494    /// form a cert of the effects in the current epoch.
4495    #[instrument(level = "trace", skip_all)]
4496    pub fn get_signed_effects_and_maybe_resign(
4497        &self,
4498        transaction_digest: &TransactionDigest,
4499        epoch_store: &Arc<AuthorityPerEpochStore>,
4500    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4501        let effects = self
4502            .get_transaction_cache_reader()
4503            .try_get_executed_effects(transaction_digest)?;
4504        match effects {
4505            Some(effects) => {
4506                // If the transaction was executed in previous epochs, the validator will
4507                // re-sign the effects with new current epoch so that a client is always able to
4508                // obtain an effects certificate at the current epoch.
4509                //
4510                // Why is this necessary? Consider the following case:
4511                // - assume there are 4 validators
4512                // - Quorum driver gets 2 signed effects before reconfig halt
4513                // - The tx makes it into final checkpoint.
4514                // - 2 validators go away and are replaced in the new epoch.
4515                // - The new epoch begins.
4516                // - The quorum driver cannot complete the partial effects cert from the
4517                //   previous epoch, because it may not be able to reach either of the 2 former
4518                //   validators.
4519                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4520                //   the new epoch, the QD can make a new effects cert and return it to the
4521                //   client.
4522                //
4523                // This is a considered a short-term workaround. Eventually, Quorum Driver
4524                // should be able to return either an effects certificate, -or-
4525                // a proof of inclusion in a checkpoint. In the case above, the
4526                // Quorum Driver would return a proof of inclusion in the final
4527                // checkpoint, and this code would no longer be necessary.
4528                if effects.executed_epoch() != epoch_store.epoch() {
4529                    debug!(
4530                        tx_digest=?transaction_digest,
4531                        effects_epoch=?effects.executed_epoch(),
4532                        epoch=?epoch_store.epoch(),
4533                        "Re-signing the effects with the current epoch"
4534                    );
4535                }
4536                Ok(Some(self.sign_effects(effects, epoch_store)?))
4537            }
4538            None => Ok(None),
4539        }
4540    }
4541
4542    #[instrument(level = "trace", skip_all)]
4543    pub(crate) fn sign_effects(
4544        &self,
4545        effects: TransactionEffects,
4546        epoch_store: &Arc<AuthorityPerEpochStore>,
4547    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4548        let tx_digest = *effects.transaction_digest();
4549        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4550            Some(sig) => {
4551                debug_assert!(sig.epoch == epoch_store.epoch());
4552                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4553            }
4554            _ => {
4555                let sig = AuthoritySignInfo::new(
4556                    epoch_store.epoch(),
4557                    &effects,
4558                    Intent::iota_app(IntentScope::TransactionEffects),
4559                    self.name,
4560                    &*self.secret,
4561                );
4562
4563                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4564
4565                epoch_store.insert_effects_digest_and_signature(
4566                    &tx_digest,
4567                    effects.digest(),
4568                    &sig,
4569                )?;
4570
4571                effects
4572            }
4573        };
4574
4575        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4576            signed_effects,
4577        ))
4578    }
4579
4580    // Returns coin objects for indexing for fullnode if indexing is enabled.
4581    #[instrument(level = "trace", skip_all)]
4582    fn fullnode_only_get_tx_coins_for_indexing(
4583        &self,
4584        effects: &TransactionEffects,
4585        inner_temporary_store: &InnerTemporaryStore,
4586        epoch_store: &Arc<AuthorityPerEpochStore>,
4587    ) -> Option<TxCoins> {
4588        if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4589            return None;
4590        }
4591        let written_coin_objects = inner_temporary_store
4592            .written
4593            .iter()
4594            .filter_map(|(k, v)| {
4595                if v.is_coin() {
4596                    Some((*k, v.clone()))
4597                } else {
4598                    None
4599                }
4600            })
4601            .collect();
4602        let mut input_coin_objects = inner_temporary_store
4603            .input_objects
4604            .iter()
4605            .filter_map(|(k, v)| {
4606                if v.is_coin() {
4607                    Some((*k, v.clone()))
4608                } else {
4609                    None
4610                }
4611            })
4612            .collect::<ObjectMap>();
4613
4614        // Check for receiving objects that were actually used and modified during
4615        // execution. Their updated version will already showup in
4616        // "written_coins" but their input isn't included in the set of input
4617        // objects in a inner_temporary_store.
4618        for (object_id, version) in effects.modified_at_versions() {
4619            if inner_temporary_store
4620                .loaded_runtime_objects
4621                .contains_key(&object_id)
4622            {
4623                if let Some(object) = self
4624                    .get_object_store()
4625                    .get_object_by_key(&object_id, version)
4626                {
4627                    if object.is_coin() {
4628                        input_coin_objects.insert(object_id, object);
4629                    }
4630                }
4631            }
4632        }
4633
4634        Some((input_coin_objects, written_coin_objects))
4635    }
4636
4637    /// Get the TransactionEnvelope that currently locks the given object, if
4638    /// any. Since object locks are only valid for one epoch, we also need
4639    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4640    /// no lock records for the given object can be found.
4641    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4642    /// object record is at a different version.
4643    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4644    /// certain transaction. Returns None if the a lock record is
4645    /// initialized for the given ObjectRef but not yet locked by any
4646    /// transaction,     or cannot find the transaction in transaction
4647    /// table, because of data race etc.
4648    #[instrument(level = "trace", skip_all)]
4649    pub async fn get_transaction_lock(
4650        &self,
4651        object_ref: &ObjectRef,
4652        epoch_store: &AuthorityPerEpochStore,
4653    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4654        let lock_info = self
4655            .get_object_cache_reader()
4656            .try_get_lock(*object_ref, epoch_store)?;
4657        let lock_info = match lock_info {
4658            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4659                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4660                    provided_obj_ref: *object_ref,
4661                    current_version: locked_ref.1,
4662                }
4663                .into());
4664            }
4665            ObjectLockStatus::Initialized => {
4666                return Ok(None);
4667            }
4668            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4669        };
4670
4671        epoch_store.get_signed_transaction(&lock_info)
4672    }
4673
4674    pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4675        self.get_object_cache_reader().try_get_objects(objects)
4676    }
4677
4678    /// Non-fallible version of `try_get_objects`.
4679    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4680        self.try_get_objects(objects)
4681            .await
4682            .expect("storage access failed")
4683    }
4684
4685    pub async fn try_get_object_or_tombstone(
4686        &self,
4687        object_id: ObjectID,
4688    ) -> IotaResult<Option<ObjectRef>> {
4689        self.get_object_cache_reader()
4690            .try_get_latest_object_ref_or_tombstone(object_id)
4691    }
4692
4693    /// Non-fallible version of `try_get_object_or_tombstone`.
4694    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4695        self.try_get_object_or_tombstone(object_id)
4696            .await
4697            .expect("storage access failed")
4698    }
4699
4700    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4701    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4702    /// upgrade.
4703    ///
4704    /// This method can be used to dynamic adjust the amount of buffer. If set
4705    /// to 0, the upgrade will go through with only 2f+1 votes.
4706    ///
4707    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4708    /// should have the same value), or you risk halting the chain.
4709    pub fn set_override_protocol_upgrade_buffer_stake(
4710        &self,
4711        expected_epoch: EpochId,
4712        buffer_stake_bps: u64,
4713    ) -> IotaResult {
4714        let epoch_store = self.load_epoch_store_one_call_per_task();
4715        let actual_epoch = epoch_store.epoch();
4716        if actual_epoch != expected_epoch {
4717            return Err(IotaError::WrongEpoch {
4718                expected_epoch,
4719                actual_epoch,
4720            });
4721        }
4722
4723        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4724    }
4725
4726    pub fn clear_override_protocol_upgrade_buffer_stake(
4727        &self,
4728        expected_epoch: EpochId,
4729    ) -> IotaResult {
4730        let epoch_store = self.load_epoch_store_one_call_per_task();
4731        let actual_epoch = epoch_store.epoch();
4732        if actual_epoch != expected_epoch {
4733            return Err(IotaError::WrongEpoch {
4734                expected_epoch,
4735                actual_epoch,
4736            });
4737        }
4738
4739        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4740    }
4741
    /// Get the set of system packages that are compiled in to this build, if
    /// those packages are compatible with the current versions of those
    /// packages on-chain.
    ///
    /// Returns an empty vector as soon as any built-in package fails the
    /// on-chain compatibility comparison, which means this binary cannot
    /// vote for any framework upgrade at all.
    pub async fn get_available_system_packages(
        &self,
        binary_config: &BinaryConfig,
    ) -> Vec<ObjectRef> {
        let mut results = vec![];

        let system_packages = BuiltInFramework::iter_system_packages();

        // Add extra framework packages during simtest
        #[cfg(msim)]
        let extra_packages = framework_injection::get_extra_packages(self.name);
        #[cfg(msim)]
        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());

        for system_package in system_packages {
            let modules = system_package.modules().to_vec();
            // In simtests, we could override the current built-in framework packages.
            #[cfg(msim)]
            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
                .unwrap_or(modules);

            // Compare the compiled-in modules against the on-chain package.
            // `None` means the package is incompatible, so no upgrade can be
            // proposed for any package.
            let Some(obj_ref) = iota_framework::compare_system_package(
                &self.get_object_store(),
                &system_package.id,
                &modules,
                system_package.dependencies.to_vec(),
                binary_config,
            )
            .await
            else {
                return vec![];
            };
            results.push(obj_ref);
        }

        results
    }
4782
    /// Return the new versions, module bytes, and dependencies for the packages
    /// that have been committed to for a framework upgrade, in
    /// `system_packages`.  Loads the module contents from the binary, and
    /// performs the following checks:
    ///
    /// - Whether its contents matches what is on-chain already, in which case
    ///   no upgrade is required, and its contents are omitted from the output.
    /// - Whether the contents in the binary can form a package whose digest
    ///   matches the input, meaning the framework will be upgraded, and this
    ///   authority can satisfy that upgrade, in which case the contents are
    ///   included in the output.
    ///
    /// If the current version of the framework can't be loaded, the binary does
    /// not contain the bytes for that framework ID, or the resulting
    /// package fails the digest check, `None` is returned indicating that
    /// this authority cannot run the upgrade that the network voted on.
    async fn get_system_package_bytes(
        &self,
        system_packages: Vec<ObjectRef>,
        binary_config: &BinaryConfig,
    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
        // Fetch the current on-chain state of every package being considered.
        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
        let objects = self.get_objects(&ids).await;

        let mut res = Vec::with_capacity(system_packages.len());
        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
            let prev_transaction = match object {
                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
                    // Skip this one because it doesn't need to be upgraded.
                    info!("Framework {} does not need updating", system_package_ref.0);
                    continue;
                }

                Some(cur_object) => cur_object.previous_transaction,
                // No on-chain object yet: treat as created at genesis.
                None => TransactionDigest::genesis_marker(),
            };

            // In simtests, the package bytes may be overridden via injection.
            #[cfg(msim)]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
                .unwrap_or_else(|| {
                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
                });

            #[cfg(not(msim))]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();

            // Deserialize the compiled-in module bytes under the current
            // epoch's binary config.
            let modules: Vec<_> = bytes
                .iter()
                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
                .collect();

            // Build the candidate package object so its reference (including
            // digest) can be checked against what the network voted on.
            let new_object = Object::new_system_package(
                &modules,
                system_package_ref.1,
                dependencies.clone(),
                prev_transaction,
            );

            let new_ref = new_object.compute_object_reference();
            if new_ref != system_package_ref {
                // Digest mismatch: this binary cannot satisfy the voted
                // upgrade, so abort entirely.
                error!(
                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
                );
                return None;
            }

            res.push((system_package_ref.1, bytes, dependencies));
        }

        Some(res)
    }
4862
    /// Returns the new protocol version and system packages that the network
    /// has voted to upgrade to. If the proposed protocol version is not
    /// supported, None is returned.
    ///
    /// A vote succeeds when a single (digest, packages) combination gathers
    /// at least the committee's effective threshold of stake, where the
    /// threshold is raised above 2f+1 by `buffer_stake_bps`.
    fn is_protocol_version_supported_v1(
        proposed_protocol_version: ProtocolVersion,
        committee: &Committee,
        capabilities: Vec<AuthorityCapabilitiesV1>,
        mut buffer_stake_bps: u64,
    ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
        // Basis points cannot exceed 100%.
        if buffer_stake_bps > 10000 {
            warn!("clamping buffer_stake_bps to 10000");
            buffer_stake_bps = 10000;
        }

        // For each validator, gather the protocol version and system packages that it
        // would like to upgrade to in the next epoch.
        let mut desired_upgrades: Vec<_> = capabilities
            .into_iter()
            .filter_map(|mut cap| {
                // A validator that lists no packages is voting against any change at all.
                if cap.available_system_packages.is_empty() {
                    return None;
                }

                // Sort so that equal package sets compare equal when grouped below.
                cap.available_system_packages.sort();

                info!(
                    "validator {:?} supports {:?} with system packages: {:?}",
                    cap.authority.concise(),
                    cap.supported_protocol_versions,
                    cap.available_system_packages,
                );

                // A validator that only supports the current protocol version is also voting
                // against any change, because framework upgrades always require a protocol
                // version bump.
                cap.supported_protocol_versions
                    .get_version_digest(proposed_protocol_version)
                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
            })
            .collect();

        // There can only be one set of votes that have a majority, find one if it
        // exists.
        // Sorting first is required so that chunk_by sees identical
        // (digest, packages) keys as adjacent runs.
        desired_upgrades.sort();
        desired_upgrades
            .into_iter()
            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
            .into_iter()
            .find_map(|((digest, packages), group)| {
                // should have been filtered out earlier.
                assert!(!packages.is_empty());

                // Accumulate the stake of every validator voting for this
                // exact (digest, packages) combination.
                let mut stake_aggregator: StakeAggregator<(), true> =
                    StakeAggregator::new(Arc::new(committee.clone()));

                for (_, _, authority) in group {
                    stake_aggregator.insert_generic(authority, ());
                }

                let total_votes = stake_aggregator.total_votes();
                let quorum_threshold = committee.quorum_threshold();
                let effective_threshold = committee.effective_threshold(buffer_stake_bps);

                info!(
                    protocol_config_digest = ?digest,
                    ?total_votes,
                    ?quorum_threshold,
                    ?buffer_stake_bps,
                    ?effective_threshold,
                    ?proposed_protocol_version,
                    ?packages,
                    "support for upgrade"
                );

                let has_support = total_votes >= effective_threshold;
                has_support.then_some((proposed_protocol_version, digest, packages))
            })
    }
4942
4943    /// Selects the highest supported protocol version and system packages that
4944    /// the network has voted to upgrade to. If no upgrade is supported,
4945    /// returns the current protocol version and system packages.
4946    fn choose_protocol_version_and_system_packages_v1(
4947        current_protocol_version: ProtocolVersion,
4948        current_protocol_digest: Digest,
4949        committee: &Committee,
4950        capabilities: Vec<AuthorityCapabilitiesV1>,
4951        buffer_stake_bps: u64,
4952    ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4953        let mut next_protocol_version = current_protocol_version;
4954        let mut system_packages = vec![];
4955        let mut protocol_version_digest = current_protocol_digest;
4956
4957        // Finds the highest supported protocol version and system packages by
4958        // incrementing the proposed protocol version by one until no further
4959        // upgrades are supported.
4960        while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4961            next_protocol_version + 1,
4962            committee,
4963            capabilities.clone(),
4964            buffer_stake_bps,
4965        ) {
4966            next_protocol_version = version;
4967            protocol_version_digest = digest;
4968            system_packages = packages;
4969        }
4970
4971        (
4972            next_protocol_version,
4973            protocol_version_digest,
4974            system_packages,
4975        )
4976    }
4977
4978    /// Returns the indices of validators that support the given protocol
4979    /// version and digest. This includes both committee and non-committee
4980    /// validators based on their capabilities. Uses active validators
4981    /// instead of committee indices.
4982    fn get_validators_supporting_protocol_version(
4983        target_protocol_version: ProtocolVersion,
4984        target_digest: Digest,
4985        active_validators: &[AuthorityPublicKey],
4986        capabilities: &[AuthorityCapabilitiesV1],
4987    ) -> Vec<u64> {
4988        let mut eligible_validators = Vec::new();
4989
4990        for capability in capabilities {
4991            // Check if this validator supports the target protocol version and digest
4992            if let Some(digest) = capability
4993                .supported_protocol_versions
4994                .get_version_digest(target_protocol_version)
4995            {
4996                if digest == target_digest {
4997                    // Find the validator's index in the active validators list
4998                    if let Some(index) = active_validators
4999                        .iter()
5000                        .position(|name| AuthorityName::from(name) == capability.authority)
5001                    {
5002                        eligible_validators.push(index as u64);
5003                    }
5004                }
5005            }
5006        }
5007
5008        // Sort indices for deterministic behavior
5009        eligible_validators.sort();
5010        eligible_validators
5011    }
5012
5013    /// Calculates the sum of weights for eligible validators that are part of
5014    /// the committee. Takes the indices from
5015    /// get_validators_supporting_protocol_version and maps them back
5016    /// to committee members to get their weights.
5017    fn calculate_eligible_validators_weight(
5018        eligible_validator_indices: &[u64],
5019        active_validators: &[AuthorityPublicKey],
5020        committee: &Committee,
5021    ) -> u64 {
5022        let mut total_weight = 0u64;
5023
5024        for &index in eligible_validator_indices {
5025            let authority_pubkey = &active_validators[index as usize];
5026            // Check if this validator is in the committee and get their weight
5027            if let Some((_, weight)) = committee
5028                .members()
5029                .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
5030            {
5031                total_weight += weight;
5032            }
5033        }
5034
5035        total_weight
5036    }
5037
5038    #[instrument(level = "debug", skip_all)]
5039    fn create_authenticator_state_tx(
5040        &self,
5041        epoch_store: &Arc<AuthorityPerEpochStore>,
5042    ) -> Option<EndOfEpochTransactionKind> {
5043        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5044            info!("authenticator state transactions not enabled");
5045            return None;
5046        }
5047
5048        let authenticator_state_exists = epoch_store.authenticator_state_exists();
5049        let tx = if authenticator_state_exists {
5050            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5051            let min_epoch =
5052                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5053            let authenticator_obj_initial_shared_version = epoch_store
5054                .epoch_start_config()
5055                .authenticator_obj_initial_shared_version()
5056                .expect("initial version must exist");
5057
5058            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5059                min_epoch,
5060                authenticator_obj_initial_shared_version,
5061            );
5062
5063            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5064
5065            tx
5066        } else {
5067            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5068            info!("Creating AuthenticatorStateCreate tx");
5069            tx
5070        };
5071        Some(tx)
5072    }
5073
5074    /// Creates and execute the advance epoch transaction to effects without
5075    /// committing it to the database. The effects of the change epoch tx
5076    /// are only written to the database after a certified checkpoint has been
5077    /// formed and executed by CheckpointExecutor.
5078    ///
5079    /// When a framework upgraded has been decided on, but the validator does
5080    /// not have the new versions of the packages locally, the validator
5081    /// cannot form the ChangeEpochTx. In this case it returns Err,
5082    /// indicating that the checkpoint builder should give up trying to make the
5083    /// final checkpoint. As long as the network is able to create a certified
5084    /// checkpoint (which should be ensured by the capabilities vote), it
5085    /// will arrive via state sync and be executed by CheckpointExecutor.
    #[instrument(level = "error", skip_all)]
    pub async fn create_and_execute_advance_epoch_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        gas_cost_summary: &GasCostSummary,
        checkpoint: CheckpointSequenceNumber,
        epoch_start_timestamp_ms: CheckpointTimestamp,
        scores: Vec<u64>,
    ) -> anyhow::Result<(
        IotaSystemState,
        Option<SystemEpochInfoEvent>,
        TransactionEffects,
    )> {
        // Accumulates the end-of-epoch transaction kinds; the change-epoch
        // kind is appended last, after any preparatory kinds.
        let mut txns = Vec::new();

        // An authenticator state update is only needed in some epochs, hence
        // the Option.
        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
            txns.push(tx);
        }

        let next_epoch = epoch_store.epoch() + 1;

        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
        let authority_capabilities = epoch_store
            .get_capabilities_v1()
            .expect("read capabilities from db cannot fail");
        // Decide the next protocol version (and the system packages it needs)
        // from the capability votes of the current committee.
        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
            Self::choose_protocol_version_and_system_packages_v1(
                epoch_store.protocol_version(),
                SupportedProtocolVersionsWithHashes::protocol_config_digest(
                    epoch_store.protocol_config(),
                ),
                epoch_store.committee(),
                authority_capabilities.clone(),
                buffer_stake_bps,
            );

        // since system packages are created during the current epoch, they should abide
        // by the rules of the current epoch, including the current epoch's max
        // Move binary format version
        let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);
        let Some(next_epoch_system_package_bytes) = self
            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
            .await
        else {
            error!(
                "upgraded system packages {:?} are not locally available, cannot create \
                ChangeEpochTx. validator binary must be upgraded to the correct version!",
                next_epoch_system_packages
            );
            // the checkpoint builder will keep retrying forever when it hits this error.
            // Eventually, one of two things will happen:
            // - The operator will upgrade this binary to one that has the new packages
            //   locally, and this function will succeed.
            // - The final checkpoint will be certified by other validators, we will receive
            //   it via state sync, and execute it. This will upgrade the framework
            //   packages, reconfigure, and most likely shut down in the new epoch (this
            //   validator likely doesn't support the new protocol version, or else it
            //   should have had the packages.)
            bail!("missing system packages: cannot form ChangeEpochTx");
        };

        // Use ChangeEpochV3 or ChangeEpochV4 when the feature flags are enabled and
        // ChangeEpochV2 requirements are met
        if config.select_committee_from_eligible_validators() {
            // Get the list of eligible validators that support the target protocol version
            let active_validators = epoch_store.epoch_start_state().get_active_validators();

            // Default: every active validator (by index) is eligible.
            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();

            // Use validators supporting the target protocol version as eligible validators
            // in the next version if select_committee_supporting_next_epoch_version feature
            // flag is set to true.
            if config.select_committee_supporting_next_epoch_version() {
                eligible_active_validators = Self::get_validators_supporting_protocol_version(
                    next_epoch_protocol_version,
                    next_epoch_protocol_digest,
                    &active_validators,
                    &authority_capabilities,
                );

                // Calculate the total weight of eligible validators in the committee
                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
                    &eligible_active_validators,
                    &active_validators,
                    epoch_store.committee(),
                );

                // Safety check: ensure eligible validators have enough stake
                // Use the same effective threshold calculation that was used to decide the
                // protocol version
                let committee = epoch_store.committee();
                let effective_threshold = committee.effective_threshold(buffer_stake_bps);

                if eligible_validators_weight < effective_threshold {
                    error!(
                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
                    );
                    // Pass all active validator indices as eligible validators
                    // to perform selection among all of them.
                    eligible_active_validators = (0..active_validators.len() as u64).collect();
                }
            }

            // Use ChangeEpochV4 when the pass_validator_scores_to_advance_epoch feature
            // flag is enabled.
            if config.pass_validator_scores_to_advance_epoch() {
                txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
                    next_epoch,
                    next_epoch_protocol_version,
                    gas_cost_summary.storage_cost,
                    gas_cost_summary.computation_cost,
                    gas_cost_summary.computation_cost_burned,
                    gas_cost_summary.storage_rebate,
                    gas_cost_summary.non_refundable_storage_fee,
                    epoch_start_timestamp_ms,
                    next_epoch_system_package_bytes,
                    eligible_active_validators,
                    scores,
                    config.adjust_rewards_by_score(),
                ));
            } else {
                txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
                    next_epoch,
                    next_epoch_protocol_version,
                    gas_cost_summary.storage_cost,
                    gas_cost_summary.computation_cost,
                    gas_cost_summary.computation_cost_burned,
                    gas_cost_summary.storage_rebate,
                    gas_cost_summary.non_refundable_storage_fee,
                    epoch_start_timestamp_ms,
                    next_epoch_system_package_bytes,
                    eligible_active_validators,
                ));
            }
        } else if config.protocol_defined_base_fee()
            && config.max_committee_members_count_as_option().is_some()
        {
            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        } else {
            txns.push(EndOfEpochTransactionKind::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        }

        // Bundle all collected kinds into a single end-of-epoch transaction.
        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);

        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
            tx.clone(),
            epoch_store.epoch(),
            checkpoint,
        );

        let tx_digest = executable_tx.digest();

        info!(
            ?next_epoch,
            ?next_epoch_protocol_version,
            ?next_epoch_system_packages,
            computation_cost=?gas_cost_summary.computation_cost,
            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
            storage_cost=?gas_cost_summary.storage_cost,
            storage_rebate=?gas_cost_summary.storage_rebate,
            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
            ?tx_digest,
            "Creating advance epoch transaction"
        );

        fail_point_async!("change_epoch_tx_delay");
        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

        // The tx could have been executed by state sync already - if so simply return
        // an error. The checkpoint builder will shortly be terminated by
        // reconfiguration anyway.
        if self
            .get_transaction_cache_reader()
            .try_is_tx_already_executed(tx_digest)?
        {
            warn!("change epoch tx has already been executed via state sync");
            bail!("change epoch tx has already been executed via state sync",);
        }

        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;

        // We must manually assign the shared object versions to the transaction before
        // executing it. This is because we do not sequence end-of-epoch
        // transactions through consensus.
        epoch_store.assign_shared_object_versions_idempotent(
            self.get_object_cache_reader().as_ref(),
            std::slice::from_ref(&executable_tx),
        )?;

        let (input_objects, _) =
            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;

        // Execute the transaction locally; the effects computed here only
        // become final once included in a certified checkpoint.
        let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
            &execution_guard,
            &executable_tx,
            input_objects,
            vec![],
            epoch_store,
        )?;
        let system_obj = get_iota_system_state(&temporary_store.written)
            .expect("change epoch tx must write to system object");
        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
        let system_epoch_info_event = temporary_store
            .events
            .data
            .into_iter()
            .find(|event| event.is_system_epoch_info_event())
            .map(SystemEpochInfoEvent::from);
        // The system epoch info event can be `None` in case if the `advance_epoch`
        // Move function call failed and was executed in the safe mode.
        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());

        // We must write tx and effects to the state sync tables so that state sync is
        // able to deliver to the transaction to CheckpointExecutor after it is
        // included in a certified checkpoint.
        self.get_state_sync_store()
            .try_insert_transaction_and_effects(&tx, &effects)
            .map_err(|err| {
                let err: anyhow::Error = err.into();
                err
            })?;

        info!(
            "Effects summary of the change epoch transaction: {:?}",
            effects.summary_for_debug()
        );
        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
        // The change epoch transaction cannot fail to execute.
        assert!(effects.status().is_ok());
        Ok((system_obj, system_epoch_info_event, effects))
    }
5338
    /// This function is called at the very end of the epoch.
    /// This step is required before updating new epoch in the db and calling
    /// reopen_epoch_db.
    #[instrument(level = "error", skip_all)]
    async fn revert_uncommitted_epoch_transactions(
        &self,
        epoch_store: &AuthorityPerEpochStore,
    ) -> IotaResult {
        {
            // Stop accepting new user certificates before snapshotting the
            // pending set below, so the set cannot grow under us.
            let state = epoch_store.get_reconfig_state_write_lock_guard();
            if state.should_accept_user_certs() {
                // Need to change this so that consensus adapter do not accept certificates from
                // user. This can happen if our local validator did not initiate
                // epoch change locally, but 2f+1 nodes already concluded the
                // epoch.
                //
                // This lock is essentially a barrier for
                // `epoch_store.pending_consensus_certificates` table we are reading on the line
                // after this block
                epoch_store.close_user_certs(state);
            }
            // lock is dropped here
        }
        let pending_certificates = epoch_store.pending_consensus_certificates();
        info!(
            "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
            pending_certificates.len(),
            pending_certificates,
        );
        for digest in pending_certificates {
            // Transactions already included in a checkpoint are final and must
            // not be reverted.
            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
                info!(
                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
                    digest
                );
                continue;
            }
            info!("Reverting {:?} at the end of epoch", digest);
            // Undo both the per-epoch record and the object-state update.
            epoch_store.revert_executed_transaction(&digest)?;
            self.get_reconfig_api().try_revert_state_update(&digest)?;
        }
        info!("All uncommitted local transactions reverted");
        Ok(())
    }
5383
5384    #[instrument(level = "error", skip_all)]
5385    async fn reopen_epoch_db(
5386        &self,
5387        cur_epoch_store: &AuthorityPerEpochStore,
5388        new_committee: Committee,
5389        epoch_start_configuration: EpochStartConfiguration,
5390        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5391        epoch_last_checkpoint: CheckpointSequenceNumber,
5392    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5393        let new_epoch = new_committee.epoch;
5394        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5395        assert_eq!(
5396            epoch_start_configuration.epoch_start_state().epoch(),
5397            new_committee.epoch
5398        );
5399        fail_point!("before-open-new-epoch-store");
5400        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5401            self.name,
5402            new_committee,
5403            epoch_start_configuration,
5404            self.get_backing_package_store().clone(),
5405            self.get_object_store().clone(),
5406            expensive_safety_check_config,
5407            epoch_last_checkpoint,
5408        )?;
5409        self.epoch_store.store(new_epoch_store.clone());
5410        Ok(new_epoch_store)
5411    }
5412
    /// Checks if `authenticator` unlocks a valid Move account and returns the
    /// account-related `AuthenticatorFunctionRef` object.
    ///
    /// Validation performed, in order:
    /// - the account object is live (not deleted and not part of a canceled
    ///   transaction);
    /// - `signer` equals the address derived from `auth_account_object_id`;
    /// - the account object is shared or immutable;
    /// - the optional version/digest pins match the object that was read;
    /// - the `AuthenticatorFunctionRef` dynamic field exists at a version
    ///   less than or equal to the (pinned) account object version.
    fn check_move_account(
        &self,
        auth_account_object_id: ObjectID,
        auth_account_object_seq_number: Option<SequenceNumber>,
        auth_account_object_digest: Option<ObjectDigest>,
        account_object: ObjectReadResult,
        signer: &IotaAddress,
    ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
        // Reject account objects that cannot be inspected.
        let account_object = match account_object.object {
            ObjectReadResultKind::Object(object) => Ok(object),
            ObjectReadResultKind::DeletedSharedObject(version, digest) => {
                Err(UserInputError::AccountObjectDeleted {
                    account_id: account_object.id(),
                    account_version: version,
                    transaction_digest: digest,
                })
            }
            // It is impossible to check the account object because it is used in a canceled
            // transaction and is not loaded.
            ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
                Err(UserInputError::AccountObjectInCanceledTransaction {
                    account_id: account_object.id(),
                    account_version: version,
                })
            }
        }?;

        let account_object_addr = IotaAddress::from(auth_account_object_id);

        // The signer must be the address of the account object being unlocked.
        fp_ensure!(
            signer == &account_object_addr,
            UserInputError::IncorrectUserSignature {
                error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
            }
            .into()
        );

        // Only shared or immutable account objects are supported.
        fp_ensure!(
            account_object.is_shared() || account_object.is_immutable(),
            UserInputError::AccountObjectNotSupported {
                object_id: auth_account_object_id
            }
            .into()
        );

        // If the authenticator pinned a version it must match the version we
        // actually read; otherwise fall back to the read version.
        let auth_account_object_seq_number =
            if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
                let account_object_version = account_object.version();

                fp_ensure!(
                    account_object_version == auth_account_object_seq_number,
                    UserInputError::AccountObjectVersionMismatch {
                        object_id: auth_account_object_id,
                        expected_version: auth_account_object_seq_number,
                        actual_version: account_object_version,
                    }
                    .into()
                );

                auth_account_object_seq_number
            } else {
                account_object.version()
            };

        // If the authenticator pinned a digest it must match as well.
        if let Some(auth_account_object_digest) = auth_account_object_digest {
            let expected_digest = account_object.digest();
            fp_ensure!(
                expected_digest == auth_account_object_digest,
                UserInputError::InvalidAccountObjectDigest {
                    object_id: auth_account_object_id,
                    expected_digest,
                    actual_digest: auth_account_object_digest,
                }
                .into()
            );
        }

        // Derive the id of the dynamic field that holds the account's
        // AuthenticatorFunctionRef.
        let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
            auth_account_object_id,
            &AuthenticatorFunctionRefV1Key::tag().into(),
            &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
        )
        .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
            account_object_id: auth_account_object_id,
        })?;

        // Look up the dynamic field at (or before) the account object version
        // established above.
        let authenticator_function_ref_field = self
            .get_object_cache_reader()
            .try_find_object_lt_or_eq_version(
                authenticator_function_ref_field_id,
                auth_account_object_seq_number,
            )?;

        if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
            let field_move_object = authenticator_function_ref_field_obj
                .data
                .try_as_move()
                .expect("dynamic field should never be a package object");

            // Deserialize the field into its typed Rust representation.
            let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
                field_move_object.to_rust().ok_or(
                    UserInputError::InvalidAuthenticatorFunctionRefField {
                        account_object_id: auth_account_object_id,
                    },
                )?;

            Ok(AuthenticatorFunctionRefForExecution::new_v1(
                field.value,
                authenticator_function_ref_field_obj.compute_object_reference(),
                authenticator_function_ref_field_obj.owner,
                authenticator_function_ref_field_obj.storage_rebate,
                authenticator_function_ref_field_obj.previous_transaction,
            ))
        } else {
            Err(UserInputError::MoveAuthenticatorNotFound {
                authenticator_function_ref_id: authenticator_function_ref_field_id,
                account_object_id: auth_account_object_id,
                account_object_version: auth_account_object_seq_number,
            }
            .into())
        }
    }
5537
5538    #[allow(clippy::type_complexity)]
5539    fn read_objects_for_signing(
5540        &self,
5541        transaction: &VerifiedTransaction,
5542        epoch: u64,
5543    ) -> IotaResult<(
5544        InputObjects,
5545        ReceivingObjects,
5546        Vec<(InputObjects, ObjectReadResult)>,
5547    )> {
5548        let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5549            Some(transaction.digest()),
5550            &transaction.collect_all_input_object_kind_for_reading()?,
5551            &transaction.data().transaction_data().receiving_objects(),
5552            epoch,
5553        )?;
5554
5555        transaction
5556            .split_input_objects_into_groups_for_reading(input_objects)
5557            .map(|(tx_input_objects, per_authenticator_inputs)| {
5558                (
5559                    tx_input_objects,
5560                    tx_receiving_objects,
5561                    per_authenticator_inputs,
5562                )
5563            })
5564    }
5565
5566    #[allow(clippy::type_complexity)]
5567    fn check_transaction_inputs_for_signing(
5568        &self,
5569        protocol_config: &ProtocolConfig,
5570        reference_gas_price: u64,
5571        tx_data: &TransactionData,
5572        tx_input_objects: InputObjects,
5573        tx_receiving_objects: &ReceivingObjects,
5574        move_authenticators: &Vec<&MoveAuthenticator>,
5575        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
5576    ) -> IotaResult<(
5577        IotaGasStatus,
5578        CheckedInputObjects,
5579        Vec<(CheckedInputObjects, AuthenticatorFunctionRef)>,
5580    )> {
5581        let authenticator_gas_budget = if move_authenticators.is_empty() {
5582            0
5583        } else {
5584            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
5585            // not a part of the transaction data.
5586            protocol_config.max_auth_gas()
5587        };
5588
5589        debug_assert_eq!(
5590            move_authenticators.len(),
5591            per_authenticator_inputs.len(),
5592            "Move authenticators amount must match the number of authenticator inputs"
5593        );
5594
5595        let per_authenticator_checked_inputs = move_authenticators
5596            .iter()
5597            .zip(per_authenticator_inputs)
5598            .map(
5599                |(move_authenticator, (authenticator_input_objects, account_object))| {
5600                    // Check basic `object_to_authenticate` preconditions and get its components.
5601                    let (
5602                        auth_account_object_id,
5603                        auth_account_object_seq_number,
5604                        auth_account_object_digest,
5605                    ) = move_authenticator.object_to_authenticate_components()?;
5606
5607                    let signer = move_authenticator.address()?;
5608
5609                    // Make sure the signer is a Move account.
5610                    let AuthenticatorFunctionRefForExecution {
5611                        authenticator_function_ref,
5612                        ..
5613                    } = self.check_move_account(
5614                        auth_account_object_id,
5615                        auth_account_object_seq_number,
5616                        auth_account_object_digest,
5617                        account_object,
5618                        &signer,
5619                    )?;
5620
5621                    // Check the MoveAuthenticator input objects.
5622                    let authenticator_checked_input_objects =
5623                        iota_transaction_checks::check_move_authenticator_input_for_signing(
5624                            authenticator_input_objects,
5625                        )?;
5626
5627                    Ok((
5628                        authenticator_checked_input_objects,
5629                        authenticator_function_ref,
5630                    ))
5631                },
5632            )
5633            .collect::<IotaResult<Vec<_>>>()?;
5634
5635        // Check the transaction inputs.
5636        let (gas_status, tx_checked_input_objects) =
5637            iota_transaction_checks::check_transaction_input(
5638                protocol_config,
5639                reference_gas_price,
5640                tx_data,
5641                tx_input_objects,
5642                tx_receiving_objects,
5643                &self.metrics.bytecode_verifier_metrics,
5644                &self.config.verifier_signing_config,
5645                authenticator_gas_budget,
5646            )?;
5647
5648        Ok((
5649            gas_status,
5650            tx_checked_input_objects,
5651            per_authenticator_checked_inputs,
5652        ))
5653    }
5654
    /// Iterates over the cached live object set. Test-only helper.
    #[cfg(test)]
    pub(crate) fn iter_live_object_set_for_testing(
        &self,
    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
        self.get_accumulator_store()
            .iter_cached_live_object_set_for_testing()
    }
5662
5663    #[cfg(test)]
5664    pub(crate) fn shutdown_execution_for_test(&self) {
5665        self.tx_execution_shutdown
5666            .lock()
5667            .take()
5668            .unwrap()
5669            .send(())
5670            .unwrap();
5671    }
5672
5673    /// NOTE: this function is only to be used for fuzzing and testing. Never
5674    /// use in prod
5675    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5676        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5677        self.get_object_cache_reader()
5678            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5679        self.get_reconfig_api()
5680            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5681    }
5682}
5683
/// Receives generated randomness rounds over a channel and turns each one
/// into a RandomnessStateUpdate system transaction on the authority.
pub struct RandomnessRoundReceiver {
    authority_state: Arc<AuthorityState>,
    // Channel of (epoch, round, signature bytes) tuples to process.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
5688
impl RandomnessRoundReceiver {
    /// Spawns the receiver's event loop on a monitored task and returns its
    /// join handle.
    pub fn spawn(
        authority_state: Arc<AuthorityState>,
        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
    ) -> JoinHandle<()> {
        let rrr = RandomnessRoundReceiver {
            authority_state,
            randomness_rx,
        };
        spawn_monitored_task!(rrr.run())
    }

    /// Event loop: handles randomness rounds until the sending side of the
    /// channel is closed.
    async fn run(mut self) {
        info!("RandomnessRoundReceiver event loop started");

        loop {
            tokio::select! {
                maybe_recv = self.randomness_rx.recv() => {
                    if let Some((epoch, round, bytes)) = maybe_recv {
                        self.handle_new_randomness(epoch, round, bytes);
                    } else {
                        // Channel closed: no more randomness will arrive.
                        break;
                    }
                },
            }
        }

        info!("RandomnessRoundReceiver event loop ended");
    }

    /// Builds a RandomnessStateUpdate system transaction from the received
    /// round, persists it immediately, and enqueues it for execution. A
    /// background task then waits for execution to complete and brings the
    /// process down if the transaction fails.
    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
        // Randomness from a different (stale) epoch is unusable; drop it.
        if epoch_store.epoch() != epoch {
            warn!(
                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
                epoch_store.epoch()
            );
            return;
        }
        let transaction = VerifiedTransaction::new_randomness_state_update(
            epoch,
            round,
            bytes,
            epoch_store
                .epoch_start_config()
                .randomness_obj_initial_shared_version(),
        );
        debug!(
            "created randomness state update transaction with digest: {:?}",
            transaction.digest()
        );
        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
        let digest = *transaction.digest();

        // Randomness state updates contain the full bls signature for the random round,
        // which cannot necessarily be reconstructed again later. Therefore we must
        // immediately persist this transaction. If we crash before its outputs
        // are committed, this ensures we will be able to re-execute it.
        self.authority_state
            .get_cache_commit()
            .persist_transaction(&transaction);

        // Send transaction to TransactionManager for execution.
        self.authority_state
            .transaction_manager()
            .enqueue(vec![transaction], &epoch_store);

        let authority_state = self.authority_state.clone();
        spawn_monitored_task!(async move {
            // Wait for transaction execution in a separate task, to avoid deadlock in case
            // of out-of-order randomness generation. (Each
            // RandomnessStateUpdate depends on the output of the
            // RandomnessStateUpdate from the previous round.)
            //
            // We set a very long timeout so that in case this gets stuck for some reason,
            // the validator will eventually crash rather than continuing in a
            // zombie mode.
            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
            let result = tokio::time::timeout(
                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
                authority_state
                    .get_transaction_cache_reader()
                    .try_notify_read_executed_effects(&[digest]),
            )
            .await;
            let result = match result {
                Ok(result) => result,
                Err(_) => {
                    if cfg!(debug_assertions) {
                        // Crash on randomness update execution timeout in debug builds.
                        panic!(
                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                        );
                    }
                    warn!(
                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                    );
                    // Continue waiting as long as necessary in non-debug builds.
                    authority_state
                        .get_transaction_cache_reader()
                        .try_notify_read_executed_effects(&[digest])
                        .await
                }
            };

            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
            let effects = effects.pop().expect("should return effects");
            if *effects.status() != ExecutionStatus::Success {
                fatal!(
                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
                );
            }
            debug!(
                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
            );
        });
    }
}
5808
5809#[async_trait]
5810impl TransactionKeyValueStoreTrait for AuthorityState {
5811    async fn multi_get(
5812        &self,
5813        transaction_keys: &[TransactionDigest],
5814        effects_keys: &[TransactionDigest],
5815    ) -> IotaResult<KVStoreTransactionData> {
5816        let txns = if !transaction_keys.is_empty() {
5817            self.get_transaction_cache_reader()
5818                .try_multi_get_transaction_blocks(transaction_keys)?
5819                .into_iter()
5820                .map(|t| t.map(|t| (*t).clone().into_inner()))
5821                .collect()
5822        } else {
5823            vec![]
5824        };
5825
5826        let fx = if !effects_keys.is_empty() {
5827            self.get_transaction_cache_reader()
5828                .try_multi_get_executed_effects(effects_keys)?
5829        } else {
5830            vec![]
5831        };
5832
5833        Ok((txns, fx))
5834    }
5835
5836    async fn multi_get_checkpoints(
5837        &self,
5838        checkpoint_summaries: &[CheckpointSequenceNumber],
5839        checkpoint_contents: &[CheckpointSequenceNumber],
5840        checkpoint_summaries_by_digest: &[CheckpointDigest],
5841    ) -> IotaResult<(
5842        Vec<Option<CertifiedCheckpointSummary>>,
5843        Vec<Option<CheckpointContents>>,
5844        Vec<Option<CertifiedCheckpointSummary>>,
5845    )> {
5846        // TODO: use multi-get methods if it ever becomes important (unlikely)
5847        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5848        let store = self.get_checkpoint_store();
5849        for seq in checkpoint_summaries {
5850            let checkpoint = store
5851                .get_checkpoint_by_sequence_number(*seq)?
5852                .map(|c| c.into_inner());
5853
5854            summaries.push(checkpoint);
5855        }
5856
5857        let mut contents = Vec::with_capacity(checkpoint_contents.len());
5858        for seq in checkpoint_contents {
5859            let checkpoint = store
5860                .get_checkpoint_by_sequence_number(*seq)?
5861                .and_then(|summary| {
5862                    store
5863                        .get_checkpoint_contents(&summary.content_digest)
5864                        .expect("db read cannot fail")
5865                });
5866            contents.push(checkpoint);
5867        }
5868
5869        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5870        for digest in checkpoint_summaries_by_digest {
5871            let checkpoint = store
5872                .get_checkpoint_by_digest(digest)?
5873                .map(|c| c.into_inner());
5874            summaries_by_digest.push(checkpoint);
5875        }
5876
5877        Ok((summaries, contents, summaries_by_digest))
5878    }
5879
5880    async fn get_transaction_perpetual_checkpoint(
5881        &self,
5882        digest: TransactionDigest,
5883    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5884        self.get_checkpoint_cache()
5885            .try_get_transaction_perpetual_checkpoint(&digest)
5886            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5887    }
5888
5889    async fn get_object(
5890        &self,
5891        object_id: ObjectID,
5892        version: VersionNumber,
5893    ) -> IotaResult<Option<Object>> {
5894        self.get_object_cache_reader()
5895            .try_get_object_by_key(&object_id, version)
5896    }
5897
5898    #[instrument(skip_all)]
5899    async fn multi_get_objects(
5900        &self,
5901        object_keys: &[ObjectKey],
5902    ) -> IotaResult<Vec<Option<Object>>> {
5903        Ok(self
5904            .get_object_cache_reader()
5905            .multi_get_objects_by_key(object_keys))
5906    }
5907
5908    async fn multi_get_transactions_perpetual_checkpoints(
5909        &self,
5910        digests: &[TransactionDigest],
5911    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5912        let res = self
5913            .get_checkpoint_cache()
5914            .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5915
5916        Ok(res
5917            .into_iter()
5918            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5919            .collect())
5920    }
5921
5922    #[instrument(skip(self))]
5923    async fn multi_get_events_by_tx_digests(
5924        &self,
5925        digests: &[TransactionDigest],
5926    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5927        if digests.is_empty() {
5928            return Ok(vec![]);
5929        }
5930
5931        Ok(self
5932            .get_transaction_cache_reader()
5933            .multi_get_events(digests))
5934    }
5935}
5936
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_types::{
        base_types::{AuthorityName, ObjectID},
        is_system_package,
    };
    use move_binary_format::CompiledModule;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// An override is either the same framework for every validator, or a
    /// callback that may hand each validator a different (or no) framework.
    enum PackageOverrideConfig {
        Global(Framework),
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each compiled module at its own bytecode version.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Records an override in the thread-local config, replacing any previous
    /// entry for the same package.
    fn insert_override(package_id: ObjectID, config: PackageOverrideConfig) {
        OVERRIDE.with(|overrides| {
            overrides.borrow_mut().insert(package_id, config);
        });
    }

    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        insert_override(package_id, PackageOverrideConfig::Global(modules));
    }

    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        insert_override(package_id, PackageOverrideConfig::PerValidator(func));
    }

    /// Serialized form of [`get_override_modules`], if an override applies.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        let modules = get_override_modules(package_id, name)?;
        Some(compiled_modules_to_bytes(&modules))
    }

    /// Returns the override modules for `package_id` as seen by validator
    /// `name`, or `None` when no override applies to that validator.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| match cfg.borrow().get(package_id)? {
            PackageOverrideConfig::Global(framework) => Some(framework.clone()),
            PackageOverrideConfig::PerValidator(func) => func(name),
        })
    }

    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if is_system_package(*package_id) {
            // Overrides of existing system packages keep the original
            // dependency list.
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        } else {
            // Assume that entirely new injected packages depend on all existing system
            // packages.
            BuiltInFramework::all_package_ids()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns overrides for packages that are not part of the built-in
    /// framework, i.e. entirely new packages injected for this validator.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();
        let mut extra = Vec::new();
        OVERRIDE.with(|cfg| {
            for package in cfg.borrow().keys() {
                if !built_in.contains(package) {
                    extra.push(*package);
                }
            }
        });

        extra
            .into_iter()
            .map(|package| SystemPackage {
                id: package,
                bytes: get_override_bytes(&package, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            })
            .collect()
    }
}
6058
/// A single object entry in a node state dump: the object itself together
/// with the components of its object reference (id, version, digest) pulled
/// out for easy inspection in the serialized JSON.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    // Object id, from the object's computed reference.
    pub id: ObjectID,
    // Object version, from the object's computed reference.
    pub version: VersionNumber,
    // Object digest, from the object's computed reference.
    pub digest: ObjectDigest,
    // The full object contents.
    pub object: Object,
}
6066
6067impl ObjDumpFormat {
6068    fn new(object: Object) -> Self {
6069        let oref = object.compute_object_reference();
6070        Self {
6071            id: oref.0,
6072            version: oref.1,
6073            digest: oref.2,
6074            object,
6075        }
6076    }
6077}
6078
/// A JSON-serializable snapshot of everything a node used to execute one
/// transaction: epoch parameters, the transaction itself, its effects, and
/// all objects that were read or written. Intended for offline debugging /
/// replay of a transaction's execution.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    // Digest of the dumped transaction.
    pub tx_digest: TransactionDigest,
    // The full signed transaction data.
    pub sender_signed_data: SenderSignedData,
    // Epoch in which the transaction was executed.
    pub executed_epoch: u64,
    // Reference gas price of that epoch.
    pub reference_gas_price: u64,
    // Protocol version in effect during execution.
    pub protocol_version: u64,
    // Start timestamp (ms) of the execution epoch.
    pub epoch_start_timestamp_ms: u64,
    // Effects this node computed locally.
    pub computed_effects: TransactionEffects,
    // Effects digest the node expected (e.g. from certified effects).
    pub expected_effects_digest: TransactionEffectsDigest,
    // All system packages at their versions as of the dump.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    // Shared objects that were inputs to the transaction.
    pub shared_objects: Vec<ObjDumpFormat>,
    // Child objects loaded at runtime (read but not necessarily mutated).
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    // Objects at the versions they were modified at, per the effects.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    // Packages fetched from the db at runtime, not present in input objects.
    pub runtime_reads: Vec<ObjDumpFormat>,
    // The transaction's declared input objects.
    pub input_objects: Vec<ObjDumpFormat>,
}
6096
6097impl NodeStateDump {
6098    pub fn new(
6099        tx_digest: &TransactionDigest,
6100        effects: &TransactionEffects,
6101        expected_effects_digest: TransactionEffectsDigest,
6102        object_store: &dyn ObjectStore,
6103        epoch_store: &Arc<AuthorityPerEpochStore>,
6104        inner_temporary_store: &InnerTemporaryStore,
6105        certificate: &VerifiedExecutableTransaction,
6106    ) -> IotaResult<Self> {
6107        // Epoch info
6108        let executed_epoch = epoch_store.epoch();
6109        let reference_gas_price = epoch_store.reference_gas_price();
6110        let epoch_start_config = epoch_store.epoch_start_config();
6111        let protocol_version = epoch_store.protocol_version().as_u64();
6112        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6113
6114        // Record all system packages at this version
6115        let mut relevant_system_packages = Vec::new();
6116        for sys_package_id in BuiltInFramework::all_package_ids() {
6117            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
6118                relevant_system_packages.push(ObjDumpFormat::new(w))
6119            }
6120        }
6121
6122        // Record all the shared objects
6123        let mut shared_objects = Vec::new();
6124        for kind in effects.input_shared_objects() {
6125            match kind {
6126                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
6127                    if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
6128                        shared_objects.push(ObjDumpFormat::new(w))
6129                    }
6130                }
6131                InputSharedObject::ReadDeleted(..)
6132                | InputSharedObject::MutateDeleted(..)
6133                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
6134                                                           * objects. */
6135            }
6136        }
6137
6138        // Record all loaded child objects
6139        // Child objects which are read but not mutated are not tracked anywhere else
6140        let mut loaded_child_objects = Vec::new();
6141        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6142            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
6143                loaded_child_objects.push(ObjDumpFormat::new(w))
6144            }
6145        }
6146
6147        // Record all modified objects
6148        let mut modified_at_versions = Vec::new();
6149        for (id, ver) in effects.modified_at_versions() {
6150            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
6151                modified_at_versions.push(ObjDumpFormat::new(w))
6152            }
6153        }
6154
6155        // Packages read at runtime, which were not previously loaded into the temoorary
6156        // store Some packages may be fetched at runtime and wont show up in
6157        // input objects
6158        let mut runtime_reads = Vec::new();
6159        for obj in inner_temporary_store
6160            .runtime_packages_loaded_from_db
6161            .values()
6162        {
6163            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6164        }
6165
6166        // All other input objects should already be in `inner_temporary_store.objects`
6167
6168        Ok(Self {
6169            tx_digest: *tx_digest,
6170            executed_epoch,
6171            reference_gas_price,
6172            epoch_start_timestamp_ms,
6173            protocol_version,
6174            relevant_system_packages,
6175            shared_objects,
6176            loaded_child_objects,
6177            modified_at_versions,
6178            runtime_reads,
6179            sender_signed_data: certificate.clone().into_message(),
6180            input_objects: inner_temporary_store
6181                .input_objects
6182                .values()
6183                .map(|o| ObjDumpFormat::new(o.clone()))
6184                .collect(),
6185            computed_effects: effects.clone(),
6186            expected_effects_digest,
6187        })
6188    }
6189
6190    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6191        let mut objects = Vec::new();
6192        objects.extend(self.relevant_system_packages.clone());
6193        objects.extend(self.shared_objects.clone());
6194        objects.extend(self.loaded_child_objects.clone());
6195        objects.extend(self.modified_at_versions.clone());
6196        objects.extend(self.runtime_reads.clone());
6197        objects.extend(self.input_objects.clone());
6198        objects
6199    }
6200
6201    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6202        let file_name = format!(
6203            "{}_{}_NODE_DUMP.json",
6204            self.tx_digest,
6205            AuthorityState::unixtime_now_ms()
6206        );
6207        let mut path = path.to_path_buf();
6208        path.push(&file_name);
6209        let mut file = File::create(path.clone())?;
6210        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6211        Ok(path)
6212    }
6213
6214    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6215        let file = File::open(path)?;
6216        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6217    }
6218}