iota_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage as FrameworkSystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49    key_value_store::{
50        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51    },
52    key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57    account_abstraction::{
58        account::AuthenticatorFunctionRefV1Key,
59        authenticator_function::{
60            AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
61            AuthenticatorFunctionRefV1,
62        },
63    },
64    base_types::{
65        AuthorityName, ConciseableName, IotaAddress, ObjectID, ObjectInfo, ObjectRef, ObjectType,
66        SequenceNumber, StructTag, TypeTag, VersionNumber,
67    },
68    committee::{Committee, EpochId, ProtocolVersion},
69    crypto::{AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer},
70    deny_list_v1::check_coin_deny_list_v1_during_signing,
71    digests::{ChainIdentifier, Digest, ObjectDigest, TransactionDigest, TransactionEffectsDigest},
72    dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
73    effects::{
74        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
75        TransactionEffectsExt, TransactionEvents, VerifiedSignedTransactionEffects,
76    },
77    error::{ExecutionError, IotaError, IotaResult, UserInputError},
78    event::{Event, EventID, SystemEpochInfoEvent},
79    executable_transaction::VerifiedExecutableTransaction,
80    execution_config_utils::to_binary_config,
81    execution_status::ExecutionStatus,
82    fp_ensure,
83    gas::{GasCostSummary, IotaGasStatus},
84    gas_coin::NANOS_PER_IOTA,
85    inner_temporary_store::{
86        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
87        WrittenObjects,
88    },
89    iota_sdk_types_conversions::type_tag_core_to_sdk,
90    iota_system_state::{
91        IotaSystemState, IotaSystemStateTrait,
92        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
93    },
94    layout_resolver::{LayoutResolver, into_struct_layout},
95    message_envelope::Message,
96    messages_checkpoint::{
97        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
98        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
99        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
100        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
101    },
102    messages_consensus::AuthorityCapabilitiesV1,
103    messages_grpc::{
104        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
105        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
106        TransactionStatus,
107    },
108    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
109    move_authenticator::MoveAuthenticator,
110    object::{
111        MoveObject, MoveObjectExt, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
112        bounded_visitor::BoundedVisitor,
113    },
114    storage::{
115        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
116    },
117    supported_protocol_versions::{
118        ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
119    },
120    transaction::*,
121    transaction_executor::{SimulateTransactionResult, VmChecks},
122};
123use itertools::Itertools;
124use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
125use move_core_types::{
126    account_address::AccountAddress, annotated_value::MoveStructLayout, language_storage::ModuleId,
127};
128use parking_lot::Mutex;
129use prometheus::{
130    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
131    register_histogram_vec_with_registry, register_histogram_with_registry,
132    register_int_counter_vec_with_registry, register_int_counter_with_registry,
133    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
134};
135use serde::{Deserialize, Serialize, de::DeserializeOwned};
136use tap::TapFallible;
137use tokio::{
138    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
139    task::JoinHandle,
140};
141use tracing::{debug, error, info, instrument, trace, warn};
142use typed_store::TypedStoreError;
143
144use self::{
145    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
146};
147#[cfg(msim)]
148pub use crate::checkpoints::checkpoint_executor::utils::{
149    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
150};
151use crate::{
152    authority::{
153        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
154        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
155        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
156        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
157        authority_store_tables::AuthorityPrunerTables,
158        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
159    },
160    authority_client::NetworkAuthorityClient,
161    checkpoint_progress_tracker::CheckpointProgressTracker,
162    checkpoints::CheckpointStore,
163    congestion_tracker::CongestionTracker,
164    consensus_adapter::ConsensusAdapter,
165    epoch::committee_store::CommitteeStore,
166    execution_cache::{
167        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
168        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
169        TransactionCacheRead,
170    },
171    execution_driver::execution_process,
172    global_state_hasher::{GlobalStateHashStore, GlobalStateHasher},
173    grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
174    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
175    metrics::{LatencyObserver, RateTracker},
176    module_cache_metrics::ResolverMetrics,
177    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
178    stake_aggregator::StakeAggregator,
179    subscription_handler::SubscriptionHandler,
180    transaction_input_loader::TransactionInputLoader,
181    transaction_manager::TransactionManager,
182    transaction_outputs::TransactionOutputs,
183    validator_tx_finalizer::ValidatorTxFinalizer,
184    verify_indexes::verify_indexes,
185};
186
187#[cfg(test)]
188#[path = "unit_tests/authority_tests.rs"]
189pub mod authority_tests;
190
191#[cfg(test)]
192#[path = "unit_tests/transaction_tests.rs"]
193pub mod transaction_tests;
194
195#[cfg(test)]
196#[path = "unit_tests/batch_transaction_tests.rs"]
197mod batch_transaction_tests;
198
199#[cfg(test)]
200#[path = "unit_tests/move_integration_tests.rs"]
201pub mod move_integration_tests;
202
203#[cfg(test)]
204#[path = "unit_tests/gas_tests.rs"]
205mod gas_tests;
206
207#[cfg(test)]
208#[path = "unit_tests/batch_verification_tests.rs"]
209mod batch_verification_tests;
210
211#[cfg(test)]
212#[path = "unit_tests/coin_deny_list_tests.rs"]
213mod coin_deny_list_tests;
214
215#[cfg(test)]
216#[path = "unit_tests/auth_unit_test_utils.rs"]
217pub mod auth_unit_test_utils;
218
219pub mod authority_test_utils;
220
221pub mod authority_per_epoch_store;
222pub mod authority_per_epoch_store_pruner;
223
224mod authority_store_migrations;
225pub mod authority_store_pruner;
226pub mod authority_store_tables;
227pub mod authority_store_types;
228pub mod epoch_start_configuration;
229pub mod shared_object_congestion_tracker;
230pub mod shared_object_version_manager;
231pub mod suggested_gas_price_calculator;
232pub mod test_authority_builder;
233pub mod transaction_deferral;
234
235pub(crate) mod authority_store;
236pub mod backpressure;
237
238/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
239pub struct AuthorityMetrics {
240    tx_orders: IntCounter,
241    total_certs: IntCounter,
242    total_cert_attempts: IntCounter,
243    total_effects: IntCounter,
244    pub shared_obj_tx: IntCounter,
245    sponsored_tx: IntCounter,
246    tx_already_processed: IntCounter,
247    num_input_objs: Histogram,
248    num_shared_objects: Histogram,
249    batch_size: Histogram,
250
251    authority_state_handle_transaction_latency: Histogram,
252
253    execute_certificate_latency_single_writer: Histogram,
254    execute_certificate_latency_shared_object: Histogram,
255
256    internal_execution_latency: Histogram,
257    execution_load_input_objects_latency: Histogram,
258    prepare_certificate_latency: Histogram,
259    commit_certificate_latency: Histogram,
260    db_checkpoint_latency: Histogram,
261
262    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
263    pub(crate) transaction_manager_num_missing_objects: IntGauge,
264    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
265    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
266    pub(crate) transaction_manager_num_ready: IntGauge,
267    pub(crate) transaction_manager_object_cache_size: IntGauge,
268    pub(crate) transaction_manager_object_cache_hits: IntCounter,
269    pub(crate) transaction_manager_object_cache_misses: IntCounter,
270    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
271    pub(crate) transaction_manager_package_cache_size: IntGauge,
272    pub(crate) transaction_manager_package_cache_hits: IntCounter,
273    pub(crate) transaction_manager_package_cache_misses: IntCounter,
274    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
275    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
276
277    pub(crate) execution_driver_executed_transactions: IntCounter,
278    pub(crate) execution_driver_dispatch_queue: IntGauge,
279    pub(crate) execution_queueing_delay_s: Histogram,
280    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
281    pub(crate) execution_gas_latency_ratio: Histogram,
282
283    pub(crate) skipped_consensus_txns: IntCounter,
284    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
285
286    pub(crate) authority_overload_status: IntGauge,
287    pub(crate) authority_load_shedding_percentage: IntGauge,
288
289    pub(crate) transaction_overload_sources: IntCounterVec,
290
291    /// Post processing metrics
292    post_processing_total_events_emitted: IntCounter,
293    post_processing_total_tx_indexed: IntCounter,
294    post_processing_total_tx_had_event_processed: IntCounter,
295    post_processing_total_failures: IntCounter,
296
297    /// Consensus handler metrics
298    pub consensus_handler_processed: IntCounterVec,
299    pub consensus_handler_transaction_sizes: HistogramVec,
300    pub consensus_handler_num_low_scoring_authorities: IntGauge,
301    pub consensus_handler_scores: IntGaugeVec,
302    pub consensus_handler_deferred_transactions: IntCounter,
303    pub consensus_handler_congested_transactions: IntCounter,
304    pub consensus_handler_cancelled_transactions: IntCounter,
305    pub consensus_handler_max_object_costs: IntGaugeVec,
306    pub consensus_committed_subdags: IntCounterVec,
307    pub consensus_committed_messages: IntGaugeVec,
308    pub consensus_committed_user_transactions: IntGaugeVec,
309    pub consensus_handler_leader_round: IntGauge,
310    pub consensus_calculated_throughput: IntGauge,
311    pub consensus_calculated_throughput_profile: IntGauge,
312
313    pub limits_metrics: Arc<LimitsMetrics>,
314
315    /// bytecode verifier metrics for tracking timeouts
316    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
317
318    /// Count of multisig signatures
319    pub multisig_sig_count: IntCounter,
320
321    // Tracks recent average txn queueing delay between when it is ready for execution
322    // until it starts executing.
323    pub execution_queueing_latency: LatencyObserver,
324
325    // Tracks the rate of transactions become ready for execution in transaction manager.
326    // The need for the Mutex is that the tracker is updated in transaction manager and read
327    // in the overload_monitor. There should be low mutex contention because
328    // transaction manager is single threaded and the read rate in overload_monitor is
329    // low. In the case where transaction manager becomes multi-threaded, we can
330    // create one rate tracker per thread.
331    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
332
333    // Tracks the rate of transactions starts execution in execution driver.
334    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
335    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
336}
337
/// Overrides the default Prometheus buckets for positive integer samples in
/// the 1 to 10M range.
///
/// Used for count-style histograms such as the number of input objects per
/// transaction and the transaction batch size.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
    7000000., 10000000.,
];

/// Buckets for general latency histograms, ranging from 0.0005 up to 90.
///
/// The constant name indicates that the values are seconds.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];

/// Buckets for low latency samples. Starts from 10us and goes up to 100s.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];

/// Buckets for the gas-to-latency ratio histograms
/// (`prepare_cert_gas_latency_ratio` and `execution_gas_latency_ratio`),
/// ranging from 10 up to 1M.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];
360
361/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided.
362pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
363
364impl AuthorityMetrics {
365    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
366        let execute_certificate_latency = register_histogram_vec_with_registry!(
367            "authority_state_execute_certificate_latency",
368            "Latency of executing certificates, including waiting for inputs",
369            &["tx_type"],
370            LATENCY_SEC_BUCKETS.to_vec(),
371            registry,
372        )
373        .unwrap();
374
375        let execute_certificate_latency_single_writer =
376            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
377        let execute_certificate_latency_shared_object =
378            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
379
380        Self {
381            tx_orders: register_int_counter_with_registry!(
382                "total_transaction_orders",
383                "Total number of transaction orders",
384                registry,
385            )
386            .unwrap(),
387            total_certs: register_int_counter_with_registry!(
388                "total_transaction_certificates",
389                "Total number of transaction certificates handled",
390                registry,
391            )
392            .unwrap(),
393            total_cert_attempts: register_int_counter_with_registry!(
394                "total_handle_certificate_attempts",
395                "Number of calls to handle_certificate",
396                registry,
397            )
398            .unwrap(),
399            // total_effects == total transactions finished
400            total_effects: register_int_counter_with_registry!(
401                "total_transaction_effects",
402                "Total number of transaction effects produced",
403                registry,
404            )
405            .unwrap(),
406
407            shared_obj_tx: register_int_counter_with_registry!(
408                "num_shared_obj_tx",
409                "Number of transactions involving shared objects",
410                registry,
411            )
412            .unwrap(),
413
414            sponsored_tx: register_int_counter_with_registry!(
415                "num_sponsored_tx",
416                "Number of sponsored transactions",
417                registry,
418            )
419            .unwrap(),
420
421            tx_already_processed: register_int_counter_with_registry!(
422                "num_tx_already_processed",
423                "Number of transaction orders already processed previously",
424                registry,
425            )
426            .unwrap(),
427            num_input_objs: register_histogram_with_registry!(
428                "num_input_objects",
429                "Distribution of number of input TX objects per TX",
430                POSITIVE_INT_BUCKETS.to_vec(),
431                registry,
432            )
433            .unwrap(),
434            num_shared_objects: register_histogram_with_registry!(
435                "num_shared_objects",
436                "Number of shared input objects per TX",
437                POSITIVE_INT_BUCKETS.to_vec(),
438                registry,
439            )
440            .unwrap(),
441            batch_size: register_histogram_with_registry!(
442                "batch_size",
443                "Distribution of size of transaction batch",
444                POSITIVE_INT_BUCKETS.to_vec(),
445                registry,
446            )
447            .unwrap(),
448            authority_state_handle_transaction_latency: register_histogram_with_registry!(
449                "authority_state_handle_transaction_latency",
450                "Latency of handling transactions",
451                LATENCY_SEC_BUCKETS.to_vec(),
452                registry,
453            )
454            .unwrap(),
455            execute_certificate_latency_single_writer,
456            execute_certificate_latency_shared_object,
457            internal_execution_latency: register_histogram_with_registry!(
458                "authority_state_internal_execution_latency",
459                "Latency of actual certificate executions",
460                LATENCY_SEC_BUCKETS.to_vec(),
461                registry,
462            )
463            .unwrap(),
464            execution_load_input_objects_latency: register_histogram_with_registry!(
465                "authority_state_execution_load_input_objects_latency",
466                "Latency of loading input objects for execution",
467                LOW_LATENCY_SEC_BUCKETS.to_vec(),
468                registry,
469            )
470            .unwrap(),
471            prepare_certificate_latency: register_histogram_with_registry!(
472                "authority_state_prepare_certificate_latency",
473                "Latency of executing certificates, before committing the results",
474                LATENCY_SEC_BUCKETS.to_vec(),
475                registry,
476            )
477            .unwrap(),
478            commit_certificate_latency: register_histogram_with_registry!(
479                "authority_state_commit_certificate_latency",
480                "Latency of committing certificate execution results",
481                LATENCY_SEC_BUCKETS.to_vec(),
482                registry,
483            )
484            .unwrap(),
485            db_checkpoint_latency: register_histogram_with_registry!(
486                "db_checkpoint_latency",
487                "Latency of checkpointing dbs",
488                LATENCY_SEC_BUCKETS.to_vec(),
489                registry,
490            ).unwrap(),
491            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
492                "transaction_manager_num_enqueued_certificates",
493                "Current number of certificates enqueued to TransactionManager",
494                &["result"],
495                registry,
496            )
497            .unwrap(),
498            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
499                "transaction_manager_num_missing_objects",
500                "Current number of missing objects in TransactionManager",
501                registry,
502            )
503            .unwrap(),
504            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
505                "transaction_manager_num_pending_certificates",
506                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
507                registry,
508            )
509            .unwrap(),
510            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
511                "transaction_manager_num_executing_certificates",
512                "Number of executing certificates, including queued and actually running certificates",
513                registry,
514            )
515            .unwrap(),
516            transaction_manager_num_ready: register_int_gauge_with_registry!(
517                "transaction_manager_num_ready",
518                "Number of ready transactions in TransactionManager",
519                registry,
520            )
521            .unwrap(),
522            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
523                "transaction_manager_object_cache_size",
524                "Current size of object-availability cache in TransactionManager",
525                registry,
526            )
527            .unwrap(),
528            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
529                "transaction_manager_object_cache_hits",
530                "Number of object-availability cache hits in TransactionManager",
531                registry,
532            )
533            .unwrap(),
534            authority_overload_status: register_int_gauge_with_registry!(
535                "authority_overload_status",
536                "Whether authority is current experiencing overload and enters load shedding mode.",
537                registry)
538            .unwrap(),
539            authority_load_shedding_percentage: register_int_gauge_with_registry!(
540                "authority_load_shedding_percentage",
541                "The percentage of transactions is shed when the authority is in load shedding mode.",
542                registry)
543            .unwrap(),
544            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
545                "transaction_manager_object_cache_misses",
546                "Number of object-availability cache misses in TransactionManager",
547                registry,
548            )
549            .unwrap(),
550            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
551                "transaction_manager_object_cache_evictions",
552                "Number of object-availability cache evictions in TransactionManager",
553                registry,
554            )
555            .unwrap(),
556            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
557                "transaction_manager_package_cache_size",
558                "Current size of package-availability cache in TransactionManager",
559                registry,
560            )
561            .unwrap(),
562            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
563                "transaction_manager_package_cache_hits",
564                "Number of package-availability cache hits in TransactionManager",
565                registry,
566            )
567            .unwrap(),
568            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
569                "transaction_manager_package_cache_misses",
570                "Number of package-availability cache misses in TransactionManager",
571                registry,
572            )
573            .unwrap(),
574            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
575                "transaction_manager_package_cache_evictions",
576                "Number of package-availability cache evictions in TransactionManager",
577                registry,
578            )
579            .unwrap(),
580            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
581                "transaction_manager_transaction_queue_age_s",
582                "Time spent in waiting for transaction in the queue",
583                LATENCY_SEC_BUCKETS.to_vec(),
584                registry,
585            )
586            .unwrap(),
587            transaction_overload_sources: register_int_counter_vec_with_registry!(
588                "transaction_overload_sources",
589                "Number of times each source indicates transaction overload.",
590                &["source"],
591                registry)
592            .unwrap(),
593            execution_driver_executed_transactions: register_int_counter_with_registry!(
594                "execution_driver_executed_transactions",
595                "Cumulative number of transaction executed by execution driver",
596                registry,
597            )
598            .unwrap(),
599            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
600                "execution_driver_dispatch_queue",
601                "Number of transaction pending in execution driver dispatch queue",
602                registry,
603            )
604            .unwrap(),
605            execution_queueing_delay_s: register_histogram_with_registry!(
606                "execution_queueing_delay_s",
607                "Queueing delay between a transaction is ready for execution until it starts executing.",
608                LATENCY_SEC_BUCKETS.to_vec(),
609                registry
610            )
611            .unwrap(),
612            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
613                "prepare_cert_gas_latency_ratio",
614                "The ratio of computation gas divided by VM execution latency.",
615                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
616                registry
617            )
618            .unwrap(),
619            execution_gas_latency_ratio: register_histogram_with_registry!(
620                "execution_gas_latency_ratio",
621                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
622                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
623                registry
624            )
625            .unwrap(),
626            skipped_consensus_txns: register_int_counter_with_registry!(
627                "skipped_consensus_txns",
628                "Total number of consensus transactions skipped",
629                registry,
630            )
631            .unwrap(),
632            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
633                "skipped_consensus_txns_cache_hit",
634                "Total number of consensus transactions skipped because of local cache hit",
635                registry,
636            )
637            .unwrap(),
638            post_processing_total_events_emitted: register_int_counter_with_registry!(
639                "post_processing_total_events_emitted",
640                "Total number of events emitted in post processing",
641                registry,
642            )
643            .unwrap(),
644            post_processing_total_tx_indexed: register_int_counter_with_registry!(
645                "post_processing_total_tx_indexed",
646                "Total number of txes indexed in post processing",
647                registry,
648            )
649            .unwrap(),
650            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
651                "post_processing_total_tx_had_event_processed",
652                "Total number of txes finished event processing in post processing",
653                registry,
654            )
655            .unwrap(),
656            post_processing_total_failures: register_int_counter_with_registry!(
657                "post_processing_total_failures",
658                "Total number of failure in post processing",
659                registry,
660            )
661            .unwrap(),
662            consensus_handler_processed: register_int_counter_vec_with_registry!(
663                "consensus_handler_processed",
664                "Number of transactions processed by consensus handler",
665                &["class"],
666                registry
667            ).unwrap(),
668            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
669                "consensus_handler_transaction_sizes",
670                "Sizes of each type of transactions processed by consensus handler",
671                &["class"],
672                POSITIVE_INT_BUCKETS.to_vec(),
673                registry
674            ).unwrap(),
675            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
676                "consensus_handler_num_low_scoring_authorities",
677                "Number of low scoring authorities based on reputation scores from consensus",
678                registry
679            ).unwrap(),
680            consensus_handler_scores: register_int_gauge_vec_with_registry!(
681                "consensus_handler_scores",
682                "scores from consensus for each authority",
683                &["authority"],
684                registry,
685            ).unwrap(),
686            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
687                "consensus_handler_deferred_transactions",
688                "Number of transactions deferred by consensus handler",
689                registry,
690            ).unwrap(),
691            consensus_handler_congested_transactions: register_int_counter_with_registry!(
692                "consensus_handler_congested_transactions",
693                "Number of transactions deferred by consensus handler due to congestion",
694                registry,
695            ).unwrap(),
696            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
697                "consensus_handler_cancelled_transactions",
698                "Number of transactions cancelled by consensus handler",
699                registry,
700            ).unwrap(),
701            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
702                "consensus_handler_max_congestion_control_object_costs",
703                "Max object costs for congestion control in the current consensus commit",
704                &["commit_type"],
705                registry,
706            ).unwrap(),
707            consensus_committed_subdags: register_int_counter_vec_with_registry!(
708                "consensus_committed_subdags",
709                "Number of committed subdags, sliced by leader",
710                &["authority"],
711                registry,
712            ).unwrap(),
713            consensus_committed_messages: register_int_gauge_vec_with_registry!(
714                "consensus_committed_messages",
715                "Total number of committed consensus messages, sliced by author",
716                &["authority"],
717                registry,
718            ).unwrap(),
719            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
720                "consensus_committed_user_transactions",
721                "Number of committed user transactions, sliced by submitter",
722                &["authority"],
723                registry,
724            ).unwrap(),
725            consensus_handler_leader_round: register_int_gauge_with_registry!(
726                "consensus_handler_leader_round",
727                "The leader round of the current consensus output being processed in the consensus handler",
728                registry,
729            ).unwrap(),
730            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
731            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
732            multisig_sig_count: register_int_counter_with_registry!(
733                "multisig_sig_count",
734                "Count of multisig signatures",
735                registry,
736            )
737            .unwrap(),
738            consensus_calculated_throughput: register_int_gauge_with_registry!(
739                "consensus_calculated_throughput",
740                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
741                registry,
742            ).unwrap(),
743            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
744                "consensus_calculated_throughput_profile",
745                "The current active calculated throughput profile",
746                registry
747            ).unwrap(),
748            execution_queueing_latency: LatencyObserver::new(),
749            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
750            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
751        }
752    }
753
754    /// Reset metrics that contain `hostname` as one of the labels. This is
755    /// needed to avoid retaining metrics for long-gone committee members and
756    /// only exposing metrics for the committee in the current epoch.
757    pub fn reset_on_reconfigure(&self) {
758        self.consensus_committed_messages.reset();
759        self.consensus_handler_scores.reset();
760        self.consensus_committed_user_transactions.reset();
761    }
762}
763
/// A trait object for `Signer` that is:
/// - Pin, i.e. confined to one place in memory (we don't want to copy private
///   keys).
/// - Send + Sync, i.e. can be safely moved to and shared between threads.
///
/// The pointer type is `Pin<Arc<..>>`, so it is typically instantiated with
/// `Arc::pin(keypair)` where `keypair` is a `KeyPair`.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
771
/// State owned by a single authority (node): its identity, storage handles,
/// per-epoch store, execution machinery, metrics and configuration.
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loader used to read transaction input objects (see
    /// `read_objects_for_execution`).
    input_loader: TransactionInputLoader,
    /// NOTE(review): presumably the bundle of execution-cache trait objects
    /// behind the `get_*_cache_*` accessors — confirm against its definition.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// Atomically swappable handle to the per-epoch store.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes the current 'execution epoch'.
    /// Execution acquires the read lock, checks the certificate epoch and
    /// holds the lock until all writes are complete. Reconfiguration acquires
    /// the write lock, changes the epoch and reverts all transactions from
    /// the previous epoch that were executed but did not make it into a
    /// checkpoint.
    execution_lock: RwLock<EpochId>,

    /// Optional index store.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional gRPC index store.
    pub grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint storage; also used to look up epoch state commitments.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Committee storage, exposed through `committee_store()` and
    /// `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    /// Manages pending certificates and their missing input objects.
    transaction_manager: Arc<TransactionManager>,

    /// Shuts down the execution task. Used only in testing.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Metrics recorded by this authority.
    pub metrics: Arc<AuthorityMetrics>,
    // NOTE(review): the leading underscore suggests this is held only to keep
    // the pruner alive — confirm.
    _pruner: AuthorityStorePruner,
    authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
    checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,

    /// Configuration for taking DB checkpoints of the different databases.
    db_checkpoint_config: DBCheckpointConfig,

    /// The node configuration this authority was started with.
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    /// Read by `check_authority_overload` to decide whether to shed load.
    pub overload_info: AuthorityOverloadInfo,

    /// Optional finalizer that tracks transactions signed by this validator
    /// (see `handle_transaction`).
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// The chain identifier is derived from the digest of the genesis
    /// checkpoint.
    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,
}
828
829/// The authority state encapsulates all state, drives execution, and ensures
830/// safety.
831///
832/// Note the authority operations can be accessed through a read ref (&) and do
833/// not require &mut. Internally a database is synchronized through a mutex
834/// lock.
835///
836/// Repeating valid commands should produce no changes and return no error.
837impl AuthorityState {
838    pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
839        epoch_store.committee().authority_exists(&self.name)
840    }
841
842    pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
843        epoch_store
844            .active_validators()
845            .iter()
846            .any(|a| AuthorityName::from(a) == self.name)
847    }
848
849    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
850        !self.is_committee_validator(epoch_store)
851    }
852
853    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
854        &self.committee_store
855    }
856
857    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
858        self.committee_store.clone()
859    }
860
861    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
862        &self.config.authority_overload_config
863    }
864
865    pub fn get_epoch_state_commitments(
866        &self,
867        epoch: EpochId,
868    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
869        self.checkpoint_store.get_epoch_state_commitments(epoch)
870    }
871
    /// Validates and signs a transaction, then records the locks on its owned
    /// input objects.
    ///
    /// This is a private method and should be kept that way. It doesn't check
    /// whether the provided transaction is a system transaction, and hence
    /// can only be called internally.
    ///
    /// The steps, in order, are:
    /// 1. take the execution lock for signing;
    /// 2. run the transaction deny checks;
    /// 3. load the transaction, receiving and per-authenticator input objects;
    /// 4. check those inputs (including gas) for signing;
    /// 5. run the coin deny-list check over all loaded inputs;
    /// 6. execute the `MoveAuthenticator`s that must run pre-consensus;
    /// 7. sign the transaction and acquire the owned-object locks.
    ///
    /// # Errors
    ///
    /// Any failure of the checks above is propagated. A failing
    /// `MoveAuthenticator` execution is reported as
    /// `IotaError::MoveAuthenticatorExecutionFailure`.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
    async fn handle_transaction_impl(
        &self,
        transaction: VerifiedTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<VerifiedSignedTransaction> {
        // Ensure that validator cannot reconfigure while we are signing the tx
        let _execution_lock = self.execution_lock_for_signing()?;

        let protocol_config = epoch_store.protocol_config();
        let reference_gas_price = epoch_store.reference_gas_price();

        let epoch = epoch_store.epoch();

        let tx_data = transaction.data().transaction_data();

        // Note: the deny checks may do redundant package loads but:
        // - they only load packages when there is an active package deny map
        // - the loads are cached anyway
        iota_transaction_checks::deny::check_transaction_for_signing(
            tx_data,
            transaction.tx_signatures(),
            &transaction.input_objects()?,
            &tx_data.receiving_objects(),
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        // Load all transaction-related input objects including ones for every
        // `MoveAuthenticator`. Loading all objects eagerly means that any invalid
        // reference — missing object, wrong version, inaccessible object — causes a
        // pre-consensus rejection.
        let (tx_input_objects, tx_receiving_objects, per_authenticator_inputs) =
            self.read_objects_for_signing(&transaction, epoch)?;

        let move_authenticators = transaction.move_authenticators();

        // Check the inputs for signing.
        // If there are `MoveAuthenticator` signatures, their input objects and the
        // account objects are also checked and must be provided.
        // It is also checked if there is enough gas to execute the transaction and its
        // authenticators.
        let (gas_status, tx_checked_input_objects, per_authenticator_checked_inputs) = self
            .check_transaction_inputs_for_signing(
                protocol_config,
                reference_gas_price,
                tx_data,
                tx_input_objects,
                &tx_receiving_objects,
                &move_authenticators,
                per_authenticator_inputs,
            )?;

        // Get the input objects for the authenticators, if there are
        // `MoveAuthenticator`s. This is the unfiltered view: it covers every
        // authenticator, not only the pre-consensus ones.
        let per_authenticator_checked_input_objects = per_authenticator_checked_inputs
            .iter()
            .map(|i| &i.0)
            .collect();

        // Check if any of the sender, the transaction input objects, the receiving
        // objects and the authenticator input objects are in the coin deny
        // list, which would prevent the transaction from being signed.
        check_coin_deny_list_v1_during_signing(
            tx_data.sender(),
            &tx_checked_input_objects,
            &tx_receiving_objects,
            &per_authenticator_checked_input_objects,
            &self.get_object_store(),
        )?;

        // Filter the authenticators and their checked inputs down to those that must
        // be executed pre-consensus. This is done *after* the deny-list check so
        // that all MoveAuthenticator input objects are covered by that check regardless
        // of deferral.
        let pre_consensus_move_authenticators =
            pre_consensus_move_authenticators(&transaction, protocol_config);
        let (move_authenticators, per_authenticator_checked_inputs): (Vec<_>, Vec<_>) =
            move_authenticators
                .into_iter()
                .zip(per_authenticator_checked_inputs)
                .filter(|(a, _)| pre_consensus_move_authenticators.contains(a))
                .unzip();
        // Rebuild the borrowed view over the (now filtered) checked inputs; it
        // shadows the unfiltered view used for the deny-list check above.
        let per_authenticator_checked_input_objects: Vec<_> = per_authenticator_checked_inputs
            .iter()
            .map(|i| &i.0)
            .collect();

        // If there are `MoveAuthenticator` signatures, execute them and check if they
        // all succeed.
        if !move_authenticators.is_empty() {
            let aggregated_authenticator_input_objects =
                iota_transaction_checks::aggregate_authenticator_input_objects(
                    &per_authenticator_checked_input_objects,
                )?;

            debug_assert_eq!(
                move_authenticators.len(),
                per_authenticator_checked_inputs.len(),
                "Move authenticators amount must match the number of checked authenticator inputs"
            );

            // Pair every authenticator with its function reference and its
            // checked input objects, in the shape expected by the executor.
            let move_authenticators = move_authenticators
                .into_iter()
                .zip(per_authenticator_checked_inputs)
                .map(
                    |(
                        move_authenticator,
                        (authenticator_checked_input_objects, authenticator_function_ref),
                    )| {
                        (
                            move_authenticator.to_owned(),
                            authenticator_function_ref,
                            authenticator_checked_input_objects,
                        )
                    },
                )
                .collect();

            // It is supposed that `MoveAuthenticator` availability is checked in
            // `SenderSignedData::validity_check`.

            // Serialize the TransactionData for the auth context before decomposing.
            let tx_data_bytes =
                bcs::to_bytes(&tx_data).expect("TransactionData serialization cannot fail");

            let (sender_auth_digest, sponsor_auth_digest) =
                transaction.data().compute_auth_digests()?;

            let (kind, signer, gas_data) = tx_data.execution_parts();

            // Execute the Move authenticators.
            let validation_result = epoch_store.executor().authenticate_transaction(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                gas_data,
                gas_status,
                move_authenticators,
                aggregated_authenticator_input_objects,
                kind,
                signer,
                transaction.digest().to_owned(),
                tx_data_bytes,
                sender_auth_digest,
                sponsor_auth_digest,
                &mut None,
            );

            // Only the textual form of the validation error is kept.
            if let Err(validation_error) = validation_result {
                return Err(IotaError::MoveAuthenticatorExecutionFailure {
                    error: validation_error.to_string(),
                });
            }
        }

        let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();

        let signed_transaction =
            VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);

        // Check and write the locks for the signed transaction into the database.
        // The call to `try_acquire_transaction_locks` checks that the lock is not
        // conflicting, and is expected to return a ConflictingTransaction error in
        // case there is a lock on a different existing transaction.
        self.get_cache_writer().try_acquire_transaction_locks(
            epoch_store,
            &owned_objects,
            signed_transaction.clone(),
        )?;

        Ok(signed_transaction)
    }
1053
1054    /// Initiate a new transaction.
1055    #[instrument(name = "handle_transaction", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = transaction.data().transaction_data().gas_owner().to_string()))]
1056    pub async fn handle_transaction(
1057        &self,
1058        epoch_store: &Arc<AuthorityPerEpochStore>,
1059        transaction: VerifiedTransaction,
1060    ) -> IotaResult<HandleTransactionResponse> {
1061        let tx_digest = *transaction.digest();
1062        debug!("handle_transaction");
1063
1064        // Ensure an idempotent answer.
1065        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1066            return Ok(HandleTransactionResponse { status });
1067        }
1068
1069        let _metrics_guard = self
1070            .metrics
1071            .authority_state_handle_transaction_latency
1072            .start_timer();
1073        self.metrics.tx_orders.inc();
1074
1075        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1076        match signed {
1077            Ok(s) => {
1078                if self.is_committee_validator(epoch_store) {
1079                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1080                        let tx = s.clone();
1081                        let validator_tx_finalizer = validator_tx_finalizer.clone();
1082                        let cache_reader = self.get_transaction_cache_reader().clone();
1083                        let epoch_store = epoch_store.clone();
1084                        spawn_monitored_task!(epoch_store.within_alive_epoch(
1085                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1086                        ));
1087                    }
1088                }
1089                Ok(HandleTransactionResponse {
1090                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
1091                })
1092            }
1093            // It happens frequently that while we are checking the validity of the transaction, it
1094            // has just been executed.
1095            // In that case, we could still return Ok to avoid showing confusing errors.
1096            Err(err) => Ok(HandleTransactionResponse {
1097                status: self
1098                    .get_transaction_status(&tx_digest, epoch_store)?
1099                    .ok_or(err)?
1100                    .1,
1101            }),
1102        }
1103    }
1104
1105    pub fn check_system_overload_at_signing(&self) -> bool {
1106        self.config
1107            .authority_overload_config
1108            .check_system_overload_at_signing
1109    }
1110
1111    pub fn check_system_overload_at_execution(&self) -> bool {
1112        self.config
1113            .authority_overload_config
1114            .check_system_overload_at_execution
1115    }
1116
1117    pub(crate) fn check_system_overload(
1118        &self,
1119        consensus_adapter: &Arc<ConsensusAdapter>,
1120        tx_data: &SenderSignedData,
1121        do_authority_overload_check: bool,
1122    ) -> IotaResult {
1123        if do_authority_overload_check {
1124            self.check_authority_overload(tx_data).tap_err(|_| {
1125                self.update_overload_metrics("execution_queue");
1126            })?;
1127        }
1128        self.transaction_manager
1129            .check_execution_overload(self.overload_config(), tx_data)
1130            .tap_err(|_| {
1131                self.update_overload_metrics("execution_pending");
1132            })?;
1133        consensus_adapter.check_consensus_overload().tap_err(|_| {
1134            self.update_overload_metrics("consensus");
1135        })?;
1136
1137        let pending_tx_count = self
1138            .get_cache_commit()
1139            .approximate_pending_transaction_count();
1140        if pending_tx_count
1141            > self
1142                .config
1143                .execution_cache_config
1144                .writeback_cache
1145                .backpressure_threshold_for_rpc()
1146        {
1147            return Err(IotaError::ValidatorOverloadedRetryAfter {
1148                retry_after_secs: 10,
1149            });
1150        }
1151
1152        Ok(())
1153    }
1154
1155    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1156        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1157            return Ok(());
1158        }
1159
1160        let load_shedding_percentage = self
1161            .overload_info
1162            .load_shedding_percentage
1163            .load(Ordering::Relaxed);
1164        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1165    }
1166
1167    fn update_overload_metrics(&self, source: &str) {
1168        self.metrics
1169            .transaction_overload_sources
1170            .with_label_values(&[source])
1171            .inc();
1172    }
1173
1174    /// Executes a certificate for its effects.
1175    #[instrument(level = "trace", skip_all)]
1176    pub async fn execute_certificate(
1177        &self,
1178        certificate: &VerifiedCertificate,
1179        epoch_store: &Arc<AuthorityPerEpochStore>,
1180    ) -> IotaResult<TransactionEffects> {
1181        let _metrics_guard = if certificate.contains_shared_object() {
1182            self.metrics
1183                .execute_certificate_latency_shared_object
1184                .start_timer()
1185        } else {
1186            self.metrics
1187                .execute_certificate_latency_single_writer
1188                .start_timer()
1189        };
1190        trace!("execute_certificate");
1191
1192        self.metrics.total_cert_attempts.inc();
1193
1194        if !certificate.contains_shared_object() {
1195            // Shared object transactions need to be sequenced by the consensus before
1196            // enqueueing for execution, done in
1197            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
1198            // object transactions, they can be enqueued for execution immediately.
1199            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1200        }
1201
1202        // tx could be reverted when epoch ends, so we must be careful not to return a
1203        // result here after the epoch ends.
1204        epoch_store
1205            .within_alive_epoch(self.notify_read_effects(certificate))
1206            .await
1207            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1208            .and_then(|r| r)
1209    }
1210
1211    /// Internal logic to execute a certificate.
1212    ///
1213    /// Guarantees that
1214    /// - If input objects are available, return no permanent failure.
1215    /// - Execution and output commit are atomic. i.e. outputs are only written
1216    ///   to storage,
1217    /// on successful execution; crashed execution has no observable effect and
1218    /// can be retried.
1219    ///
1220    /// It is caller's responsibility to ensure input objects are available and
1221    /// locks are set. If this cannot be satisfied by the caller,
1222    /// execute_certificate() should be called instead.
1223    ///
1224    /// Should only be called within iota-core.
1225    #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1226    pub fn try_execute_immediately(
1227        &self,
1228        certificate: &VerifiedExecutableTransaction,
1229        expected_effects_digest: Option<TransactionEffectsDigest>,
1230        epoch_store: &Arc<AuthorityPerEpochStore>,
1231    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1232        let _scope = monitored_scope("Execution::try_execute_immediately");
1233        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1234
1235        let tx_digest = certificate.digest();
1236
1237        // Acquire a lock to prevent concurrent executions of the same transaction.
1238        let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1239
1240        // The cert could have been processed by a concurrent attempt of the same cert,
1241        // so check if the effects have already been written.
1242        if let Some(effects) = self
1243            .get_transaction_cache_reader()
1244            .try_get_executed_effects(tx_digest)?
1245        {
1246            if let Some(expected_effects_digest_inner) = expected_effects_digest {
1247                assert_eq!(
1248                    effects.digest(),
1249                    expected_effects_digest_inner,
1250                    "Unexpected effects digest for transaction {tx_digest}"
1251                );
1252            }
1253            tx_guard.release();
1254            return Ok((effects, None));
1255        }
1256
1257        let (tx_input_objects, per_authenticator_inputs) =
1258            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1259
1260        // If no expected_effects_digest was provided, try to get it from storage.
1261        // We could be re-executing a previously executed but uncommitted transaction,
1262        // perhaps after restarting with a new binary. In this situation, if
1263        // we have published an effects signature, we must be sure not to
1264        // equivocate. TODO: read from cache instead of DB
1265        let expected_effects_digest =
1266            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1267
1268        self.process_certificate(
1269            tx_guard,
1270            certificate,
1271            tx_input_objects,
1272            per_authenticator_inputs,
1273            expected_effects_digest,
1274            epoch_store,
1275        )
1276        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1277        .tap_ok(
1278            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1279        )
1280    }
1281
1282    pub fn read_objects_for_execution(
1283        &self,
1284        tx_lock: &CertLockGuard,
1285        certificate: &VerifiedExecutableTransaction,
1286        epoch_store: &Arc<AuthorityPerEpochStore>,
1287    ) -> IotaResult<(InputObjects, Vec<(InputObjects, ObjectReadResult)>)> {
1288        let _scope = monitored_scope("Execution::load_input_objects");
1289        let _metrics_guard = self
1290            .metrics
1291            .execution_load_input_objects_latency
1292            .start_timer();
1293
1294        let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1295
1296        let input_objects = self.input_loader.read_objects_for_execution(
1297            epoch_store,
1298            &certificate.key(),
1299            tx_lock,
1300            &input_objects,
1301            epoch_store.epoch(),
1302        )?;
1303
1304        certificate.split_input_objects_into_groups_for_reading(input_objects)
1305    }
1306
1307    /// Test only wrapper for `try_execute_immediately()` above, useful for
1308    /// checking errors if the pre-conditions are not satisfied, and
1309    /// executing change epoch transactions.
1310    pub fn try_execute_for_test(
1311        &self,
1312        certificate: &VerifiedCertificate,
1313    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1314        let epoch_store = self.epoch_store_for_testing();
1315        let (effects, execution_error_opt) = self.try_execute_immediately(
1316            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1317            None,
1318            &epoch_store,
1319        )?;
1320        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1321        Ok((signed_effects, execution_error_opt))
1322    }
1323
1324    /// Non-fallible version of `try_execute_for_test()`.
1325    pub fn execute_for_test(
1326        &self,
1327        certificate: &VerifiedCertificate,
1328    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1329        self.try_execute_for_test(certificate)
1330            .expect("try_execute_for_test should not fail")
1331    }
1332
1333    pub async fn notify_read_effects(
1334        &self,
1335        certificate: &VerifiedCertificate,
1336    ) -> IotaResult<TransactionEffects> {
1337        self.get_transaction_cache_reader()
1338            .try_notify_read_executed_effects(&[*certificate.digest()])
1339            .await
1340            .map(|mut r| r.pop().expect("must return correct number of effects"))
1341    }
1342
1343    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1344        self.get_object_cache_reader()
1345            .try_check_owned_objects_are_live(owned_object_refs)
1346    }
1347
1348    /// This function captures the required state to debug a forked transaction.
1349    /// The dump is written to a file in dir `path`, with name prefixed by the
1350    /// transaction digest. NOTE: Since this info escapes the validator
1351    /// context, make sure not to leak any private info here
1352    pub(crate) fn debug_dump_transaction_state(
1353        &self,
1354        tx_digest: &TransactionDigest,
1355        effects: &TransactionEffects,
1356        expected_effects_digest: TransactionEffectsDigest,
1357        inner_temporary_store: &InnerTemporaryStore,
1358        certificate: &VerifiedExecutableTransaction,
1359        debug_dump_config: &StateDebugDumpConfig,
1360    ) -> IotaResult<PathBuf> {
1361        // Fall back to the OS temp directory if no dump directory is configured.
1362        // This is safe: dump files are named by transaction digest, so no collisions.
1363        let dump_dir = debug_dump_config
1364            .dump_file_directory
1365            .as_ref()
1366            .cloned()
1367            .unwrap_or(std::env::temp_dir());
1368        let epoch_store = self.load_epoch_store_one_call_per_task();
1369
1370        NodeStateDump::new(
1371            tx_digest,
1372            effects,
1373            expected_effects_digest,
1374            self.get_object_store().as_ref(),
1375            &epoch_store,
1376            inner_temporary_store,
1377            certificate,
1378        )?
1379        .write_to_file(&dump_dir)
1380        .map_err(|e| IotaError::FileIO(e.to_string()))
1381    }
1382
1383    #[instrument(name = "process_certificate", level = "trace", skip_all, fields(tx_digest = ?certificate.digest(), sender = ?certificate.data().transaction_data().gas_owner().to_string()))]
1384    pub(crate) fn process_certificate(
1385        &self,
1386        tx_guard: CertTxGuard,
1387        certificate: &VerifiedExecutableTransaction,
1388        tx_input_objects: InputObjects,
1389        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1390        expected_effects_digest: Option<TransactionEffectsDigest>,
1391        epoch_store: &Arc<AuthorityPerEpochStore>,
1392    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1393        let process_certificate_start_time = tokio::time::Instant::now();
1394        let digest = *certificate.digest();
1395
1396        let _scope = monitored_scope("Execution::process_certificate");
1397
1398        fail_point_if!("correlated-crash-process-certificate", || {
1399            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1400                iota_simulator::task::kill_current_node(None);
1401            }
1402        });
1403
1404        let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1405        // Any caller that verifies the signatures on the certificate will have already
1406        // checked the epoch. But paths that don't verify sigs (e.g. execution
1407        // from checkpoint, reading from db) present the possibility of an epoch
1408        // mismatch. If this cert is not finalzied in previous epoch, then it's
1409        // invalid.
1410        let execution_guard = match execution_guard {
1411            Ok(execution_guard) => execution_guard,
1412            Err(err) => {
1413                tx_guard.release();
1414                return Err(err);
1415            }
1416        };
1417        // Since we obtain a reference to the epoch store before taking the execution
1418        // lock, it's possible that reconfiguration has happened and they no
1419        // longer match.
1420        if *execution_guard != epoch_store.epoch() {
1421            tx_guard.release();
1422            info!("The epoch of the execution_guard doesn't match the epoch store");
1423            return Err(IotaError::WrongEpoch {
1424                expected_epoch: epoch_store.epoch(),
1425                actual_epoch: *execution_guard,
1426            });
1427        }
1428
1429        // Errors originating from prepare_certificate may be transient (failure to read
1430        // locks) or non-transient (transaction input is invalid, move vm
1431        // errors). However, all errors from this function occur before we have
1432        // written anything to the db, so we commit the tx guard and rely on the
1433        // client to retry the tx (if it was transient).
1434        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1435            &execution_guard,
1436            certificate,
1437            tx_input_objects,
1438            per_authenticator_inputs,
1439            epoch_store,
1440        ) {
1441            Err(e) => {
1442                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1443                tx_guard.release();
1444                return Err(e);
1445            }
1446            Ok(res) => res,
1447        };
1448
1449        if let Some(expected_effects_digest) = expected_effects_digest {
1450            if effects.digest() != expected_effects_digest {
1451                // We dont want to mask the original error, so we log it and continue.
1452                match self.debug_dump_transaction_state(
1453                    &digest,
1454                    &effects,
1455                    expected_effects_digest,
1456                    &inner_temporary_store,
1457                    certificate,
1458                    &self.config.state_debug_dump_config,
1459                ) {
1460                    Ok(out_path) => {
1461                        info!(
1462                            "Dumped node state for transaction {} to {}",
1463                            digest,
1464                            out_path.as_path().display().to_string()
1465                        );
1466                    }
1467                    Err(e) => {
1468                        error!("Error dumping state for transaction {}: {e}", digest);
1469                    }
1470                }
1471                error!(
1472                    tx_digest = ?digest,
1473                    ?expected_effects_digest,
1474                    actual_effects = ?effects,
1475                    "fork detected!"
1476                );
1477                panic!(
1478                    "Transaction {} is expected to have effects digest {}, but got {}!",
1479                    digest,
1480                    expected_effects_digest,
1481                    effects.digest(),
1482                );
1483            }
1484        }
1485
1486        fail_point!("crash");
1487
1488        self.commit_certificate(
1489            certificate,
1490            inner_temporary_store,
1491            &effects,
1492            tx_guard,
1493            execution_guard,
1494            epoch_store,
1495        )?;
1496
1497        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1498        if elapsed > 0.0 {
1499            self.metrics
1500                .execution_gas_latency_ratio
1501                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1502        };
1503        Ok((effects, execution_error_opt))
1504    }
1505
1506    #[instrument(level = "trace", skip_all)]
1507    fn commit_certificate(
1508        &self,
1509        certificate: &VerifiedExecutableTransaction,
1510        inner_temporary_store: InnerTemporaryStore,
1511        effects: &TransactionEffects,
1512        tx_guard: CertTxGuard,
1513        _execution_guard: ExecutionLockReadGuard<'_>,
1514        epoch_store: &Arc<AuthorityPerEpochStore>,
1515    ) -> IotaResult {
1516        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1517            monitored_scope("Execution::commit_certificate");
1518        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1519
1520        let tx_key = certificate.key();
1521        let tx_digest = certificate.digest();
1522        let input_object_count = inner_temporary_store.input_objects.len();
1523        let shared_object_count = effects.input_shared_objects().len();
1524
1525        let output_keys = inner_temporary_store.get_output_keys(effects);
1526
1527        // index certificate
1528        let _ = self
1529            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1530            .tap_err(|e| {
1531                self.metrics.post_processing_total_failures.inc();
1532                error!(?tx_digest, "tx post processing failed: {e}");
1533            });
1534
1535        // The insertion to epoch_store is not atomic with the insertion to the
1536        // perpetual store. This is OK because we insert to the epoch store
1537        // first. And during lookups we always look up in the perpetual store first.
1538        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1539
1540        // Allow testing what happens if we crash here.
1541        fail_point!("crash");
1542
1543        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1544            certificate.clone().into_unsigned(),
1545            effects.clone(),
1546            inner_temporary_store,
1547        );
1548        self.get_cache_writer()
1549            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1550
1551        if certificate.transaction_data().is_end_of_epoch_tx() {
1552            // At the end of epoch, since system packages may have been upgraded, force
1553            // reload them in the cache.
1554            self.get_object_cache_reader()
1555                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1556        }
1557
1558        // commit_certificate finished, the tx is fully committed to the store.
1559        tx_guard.commit_tx();
1560
1561        // Notifies transaction manager about transaction and output objects committed.
1562        // This provides necessary information to transaction manager to start executing
1563        // additional ready transactions.
1564        self.transaction_manager
1565            .notify_commit(tx_digest, output_keys, epoch_store);
1566
1567        self.update_metrics(certificate, input_object_count, shared_object_count);
1568
1569        Ok(())
1570    }
1571
1572    fn update_metrics(
1573        &self,
1574        certificate: &VerifiedExecutableTransaction,
1575        input_object_count: usize,
1576        shared_object_count: usize,
1577    ) {
1578        // count signature by scheme, for multisig
1579        if certificate.has_upgraded_multisig() {
1580            self.metrics.multisig_sig_count.inc();
1581        }
1582
1583        self.metrics.total_effects.inc();
1584        self.metrics.total_certs.inc();
1585
1586        if shared_object_count > 0 {
1587            self.metrics.shared_obj_tx.inc();
1588        }
1589
1590        if certificate.is_sponsored_tx() {
1591            self.metrics.sponsored_tx.inc();
1592        }
1593
1594        self.metrics
1595            .num_input_objs
1596            .observe(input_object_count as f64);
1597        self.metrics
1598            .num_shared_objects
1599            .observe(shared_object_count as f64);
1600        self.metrics.batch_size.observe(
1601            certificate
1602                .data()
1603                .intent_message()
1604                .value
1605                .kind()
1606                .num_commands() as f64,
1607        );
1608    }
1609
1610    /// prepare_certificate validates the transaction input, and executes the
1611    /// certificate, returning effects, output objects, events, etc.
1612    ///
1613    /// It reads state from the db (both owned and shared locks), but it has no
1614    /// side effects.
1615    ///
1616    /// It can be generally understood that a failure of prepare_certificate
1617    /// indicates a non-transient error, e.g. the transaction input is
1618    /// somehow invalid, the correct locks are not held, etc. However, this
1619    /// is not entirely true, as a transient db read error may also cause
1620    /// this function to fail.
1621    #[instrument(level = "trace", skip_all)]
1622    fn prepare_certificate(
1623        &self,
1624        _execution_guard: &ExecutionLockReadGuard<'_>,
1625        certificate: &VerifiedExecutableTransaction,
1626        tx_input_objects: InputObjects,
1627        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1628        epoch_store: &Arc<AuthorityPerEpochStore>,
1629    ) -> IotaResult<(
1630        InnerTemporaryStore,
1631        TransactionEffects,
1632        Option<ExecutionError>,
1633    )> {
1634        let _scope = monitored_scope("Execution::prepare_certificate");
1635        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1636        let prepare_certificate_start_time = tokio::time::Instant::now();
1637
1638        let protocol_config = epoch_store.protocol_config();
1639
1640        let reference_gas_price = epoch_store.reference_gas_price();
1641
1642        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
1643        let epoch_start_timestamp = epoch_store
1644            .epoch_start_config()
1645            .epoch_data()
1646            .epoch_start_timestamp();
1647
1648        let backing_store = self.get_backing_store().as_ref();
1649
1650        let tx_digest = *certificate.digest();
1651
1652        // TODO: We need to move this to a more appropriate place to avoid redundant
1653        // checks.
1654        let tx_data = certificate.data().transaction_data();
1655        tx_data.validity_check(protocol_config)?;
1656
1657        let (kind, signer, gas_data) = tx_data.execution_parts();
1658
1659        let move_authenticators = certificate.move_authenticators();
1660
1661        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1662        let (inner_temp_store, _, mut effects, execution_error_opt) = if move_authenticators
1663            .is_empty()
1664        {
1665            // No Move authentication required, proceed to execute the transaction directly.
1666
1667            // The cost of partially re-auditing a transaction before execution is
1668            // tolerated.
1669            let (tx_gas_status, tx_checked_input_objects) =
1670                iota_transaction_checks::check_certificate_input(
1671                    certificate,
1672                    tx_input_objects,
1673                    protocol_config,
1674                    reference_gas_price,
1675                )?;
1676
1677            let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
1678            self.check_owned_locks(&owned_object_refs)?;
1679            epoch_store.executor().execute_transaction_to_effects(
1680                backing_store,
1681                protocol_config,
1682                self.metrics.limits_metrics.clone(),
1683                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
1684                // cyclic dependency w/ iota-adapter
1685                self.config
1686                    .expensive_safety_check_config
1687                    .enable_deep_per_tx_iota_conservation_check(),
1688                self.config.certificate_deny_config.certificate_deny_set(),
1689                &epoch_id,
1690                epoch_start_timestamp,
1691                tx_checked_input_objects,
1692                gas_data,
1693                tx_gas_status,
1694                kind,
1695                signer,
1696                tx_digest,
1697                &mut None,
1698            )
1699        } else {
1700            // One or more `MoveAuthenticator` signatures present — authenticate each and
1701            // then execute the transaction.
1702            // It is supposed that `MoveAuthenticator` availability is checked in
1703            // `SenderSignedData::validity_check`.
1704
1705            debug_assert_eq!(
1706                move_authenticators.len(),
1707                per_authenticator_inputs.len(),
1708                "Move authenticators amount must match the number of authenticator inputs"
1709            );
1710
1711            let per_authenticator_inputs = move_authenticators
1712                .iter()
1713                .zip(per_authenticator_inputs)
1714                .map(
1715                    |(move_authenticator, (authenticator_input_objects, account_object))| {
1716                        // Check basic `object_to_authenticate` preconditions and get its
1717                        // components.
1718                        let (
1719                            auth_account_object_id,
1720                            auth_account_object_seq_number,
1721                            auth_account_object_digest,
1722                        ) = move_authenticator.object_to_authenticate_components()?;
1723
1724                        let signer = move_authenticator.address()?;
1725
1726                        let authenticator_function_ref_for_execution = self.check_move_account(
1727                            auth_account_object_id,
1728                            auth_account_object_seq_number,
1729                            auth_account_object_digest,
1730                            account_object,
1731                            &signer,
1732                        )?;
1733
1734                        Ok((
1735                            authenticator_input_objects,
1736                            authenticator_function_ref_for_execution,
1737                        ))
1738                    },
1739                )
1740                .collect::<IotaResult<Vec<_>>>()?;
1741
1742            let per_authenticator_input_objects = per_authenticator_inputs
1743                .iter()
1744                .map(|(authenticator_input_objects, _)| authenticator_input_objects.clone())
1745                .collect::<Vec<_>>();
1746
1747            // Serialize the TransactionData for the auth context.
1748            let tx_data_bytes =
1749                bcs::to_bytes(tx_data).expect("TransactionData serialization cannot fail");
1750
1751            let (sender_auth_digest, sponsor_auth_digest) =
1752                certificate.data().compute_auth_digests()?;
1753
1754            // Check the `MoveAuthenticator` input objects.
1755            // The `MoveAuthenticator` receiving objects are checked on the signing step.
1756            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
1757            // not a part of the transaction data.
1758            let authenticator_gas_budget = protocol_config.max_auth_gas();
1759            let (
1760                gas_status,
1761                per_authenticator_checked_input_objects,
1762                authenticator_and_tx_checked_input_objects,
1763            ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
1764                certificate,
1765                tx_input_objects,
1766                per_authenticator_input_objects,
1767                authenticator_gas_budget,
1768                protocol_config,
1769                reference_gas_price,
1770            )?;
1771
1772            debug_assert_eq!(
1773                move_authenticators.len(),
1774                per_authenticator_checked_input_objects.len(),
1775                "Move authenticators amount must match the number of checked authenticator inputs"
1776            );
1777
1778            let move_authenticators = move_authenticators
1779                .into_iter()
1780                .zip(per_authenticator_inputs)
1781                .zip(per_authenticator_checked_input_objects)
1782                .map(
1783                    |(
1784                        (move_authenticator, (_, authenticator_function_ref_for_execution)),
1785                        authenticator_checked_input_objects,
1786                    )| {
1787                        (
1788                            move_authenticator.to_owned(),
1789                            authenticator_function_ref_for_execution,
1790                            authenticator_checked_input_objects,
1791                        )
1792                    },
1793                )
1794                .collect::<Vec<_>>();
1795
1796            let owned_object_refs = authenticator_and_tx_checked_input_objects
1797                .inner()
1798                .filter_owned_objects();
1799            self.check_owned_locks(&owned_object_refs)?;
1800
1801            epoch_store
1802                .executor()
1803                .authenticate_then_execute_transaction_to_effects(
1804                    backing_store,
1805                    protocol_config,
1806                    self.metrics.limits_metrics.clone(),
1807                    self.config
1808                        .expensive_safety_check_config
1809                        .enable_deep_per_tx_iota_conservation_check(),
1810                    self.config.certificate_deny_config.certificate_deny_set(),
1811                    &epoch_id,
1812                    epoch_start_timestamp,
1813                    gas_data,
1814                    gas_status,
1815                    move_authenticators,
1816                    authenticator_and_tx_checked_input_objects,
1817                    kind,
1818                    signer,
1819                    tx_digest,
1820                    tx_data_bytes,
1821                    sender_auth_digest,
1822                    sponsor_auth_digest,
1823                    &mut None,
1824                )
1825        };
1826
1827        fail_point_if!("cp_execution_nondeterminism", || {
1828            #[cfg(msim)]
1829            self.create_fail_state(certificate, epoch_store, &mut effects);
1830        });
1831
1832        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1833        if elapsed > 0.0 {
1834            self.metrics
1835                .prepare_cert_gas_latency_ratio
1836                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1837        }
1838
1839        Ok((inner_temp_store, effects, execution_error_opt.err()))
1840    }
1841
1842    pub fn prepare_certificate_for_benchmark(
1843        &self,
1844        certificate: &VerifiedExecutableTransaction,
1845        input_objects: InputObjects,
1846        epoch_store: &Arc<AuthorityPerEpochStore>,
1847    ) -> IotaResult<(
1848        InnerTemporaryStore,
1849        TransactionEffects,
1850        Option<ExecutionError>,
1851    )> {
1852        let lock = RwLock::new(epoch_store.epoch());
1853        let execution_guard = lock.try_read().unwrap();
1854
1855        self.prepare_certificate(
1856            &execution_guard,
1857            certificate,
1858            input_objects,
1859            vec![],
1860            epoch_store,
1861        )
1862    }
1863
1864    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
1865    /// `VmChecks::Enabled` instead.
1866    #[instrument("dry_exec_tx", level = "trace", skip_all)]
1867    #[allow(clippy::type_complexity)]
1868    pub fn dry_exec_transaction(
1869        &self,
1870        transaction: TransactionData,
1871        transaction_digest: TransactionDigest,
1872    ) -> IotaResult<(
1873        DryRunTransactionBlockResponse,
1874        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1875        TransactionEffects,
1876        Option<ObjectID>,
1877    )> {
1878        let epoch_store = self.load_epoch_store_one_call_per_task();
1879        if !self.is_fullnode(&epoch_store) {
1880            return Err(IotaError::UnsupportedFeature {
1881                error: "dry-exec is only supported on fullnodes".to_string(),
1882            });
1883        }
1884
1885        if transaction.kind().is_system() {
1886            return Err(IotaError::UnsupportedFeature {
1887                error: "dry-exec does not support system transactions".to_string(),
1888            });
1889        }
1890
1891        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1892    }
1893
1894    #[allow(clippy::type_complexity)]
1895    pub fn dry_exec_transaction_for_benchmark(
1896        &self,
1897        transaction: TransactionData,
1898        transaction_digest: TransactionDigest,
1899    ) -> IotaResult<(
1900        DryRunTransactionBlockResponse,
1901        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1902        TransactionEffects,
1903        Option<ObjectID>,
1904    )> {
1905        let epoch_store = self.load_epoch_store_one_call_per_task();
1906        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1907    }
1908
1909    #[instrument(level = "trace", skip_all)]
1910    #[allow(clippy::type_complexity)]
1911    fn dry_exec_transaction_impl(
1912        &self,
1913        epoch_store: &AuthorityPerEpochStore,
1914        transaction: TransactionData,
1915        transaction_digest: TransactionDigest,
1916    ) -> IotaResult<(
1917        DryRunTransactionBlockResponse,
1918        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1919        TransactionEffects,
1920        Option<ObjectID>,
1921    )> {
1922        // Cheap validity checks for a transaction, including input size limits.
1923        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1924
1925        let input_object_kinds = transaction.input_objects()?;
1926        let receiving_object_refs = transaction.receiving_objects();
1927
1928        iota_transaction_checks::deny::check_transaction_for_signing(
1929            &transaction,
1930            &[],
1931            &input_object_kinds,
1932            &receiving_object_refs,
1933            &self.config.transaction_deny_config,
1934            self.get_backing_package_store().as_ref(),
1935        )?;
1936
1937        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1938            // We don't want to cache this transaction since it's a dry run.
1939            None,
1940            &input_object_kinds,
1941            &receiving_object_refs,
1942            epoch_store.epoch(),
1943        )?;
1944
1945        // make a gas object if one was not provided
1946        let mut transaction = transaction;
1947        let reference_gas_price = epoch_store.reference_gas_price();
1948        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1949            let sender = transaction.gas_owner();
1950            let gas_object_id = ObjectID::random();
1951            let gas_object = Object::new_move(
1952                MoveObject::new_gas_coin(
1953                    OBJECT_START_VERSION,
1954                    gas_object_id,
1955                    SIMULATION_GAS_COIN_VALUE,
1956                ),
1957                Owner::Address(sender),
1958                TransactionDigest::GENESIS_MARKER,
1959            );
1960            let gas_object_ref = gas_object.compute_object_reference();
1961            // Add gas object to transaction gas payment
1962            transaction.gas_data_mut().objects = vec![gas_object_ref];
1963            (
1964                iota_transaction_checks::check_transaction_input_with_given_gas(
1965                    epoch_store.protocol_config(),
1966                    reference_gas_price,
1967                    &transaction,
1968                    input_objects,
1969                    receiving_objects,
1970                    gas_object,
1971                    &self.metrics.bytecode_verifier_metrics,
1972                    &self.config.verifier_signing_config,
1973                )?,
1974                Some(gas_object_id),
1975            )
1976        } else {
1977            // `MoveAuthenticator`s are not supported in dry runs, so we set the
1978            // `authenticator_gas_budget` to 0.
1979            let authenticator_gas_budget = 0;
1980
1981            (
1982                iota_transaction_checks::check_transaction_input(
1983                    epoch_store.protocol_config(),
1984                    reference_gas_price,
1985                    &transaction,
1986                    input_objects,
1987                    &receiving_objects,
1988                    &self.metrics.bytecode_verifier_metrics,
1989                    &self.config.verifier_signing_config,
1990                    authenticator_gas_budget,
1991                )?,
1992                None,
1993            )
1994        };
1995
1996        let protocol_config = epoch_store.protocol_config();
1997        let (kind, signer, gas_data) = transaction.execution_parts();
1998
1999        let silent = true;
2000        let executor = iota_execution::executor(protocol_config, silent, None)
2001            .expect("Creating an executor should not fail here");
2002
2003        let expensive_checks = false;
2004        let (inner_temp_store, _, effects, execution_error) = executor
2005            .execute_transaction_to_effects(
2006                self.get_backing_store().as_ref(),
2007                protocol_config,
2008                self.metrics.limits_metrics.clone(),
2009                expensive_checks,
2010                self.config.certificate_deny_config.certificate_deny_set(),
2011                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2012                epoch_store
2013                    .epoch_start_config()
2014                    .epoch_data()
2015                    .epoch_start_timestamp(),
2016                checked_input_objects,
2017                gas_data,
2018                gas_status,
2019                kind,
2020                signer,
2021                transaction_digest,
2022                &mut None,
2023            );
2024        let tx_digest = *effects.transaction_digest();
2025
2026        let module_cache =
2027            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2028
2029        let mut layout_resolver =
2030            epoch_store
2031                .executor()
2032                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2033                    &inner_temp_store,
2034                    self.get_backing_package_store(),
2035                )));
2036        // Returning empty vector here because we recalculate changes in the rpc layer.
2037        let object_changes = Vec::new();
2038
2039        // Returning empty vector here because we recalculate changes in the rpc layer.
2040        let balance_changes = Vec::new();
2041
2042        let written_with_kind = effects
2043            .created()
2044            .into_iter()
2045            .map(|(oref, _)| (oref, WriteKind::Create))
2046            .chain(
2047                effects
2048                    .unwrapped()
2049                    .into_iter()
2050                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2051            )
2052            .chain(
2053                effects
2054                    .mutated()
2055                    .into_iter()
2056                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
2057            )
2058            .map(|(oref, kind)| {
2059                let obj = inner_temp_store.written.get(&oref.object_id).unwrap();
2060                // TODO: Avoid clones.
2061                (oref.object_id, (oref, obj.clone(), kind))
2062            })
2063            .collect();
2064
2065        let execution_error_source = execution_error
2066            .as_ref()
2067            .err()
2068            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2069
2070        Ok((
2071            DryRunTransactionBlockResponse {
2072                // to avoid cloning `transaction`, fields are populated in this order
2073                suggested_gas_price: self
2074                    .congestion_tracker
2075                    .get_prediction_suggested_gas_price(&transaction),
2076                input: IotaTransactionBlockData::try_from_with_module_cache(
2077                    transaction,
2078                    &module_cache,
2079                    tx_digest,
2080                )
2081                .map_err(|e| IotaError::TransactionSerialization {
2082                    error: format!(
2083                        "Failed to convert transaction to IotaTransactionBlockData: {e}",
2084                    ),
2085                })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
2086                effects: effects.clone().try_into()?,
2087                events: IotaTransactionBlockEvents::try_from(
2088                    inner_temp_store.events.clone(),
2089                    tx_digest,
2090                    None,
2091                    layout_resolver.as_mut(),
2092                )?,
2093                object_changes,
2094                balance_changes,
2095                execution_error_source,
2096            },
2097            written_with_kind,
2098            effects,
2099            mock_gas,
2100        ))
2101    }
2102
2103    pub fn simulate_transaction(
2104        &self,
2105        mut transaction: TransactionData,
2106        checks: VmChecks,
2107    ) -> IotaResult<SimulateTransactionResult> {
2108        if transaction.kind().is_system() {
2109            return Err(IotaError::UnsupportedFeature {
2110                error: "simulate does not support system transactions".to_string(),
2111            });
2112        }
2113
2114        let epoch_store = self.load_epoch_store_one_call_per_task();
2115        if !self.is_fullnode(&epoch_store) {
2116            return Err(IotaError::UnsupportedFeature {
2117                error: "simulate is only supported on fullnodes".to_string(),
2118            });
2119        }
2120
2121        // Cheap validity checks for a transaction, including input size limits.
2122        // This does not check if gas objects are missing since we may create a
2123        // mock gas object. It checks for other transaction input validity.
2124        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2125
2126        let input_object_kinds = transaction.input_objects()?;
2127        let receiving_object_refs = transaction.receiving_objects();
2128
2129        // Since we need to simulate a validator signing the transaction, the first step
2130        // is to check if some transaction elements are denied.
2131        iota_transaction_checks::deny::check_transaction_for_signing(
2132            &transaction,
2133            &[],
2134            &input_object_kinds,
2135            &receiving_object_refs,
2136            &self.config.transaction_deny_config,
2137            self.get_backing_package_store().as_ref(),
2138        )?;
2139
2140        // Load input and receiving objects
2141        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2142            // We don't want to cache this transaction since it's a simulation.
2143            None,
2144            &input_object_kinds,
2145            &receiving_object_refs,
2146            epoch_store.epoch(),
2147        )?;
2148
2149        // Create a mock gas object if one was not provided
2150        let mock_gas_id = if transaction.gas().is_empty() {
2151            let mock_gas_object = Object::new_move(
2152                MoveObject::new_gas_coin(
2153                    OBJECT_START_VERSION,
2154                    ObjectID::MAX,
2155                    SIMULATION_GAS_COIN_VALUE,
2156                ),
2157                Owner::Address(transaction.gas_data().owner),
2158                TransactionDigest::GENESIS_MARKER,
2159            );
2160            let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2161            transaction.gas_data_mut().objects = vec![mock_gas_object_ref];
2162            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2163            Some(mock_gas_object.id())
2164        } else {
2165            None
2166        };
2167
2168        let protocol_config = epoch_store.protocol_config();
2169
2170        // `MoveAuthenticator`s are not supported in simulation, so we set the
2171        // `authenticator_gas_budget` to 0.
2172        let authenticator_gas_budget = 0;
2173
2174        // Checks enabled -> DRY-RUN, it means we are simulating a real TX
2175        // Checks disabled -> DEV-INSPECT, more relaxed Move VM checks
2176        let (gas_status, checked_input_objects) = if checks.enabled() {
2177            iota_transaction_checks::check_transaction_input(
2178                protocol_config,
2179                epoch_store.reference_gas_price(),
2180                &transaction,
2181                input_objects,
2182                &receiving_objects,
2183                &self.metrics.bytecode_verifier_metrics,
2184                &self.config.verifier_signing_config,
2185                authenticator_gas_budget,
2186            )?
2187        } else {
2188            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2189                protocol_config,
2190                transaction.kind(),
2191                input_objects,
2192                receiving_objects,
2193            )?;
2194            let gas_status = IotaGasStatus::new(
2195                transaction.gas_budget(),
2196                transaction.gas_price(),
2197                epoch_store.reference_gas_price(),
2198                protocol_config,
2199            )?;
2200
2201            (gas_status, checked_input_objects)
2202        };
2203
2204        // Create a new executor for the simulation
2205        let executor = iota_execution::executor(
2206            protocol_config,
2207            true, // silent
2208            None,
2209        )
2210        .expect("Creating an executor should not fail here");
2211
2212        // Execute the simulation
2213        let (kind, signer, gas_data) = transaction.execution_parts();
2214        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2215            self.get_backing_store().as_ref(),
2216            protocol_config,
2217            self.metrics.limits_metrics.clone(),
2218            false, // expensive_checks
2219            self.config.certificate_deny_config.certificate_deny_set(),
2220            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2221            epoch_store
2222                .epoch_start_config()
2223                .epoch_data()
2224                .epoch_start_timestamp(),
2225            checked_input_objects,
2226            gas_data,
2227            gas_status,
2228            kind,
2229            signer,
2230            transaction.digest(),
2231            checks.disabled(),
2232        );
2233
2234        // In the case of a dev inspect, the execution_result could be filled with some
2235        // values. Else, execution_result is empty in the case of a dry run.
2236        Ok(SimulateTransactionResult {
2237            input_objects: inner_temp_store.input_objects,
2238            output_objects: inner_temp_store.written,
2239            events: effects.events_digest().map(|_| inner_temp_store.events),
2240            effects,
2241            execution_result,
2242            suggested_gas_price: self
2243                .congestion_tracker
2244                .get_prediction_suggested_gas_price(&transaction),
2245            mock_gas_id,
2246        })
2247    }
2248
    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
    /// `VmChecks::DISABLED` instead.
    ///
    /// Builds a `TransactionData::V1` from `sender`, `transaction_kind` and the
    /// optional gas parameters, runs it through the executor's
    /// `dev_inspect_transaction` and returns the outcome as
    /// `DevInspectResults`.
    ///
    /// Defaults applied to the optional arguments:
    /// * `gas_price` - the epoch's reference gas price.
    /// * `gas_budget` - the protocol's `max_tx_gas`.
    /// * `gas_sponsor` - `sender`.
    /// * `gas_objects` - empty, in which case a dummy gas object is created and
    ///   used as the payment.
    /// * `show_raw_txn_data_and_effects` - `false`; when `true` the BCS bytes of
    ///   the transaction data and of the effects are included in the result.
    /// * `skip_checks` - `true`; when `true` only `check_dev_inspect_input` is
    ///   applied, otherwise the full `check_transaction_input` path is used.
    ///
    /// The object ID for gas can be any object ID, even for an uncreated
    /// object.
    ///
    /// # Errors
    ///
    /// Returns `IotaError::UnsupportedFeature` when this node is not a fullnode
    /// or `transaction_kind` is a system transaction,
    /// `IotaError::TransactionSerialization` when the raw transaction data or
    /// effects cannot be BCS-serialized, and propagates errors from the
    /// validity check, the deny check, object loading, the input checks and
    /// the construction of the results.
    ///
    /// # Panics
    ///
    /// Panics if the executor cannot be created.
    #[instrument("dev_inspect_tx", level = "trace", skip_all)]
    pub async fn dev_inspect_transaction_block(
        &self,
        sender: IotaAddress,
        transaction_kind: TransactionKind,
        gas_price: Option<u64>,
        gas_budget: Option<u64>,
        gas_sponsor: Option<IotaAddress>,
        gas_objects: Option<Vec<ObjectRef>>,
        show_raw_txn_data_and_effects: Option<bool>,
        skip_checks: Option<bool>,
    ) -> IotaResult<DevInspectResults> {
        let epoch_store = self.load_epoch_store_one_call_per_task();

        if !self.is_fullnode(&epoch_store) {
            return Err(IotaError::UnsupportedFeature {
                error: "dev-inspect is only supported on fullnodes".to_string(),
            });
        }

        if transaction_kind.is_system() {
            return Err(IotaError::UnsupportedFeature {
                error: "system transactions are not supported".to_string(),
            });
        }

        // Resolve the optional arguments to their defaults.
        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
        let skip_checks = skip_checks.unwrap_or(true);
        let reference_gas_price = epoch_store.reference_gas_price();
        let protocol_config = epoch_store.protocol_config();
        let max_tx_gas = protocol_config.max_tx_gas();

        let price = gas_price.unwrap_or(reference_gas_price);
        let budget = gas_budget.unwrap_or(max_tx_gas);
        let owner = gas_sponsor.unwrap_or(sender);
        // Payment might be empty here, but it's fine we'll have to deal with it later
        // after reading all the input objects.
        let payment = gas_objects.unwrap_or_default();
        let mut transaction = TransactionData::V1(TransactionDataV1 {
            kind: transaction_kind.clone(),
            sender,
            gas_payment: GasData {
                objects: payment,
                owner,
                price,
                budget,
            },
            expiration: TransactionExpiration::None,
        });

        // The raw bytes are captured here, i.e. before a dummy gas object may be
        // injected into the gas payment below.
        let raw_txn_data = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        // Validity checks that skip the gas checks, since the payment may still
        // be empty at this point.
        transaction.validity_check_no_gas_check(protocol_config)?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Apply the same deny check that is used when signing a transaction.
        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dev inspect.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        let (gas_status, checked_input_objects) = if skip_checks {
            // If we are skipping checks, then we call the check_dev_inspect_input function
            // which will perform only lightweight checks on the transaction
            // input. And if the gas field is empty, that means we will
            // use the dummy gas object so we need to add it to the input objects vector.
            if transaction.gas().is_empty() {
                // Create and use a dummy gas object if there is no gas object provided.
                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
                    SIMULATION_GAS_COIN_VALUE,
                    transaction.gas_owner(),
                );
                let gas_object_ref = dummy_gas_object.compute_object_reference();
                transaction.gas_data_mut().objects = vec![gas_object_ref];
                input_objects.push(ObjectReadResult::new(
                    InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
                    dummy_gas_object.into(),
                ));
            }
            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
                protocol_config,
                &transaction_kind,
                input_objects,
                receiving_objects,
            )?;
            // NOTE(review): the gas status is created with `max_tx_gas` rather
            // than the transaction's own budget - confirm this is intentional.
            let gas_status = IotaGasStatus::new(
                max_tx_gas,
                transaction.gas_price(),
                reference_gas_price,
                protocol_config,
            )?;

            (gas_status, checked_input_objects)
        } else {
            // If we are not skipping checks, then we call the check_transaction_input
            // function and its dummy gas variant which will perform full
            // fledged checks just like a real transaction execution.
            if transaction.gas().is_empty() {
                // Create and use a dummy gas object if there is no gas object provided.
                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
                    SIMULATION_GAS_COIN_VALUE,
                    transaction.gas_owner(),
                );
                let gas_object_ref = dummy_gas_object.compute_object_reference();
                transaction.gas_data_mut().objects = vec![gas_object_ref];
                // The dummy gas object is handed over separately here instead
                // of being pushed into `input_objects`.
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    receiving_objects,
                    dummy_gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?
            } else {
                // `MoveAuthenticator`s are not supported in dev inspects, so we set the
                // `authenticator_gas_budget` to 0.
                let authenticator_gas_budget = 0;

                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                    authenticator_gas_budget,
                )?
            }
        };

        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
            .expect("Creating an executor should not fail here");
        // Clone the gas data before `transaction` is moved into the intent
        // message below.
        let gas_data = transaction.gas_data().clone();
        // The intent message is only used to derive the transaction digest.
        let intent_msg = IntentMessage::new(
            Intent {
                version: IntentVersion::V0,
                scope: IntentScope::TransactionData,
                app_id: IntentAppId::Iota,
            },
            transaction,
        );
        let transaction_digest = TransactionDigest::new(intent_msg.value.digest().into_inner());
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            // expensive checks
            false,
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_data,
            gas_status,
            transaction_kind,
            sender,
            transaction_digest,
            skip_checks,
        );

        let raw_effects = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction effects during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        // The resolver looks packages up in the temporary store first and falls
        // back to the backing package store.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));

        // The events are cloned because `inner_temp_store` is still borrowed by
        // `layout_resolver`.
        DevInspectResults::new(
            effects,
            inner_temp_store.events.clone(),
            execution_result,
            raw_txn_data,
            raw_effects,
            layout_resolver.as_mut(),
        )
    }
2463
2464    // Only used for testing because of how epoch store is loaded.
2465    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2466        let epoch_store = self.epoch_store_for_testing();
2467        Ok(epoch_store.reference_gas_price())
2468    }
2469
2470    #[instrument(level = "trace", skip_all)]
2471    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2472        self.get_transaction_cache_reader()
2473            .try_is_tx_already_executed(digest)
2474    }
2475
2476    /// Non-fallible version of `try_is_tx_already_executed`.
2477    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2478        self.try_is_tx_already_executed(digest)
2479            .expect("storage access failed")
2480    }
2481
2482    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2483    #[instrument(level = "debug", skip_all, err)]
2484    fn index_tx(
2485        &self,
2486        indexes: &IndexStore,
2487        digest: &TransactionDigest,
2488        // TODO: index_tx really just need the transaction data here.
2489        cert: &VerifiedExecutableTransaction,
2490        effects: &TransactionEffects,
2491        events: &TransactionEvents,
2492        timestamp_ms: u64,
2493        tx_coins: Option<TxCoins>,
2494        written: &WrittenObjects,
2495        inner_temporary_store: &InnerTemporaryStore,
2496    ) -> IotaResult<u64> {
2497        let changes = self
2498            .process_object_index(effects, written, inner_temporary_store)
2499            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2500
2501        indexes.index_tx(
2502            cert.data().intent_message().value.sender(),
2503            cert.data()
2504                .intent_message()
2505                .value
2506                .input_objects()?
2507                .iter()
2508                .map(|o| o.object_id()),
2509            effects
2510                .all_changed_objects()
2511                .into_iter()
2512                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2513            cert.data()
2514                .intent_message()
2515                .value
2516                .move_calls()
2517                .into_iter()
2518                .map(|(package, module, function)| {
2519                    (*package, module.to_owned(), function.to_owned())
2520                }),
2521            events,
2522            changes,
2523            digest,
2524            timestamp_ms,
2525            tx_coins,
2526        )
2527    }
2528
2529    #[cfg(msim)]
2530    fn create_fail_state(
2531        &self,
2532        certificate: &VerifiedExecutableTransaction,
2533        epoch_store: &Arc<AuthorityPerEpochStore>,
2534        effects: &mut TransactionEffects,
2535    ) {
2536        use std::cell::RefCell;
2537
2538        use iota_types::effects::TransactionEffectsAPIForTesting;
2539        thread_local! {
2540            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2541        }
2542        if !certificate.data().intent_message().value.is_system_tx() {
2543            let committee = epoch_store.committee();
2544            let cur_stake = (**committee).weight(&self.name);
2545            if cur_stake > 0 {
2546                FAIL_STATE.with_borrow_mut(|fail_state| {
2547                    // let (&mut failing_stake, &mut failing_validators) = fail_state;
2548                    if fail_state.0 < committee.validity_threshold() {
2549                        fail_state.0 += cur_stake;
2550                        fail_state.1.insert(self.name);
2551                    }
2552
2553                    if fail_state.1.contains(&self.name) {
2554                        info!("cp_exec failing tx");
2555                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2556                    }
2557                });
2558            }
2559        }
2560    }
2561
2562    fn process_object_index(
2563        &self,
2564        effects: &TransactionEffects,
2565        written: &WrittenObjects,
2566        inner_temporary_store: &InnerTemporaryStore,
2567    ) -> IotaResult<ObjectIndexChanges> {
2568        let epoch_store = self.load_epoch_store_one_call_per_task();
2569        let mut layout_resolver =
2570            epoch_store
2571                .executor()
2572                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2573                    inner_temporary_store,
2574                    self.get_backing_package_store(),
2575                )));
2576
2577        let modified_at_version = effects
2578            .modified_at_versions()
2579            .into_iter()
2580            .collect::<HashMap<_, _>>();
2581
2582        let tx_digest = effects.transaction_digest();
2583        let mut deleted_owners = vec![];
2584        let mut deleted_dynamic_fields = vec![];
2585        for object_ref in effects.deleted().into_iter().chain(effects.wrapped()) {
2586            let old_version = modified_at_version.get(&object_ref.object_id).unwrap();
2587            // When we process the index, the latest object hasn't been written yet so
2588            // the old object must be present.
2589            match self.get_owner_at_version(&object_ref.object_id, *old_version).unwrap_or_else(
2590                |e| panic!("tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {} at version {old_version:?}. Err: {e:?}",object_ref.object_id)
2591            ) {
2592                Owner::Address(addr) => deleted_owners.push((addr, object_ref.object_id)),
2593                Owner::Object(object_id) => {
2594                    deleted_dynamic_fields.push((object_id, object_ref.object_id))
2595                }
2596                _ => {}
2597            }
2598        }
2599
2600        let mut new_owners = vec![];
2601        let mut new_dynamic_fields = vec![];
2602
2603        for (oref, owner, kind) in effects.all_changed_objects() {
2604            let id = &oref.object_id;
2605            // For mutated objects, retrieve old owner and delete old index if there is a
2606            // owner change.
2607            if let WriteKind::Mutate = kind {
2608                let Some(old_version) = modified_at_version.get(id) else {
2609                    panic!(
2610                        "tx_digest={tx_digest}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2611                    );
2612                };
2613                // When we process the index, the latest object hasn't been written yet so
2614                // the old object must be present.
2615                let Some(old_object) = self
2616                    .get_object_store()
2617                    .try_get_object_by_key(id, *old_version)?
2618                else {
2619                    panic!(
2620                        "tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {id} at version {old_version:?}"
2621                    );
2622                };
2623                if old_object.owner != owner {
2624                    match old_object.owner {
2625                        Owner::Address(addr) => {
2626                            deleted_owners.push((addr, *id));
2627                        }
2628                        Owner::Object(object_id) => deleted_dynamic_fields.push((object_id, *id)),
2629                        _ => {}
2630                    }
2631                }
2632            }
2633
2634            match owner {
2635                Owner::Address(addr) => {
2636                    // TODO: We can remove the object fetching after we added ObjectType to
2637                    // TransactionEffects
2638                    let new_object = written.get(id).unwrap_or_else(
2639                        || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2640                    );
2641                    assert_eq!(
2642                        new_object.version(),
2643                        oref.version,
2644                        "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2645                        tx_digest,
2646                        id,
2647                        new_object.version(),
2648                        oref.version
2649                    );
2650
2651                    let type_ = new_object
2652                        .type_()
2653                        .map(|type_| ObjectType::Struct(type_.clone()))
2654                        .unwrap_or(ObjectType::Package);
2655
2656                    new_owners.push((
2657                        (addr, *id),
2658                        ObjectInfo {
2659                            object_id: *id,
2660                            version: oref.version,
2661                            digest: oref.digest,
2662                            type_,
2663                            owner,
2664                            previous_transaction: *effects.transaction_digest(),
2665                        },
2666                    ));
2667                }
2668                Owner::Object(owner) => {
2669                    let new_object = written.get(id).unwrap_or_else(
2670                        || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2671                    );
2672                    assert_eq!(
2673                        new_object.version(),
2674                        oref.version,
2675                        "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2676                        tx_digest,
2677                        id,
2678                        new_object.version(),
2679                        oref.version
2680                    );
2681
2682                    let Some(df_info) = self
2683                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2684                        .unwrap_or_else(|e| {
2685                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2686                            None
2687                        }
2688                    )
2689                        else {
2690                            // Skip indexing for non dynamic field objects.
2691                            continue;
2692                        };
2693                    new_dynamic_fields.push(((owner, *id), df_info))
2694                }
2695                _ => {}
2696            }
2697        }
2698
2699        Ok(ObjectIndexChanges {
2700            deleted_owners,
2701            deleted_dynamic_fields,
2702            new_owners,
2703            new_dynamic_fields,
2704        })
2705    }
2706
2707    fn try_create_dynamic_field_info(
2708        &self,
2709        o: &Object,
2710        written: &WrittenObjects,
2711        resolver: &mut dyn LayoutResolver,
2712    ) -> IotaResult<Option<DynamicFieldInfo>> {
2713        // Skip if not a move object
2714        let Some(move_object) = o.data.as_struct_opt().cloned() else {
2715            return Ok(None);
2716        };
2717
2718        // We only index dynamic field objects
2719        if !move_object.struct_tag().is_dynamic_field() {
2720            return Ok(None);
2721        }
2722
2723        let layout = resolver
2724            .get_annotated_layout(move_object.struct_tag())?
2725            .into_layout();
2726
2727        let field =
2728            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2729                IotaError::ObjectDeserialization {
2730                    error: e.to_string(),
2731                }
2732            })?;
2733
2734        let type_ = field.kind;
2735        let name_type: TypeTag = type_tag_core_to_sdk(&field.name_layout.into());
2736        let bcs_name = field.name_bytes.to_owned();
2737
2738        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2739            .map_err(|e| {
2740                warn!("{e}");
2741                IotaError::ObjectDeserialization {
2742                    error: e.to_string(),
2743                }
2744            })?;
2745
2746        let name = DynamicFieldName {
2747            type_: name_type,
2748            value: IotaMoveValue::from(name_value).to_json_value(),
2749        };
2750
2751        let value_metadata = field.value_metadata().map_err(|e| {
2752            warn!("{e}");
2753            IotaError::ObjectDeserialization {
2754                error: e.to_string(),
2755            }
2756        })?;
2757
2758        Ok(Some(match value_metadata {
2759            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2760                name,
2761                bcs_name,
2762                type_,
2763                object_type: object_type.to_canonical_string(/* with_prefix */ true),
2764                object_id: o.id(),
2765                version: o.version(),
2766                digest: o.digest(),
2767            },
2768
2769            DFV::ValueMetadata::DynamicObjectField(object_id) => {
2770                // Find the actual object from storage using the object id obtained from the
2771                // wrapper.
2772
2773                // Try to find the object in the written objects first.
2774                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2775                    (
2776                        object.version(),
2777                        object.digest(),
2778                        object.data.object_type().unwrap().clone(),
2779                    )
2780                } else {
2781                    // If not found, try to find it in the database.
2782                    let object = self
2783                        .get_object_store()
2784                        .try_get_object_by_key(&object_id, o.version())?
2785                        .ok_or_else(|| UserInputError::ObjectNotFound {
2786                            object_id,
2787                            version: Some(o.version()),
2788                        })?;
2789                    let version = object.version();
2790                    let digest = object.digest();
2791                    let object_type = object.data.object_type().unwrap().clone();
2792                    (version, digest, object_type)
2793                };
2794
2795                DynamicFieldInfo {
2796                    name,
2797                    bcs_name,
2798                    type_,
2799                    object_type: object_type.to_string(),
2800                    object_id,
2801                    version,
2802                    digest,
2803                }
2804            }
2805        }))
2806    }
2807
2808    #[instrument(level = "trace", skip_all, err)]
2809    fn post_process_one_tx(
2810        &self,
2811        certificate: &VerifiedExecutableTransaction,
2812        effects: &TransactionEffects,
2813        inner_temporary_store: &InnerTemporaryStore,
2814        epoch_store: &Arc<AuthorityPerEpochStore>,
2815    ) -> IotaResult {
2816        if self.indexes.is_none() {
2817            return Ok(());
2818        }
2819
2820        let _scope = monitored_scope("Execution::post_process_one_tx");
2821
2822        let tx_digest = certificate.digest();
2823        let timestamp_ms = Self::unixtime_now_ms();
2824        let events = &inner_temporary_store.events;
2825        let written = &inner_temporary_store.written;
2826        let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
2827            effects,
2828            inner_temporary_store,
2829            epoch_store,
2830        );
2831
2832        // Index tx
2833        if let Some(indexes) = &self.indexes {
2834            let _ = self
2835                .index_tx(
2836                    indexes.as_ref(),
2837                    tx_digest,
2838                    certificate,
2839                    effects,
2840                    events,
2841                    timestamp_ms,
2842                    tx_coins,
2843                    written,
2844                    inner_temporary_store,
2845                )
2846                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2847                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2848                .expect("Indexing tx should not fail");
2849
2850            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2851            let events = self.make_transaction_block_events(
2852                events.clone(),
2853                *tx_digest,
2854                timestamp_ms,
2855                epoch_store,
2856                inner_temporary_store,
2857            )?;
2858            // Emit events
2859            self.subscription_handler
2860                .process_tx(certificate.data().transaction_data(), &effects, &events)
2861                .tap_ok(|_| {
2862                    self.metrics
2863                        .post_processing_total_tx_had_event_processed
2864                        .inc()
2865                })
2866                .tap_err(|e| {
2867                    warn!(
2868                        ?tx_digest,
2869                        "Post processing - Couldn't process events for tx: {}", e
2870                    )
2871                })?;
2872
2873            self.metrics
2874                .post_processing_total_events_emitted
2875                .inc_by(events.data.len() as u64);
2876        };
2877        Ok(())
2878    }
2879
2880    fn make_transaction_block_events(
2881        &self,
2882        transaction_events: TransactionEvents,
2883        digest: TransactionDigest,
2884        timestamp_ms: u64,
2885        epoch_store: &Arc<AuthorityPerEpochStore>,
2886        inner_temporary_store: &InnerTemporaryStore,
2887    ) -> IotaResult<IotaTransactionBlockEvents> {
2888        let mut layout_resolver =
2889            epoch_store
2890                .executor()
2891                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2892                    inner_temporary_store,
2893                    self.get_backing_package_store(),
2894                )));
2895        IotaTransactionBlockEvents::try_from(
2896            transaction_events,
2897            digest,
2898            Some(timestamp_ms),
2899            layout_resolver.as_mut(),
2900        )
2901    }
2902
2903    pub fn unixtime_now_ms() -> u64 {
2904        let now = SystemTime::now()
2905            .duration_since(UNIX_EPOCH)
2906            .expect("Time went backwards")
2907            .as_millis();
2908        u64::try_from(now).expect("Travelling in time machine")
2909    }
2910
2911    #[instrument(level = "trace", skip_all)]
2912    pub async fn handle_transaction_info_request(
2913        &self,
2914        request: TransactionInfoRequest,
2915    ) -> IotaResult<TransactionInfoResponse> {
2916        let epoch_store = self.load_epoch_store_one_call_per_task();
2917        let (transaction, status) = self
2918            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2919            .ok_or(IotaError::TransactionNotFound {
2920                digest: request.transaction_digest,
2921            })?;
2922        Ok(TransactionInfoResponse {
2923            transaction,
2924            status,
2925        })
2926    }
2927
2928    #[instrument(level = "trace", skip_all)]
2929    pub async fn handle_object_info_request(
2930        &self,
2931        request: ObjectInfoRequest,
2932    ) -> IotaResult<ObjectInfoResponse> {
2933        let epoch_store = self.load_epoch_store_one_call_per_task();
2934
2935        let requested_object_seq = match request.request_kind {
2936            ObjectInfoRequestKind::LatestObjectInfo => {
2937                self.try_get_object_or_tombstone(request.object_id)
2938                    .await?
2939                    .ok_or_else(|| {
2940                        IotaError::from(UserInputError::ObjectNotFound {
2941                            object_id: request.object_id,
2942                            version: None,
2943                        })
2944                    })?
2945                    .version
2946            }
2947            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2948        };
2949
2950        let object = self
2951            .get_object_store()
2952            .try_get_object_by_key(&request.object_id, requested_object_seq)?
2953            .ok_or_else(|| {
2954                IotaError::from(UserInputError::ObjectNotFound {
2955                    object_id: request.object_id,
2956                    version: Some(requested_object_seq),
2957                })
2958            })?;
2959
2960        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2961            (request.generate_layout, object.data.as_struct_opt())
2962        {
2963            Some(into_struct_layout(
2964                epoch_store
2965                    .executor()
2966                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2967                    .get_annotated_layout(move_obj.struct_tag())?,
2968            )?)
2969        } else {
2970            None
2971        };
2972
2973        let lock = if !object.is_address_owned() {
2974            // Only address owned objects have locks.
2975            None
2976        } else {
2977            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2978                .await?
2979                .map(|s| s.into_inner())
2980        };
2981
2982        Ok(ObjectInfoResponse {
2983            object,
2984            layout,
2985            lock_for_debugging: lock,
2986        })
2987    }
2988
2989    #[instrument(level = "trace", skip_all)]
2990    pub fn handle_checkpoint_request(
2991        &self,
2992        request: &CheckpointRequest,
2993    ) -> IotaResult<CheckpointResponse> {
2994        let summary = if request.certified {
2995            let summary = match request.sequence_number {
2996                Some(seq) => self
2997                    .checkpoint_store
2998                    .get_checkpoint_by_sequence_number(seq)?,
2999                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3000            }
3001            .map(|v| v.into_inner());
3002            summary.map(CheckpointSummaryResponse::Certified)
3003        } else {
3004            let summary = match request.sequence_number {
3005                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3006                None => self
3007                    .checkpoint_store
3008                    .get_latest_locally_computed_checkpoint()?,
3009            };
3010            summary.map(CheckpointSummaryResponse::Pending)
3011        };
3012        let contents = match &summary {
3013            Some(s) => self
3014                .checkpoint_store
3015                .get_checkpoint_contents(&s.content_digest())?,
3016            None => None,
3017        };
3018        Ok(CheckpointResponse {
3019            checkpoint: summary,
3020            contents,
3021        })
3022    }
3023
3024    fn check_protocol_version(
3025        supported_protocol_versions: SupportedProtocolVersions,
3026        current_version: ProtocolVersion,
3027    ) {
3028        info!("current protocol version is now {:?}", current_version);
3029        info!("supported versions are: {:?}", supported_protocol_versions);
3030        if !supported_protocol_versions.is_version_supported(current_version) {
3031            let msg = format!(
3032                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
3033            );
3034
3035            error!("{}", msg);
3036            eprintln!("{msg}");
3037
3038            #[cfg(not(msim))]
3039            std::process::exit(1);
3040
3041            #[cfg(msim)]
3042            iota_simulator::task::shutdown_current_node();
3043        }
3044    }
3045
    /// Builds a new `AuthorityState` and starts its background tasks.
    ///
    /// The constructor
    /// 1. checks that the protocol version of `epoch_store` is supported (see
    ///    `check_protocol_version`),
    /// 2. creates the metrics, the transaction manager, the per-epoch-store
    ///    pruner, the authority store pruner and the input loader,
    /// 3. spawns the task that executes ready certificates and the events
    ///    migration task,
    /// 4. indexes `genesis_objects` via `create_owner_index_if_empty`.
    ///
    /// # Panics
    ///
    /// Panics if indexing the genesis objects fails.
    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
    pub async fn new(
        name: AuthorityName,
        secret: StableSyncAuthoritySigner,
        supported_protocol_versions: SupportedProtocolVersions,
        store: Arc<AuthorityStore>,
        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
        epoch_store: Arc<AuthorityPerEpochStore>,
        committee_store: Arc<CommitteeStore>,
        indexes: Option<Arc<IndexStore>>,
        grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
        checkpoint_store: Arc<CheckpointStore>,
        prometheus_registry: &Registry,
        genesis_objects: &[Object],
        db_checkpoint_config: &DBCheckpointConfig,
        config: NodeConfig,
        archive_readers: ArchiveReaderBalancer,
        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
        chain_identifier: ChainIdentifier,
        pruner_db: Option<Arc<AuthorityPrunerTables>>,
        checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
    ) -> Arc<Self> {
        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
        // The sender goes to the transaction manager; the receiver is consumed
        // by the execution task spawned below.
        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
        let transaction_manager = Arc::new(TransactionManager::new(
            execution_cache_trait_pointers.object_cache_reader.clone(),
            execution_cache_trait_pointers
                .transaction_cache_reader
                .clone(),
            &epoch_store,
            tx_ready_certificates,
            metrics.clone(),
        ));
        // The sender is stored in the state (`tx_execution_shutdown`); the
        // receiver is handed to the execution task.
        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

        let authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
            epoch_store.get_parent_path(),
            config
                .authority_store_pruning_config
                .num_latest_epoch_dbs_to_retain,
        )
        .await;
        let _pruner = AuthorityStorePruner::new(
            store.perpetual_tables.clone(),
            checkpoint_store.clone(),
            grpc_indexes_store.clone(),
            indexes.clone(),
            config.authority_store_pruning_config.clone(),
            epoch_store.committee().authority_exists(&name),
            epoch_store.epoch_start_state().epoch_duration_ms(),
            prometheus_registry,
            archive_readers,
            pruner_db,
            checkpoint_progress_tracker.clone(),
        );
        let input_loader =
            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
        // Read these before `epoch_store` is cloned into the `ArcSwap` below.
        let epoch = epoch_store.epoch();
        let rgp = epoch_store.reference_gas_price();
        let state = Arc::new(AuthorityState {
            name,
            secret,
            execution_lock: RwLock::new(epoch),
            epoch_store: ArcSwap::new(epoch_store.clone()),
            input_loader,
            execution_cache_trait_pointers,
            indexes,
            grpc_indexes_store,
            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
            checkpoint_store,
            committee_store,
            transaction_manager,
            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
            metrics,
            _pruner,
            authority_per_epoch_pruner,
            checkpoint_progress_tracker,
            db_checkpoint_config: db_checkpoint_config.clone(),
            config,
            overload_info: AuthorityOverloadInfo::default(),
            validator_tx_finalizer,
            chain_identifier,
            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
        });

        // Start a task to execute ready certificates.
        // The task only receives a weak reference to the state.
        let authority_state = Arc::downgrade(&state);
        spawn_monitored_task!(execution_process(
            authority_state,
            rx_ready_certificates,
            rx_execution_shutdown,
        ));
        spawn_monitored_task!(authority_store_migrations::migrate_events(store));

        // TODO: This doesn't belong to the constructor of AuthorityState.
        state
            .create_owner_index_if_empty(genesis_objects, &epoch_store)
            .expect("Error indexing genesis objects.");

        state
    }
3149
3150    pub fn epoch_db_pruner(&self) -> &AuthorityPerEpochStorePruner {
3151        &self.authority_per_epoch_pruner
3152    }
3153
3154    // TODO: Consolidate our traits to reduce the number of methods here.
3155    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3156        &self.execution_cache_trait_pointers.object_cache_reader
3157    }
3158
3159    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3160        &self.execution_cache_trait_pointers.transaction_cache_reader
3161    }
3162
3163    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3164        &self.execution_cache_trait_pointers.cache_writer
3165    }
3166
3167    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3168        &self.execution_cache_trait_pointers.backing_store
3169    }
3170
3171    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3172        &self.execution_cache_trait_pointers.backing_package_store
3173    }
3174
3175    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3176        &self.execution_cache_trait_pointers.object_store
3177    }
3178
3179    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3180        &self.execution_cache_trait_pointers.reconfig_api
3181    }
3182
3183    pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3184        &self.execution_cache_trait_pointers.global_state_hash_store
3185    }
3186
3187    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3188        &self.execution_cache_trait_pointers.checkpoint_cache
3189    }
3190
3191    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3192        &self.execution_cache_trait_pointers.state_sync_store
3193    }
3194
3195    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3196        &self.execution_cache_trait_pointers.cache_commit
3197    }
3198
3199    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3200        self.execution_cache_trait_pointers
3201            .testing_api
3202            .database_for_testing()
3203    }
3204
3205    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3206        &self,
3207        config: NodeConfig,
3208        metrics: Arc<AuthorityStorePruningMetrics>,
3209    ) -> anyhow::Result<()> {
3210        let archive_readers =
3211            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3212        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3213            &self.database_for_testing().perpetual_tables,
3214            &self.checkpoint_store,
3215            self.grpc_indexes_store.as_deref(),
3216            None,
3217            config.authority_store_pruning_config,
3218            metrics,
3219            archive_readers,
3220            EPOCH_DURATION_MS_FOR_TESTING,
3221            self.checkpoint_progress_tracker.as_ref(),
3222        )
3223        .await
3224    }
3225
3226    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3227        &self.transaction_manager
3228    }
3229
3230    /// Adds transactions / certificates to transaction manager for ordered
3231    /// execution.
3232    pub fn enqueue_transactions_for_execution(
3233        &self,
3234        txns: Vec<VerifiedExecutableTransaction>,
3235        epoch_store: &Arc<AuthorityPerEpochStore>,
3236    ) {
3237        self.transaction_manager.enqueue(txns, epoch_store)
3238    }
3239
3240    /// Adds certificates to transaction manager for ordered execution.
3241    pub fn enqueue_certificates_for_execution(
3242        &self,
3243        certs: Vec<VerifiedCertificate>,
3244        epoch_store: &Arc<AuthorityPerEpochStore>,
3245    ) {
3246        self.transaction_manager
3247            .enqueue_certificates(certs, epoch_store)
3248    }
3249
3250    pub fn enqueue_with_expected_effects_digest(
3251        &self,
3252        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3253        epoch_store: &AuthorityPerEpochStore,
3254    ) {
3255        self.transaction_manager
3256            .enqueue_with_expected_effects_digest(certs, epoch_store)
3257    }
3258
3259    fn create_owner_index_if_empty(
3260        &self,
3261        genesis_objects: &[Object],
3262        epoch_store: &Arc<AuthorityPerEpochStore>,
3263    ) -> IotaResult {
3264        let Some(index_store) = &self.indexes else {
3265            return Ok(());
3266        };
3267        if !index_store.is_empty() {
3268            return Ok(());
3269        }
3270
3271        let mut new_owners = vec![];
3272        let mut new_dynamic_fields = vec![];
3273        let mut layout_resolver = epoch_store
3274            .executor()
3275            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3276        for o in genesis_objects.iter() {
3277            match o.owner {
3278                Owner::Address(addr) => new_owners.push((
3279                    (addr, o.id()),
3280                    ObjectInfo::new(&o.compute_object_reference(), o),
3281                )),
3282                Owner::Object(object_id) => {
3283                    let id = o.id();
3284                    let info = match self.try_create_dynamic_field_info(
3285                        o,
3286                        &BTreeMap::new(),
3287                        layout_resolver.as_mut(),
3288                    ) {
3289                        Ok(Some(info)) => info,
3290                        Ok(None) => continue,
3291                        Err(IotaError::UserInput {
3292                            error:
3293                                UserInputError::ObjectNotFound {
3294                                    object_id: not_found_id,
3295                                    version,
3296                                },
3297                        }) => {
3298                            warn!(
3299                                ?not_found_id,
3300                                ?version,
3301                                object_owner=?object_id,
3302                                field=?id,
3303                                "Skipping dynamic field: referenced genesis object not found"
3304                            );
3305                            continue;
3306                        }
3307                        Err(e) => return Err(e),
3308                    };
3309                    new_dynamic_fields.push(((object_id, id), info));
3310                }
3311                _ => {}
3312            }
3313        }
3314
3315        index_store.insert_genesis_objects(ObjectIndexChanges {
3316            deleted_owners: vec![],
3317            deleted_dynamic_fields: vec![],
3318            new_owners,
3319            new_dynamic_fields,
3320        })
3321    }
3322
3323    /// Attempts to acquire execution lock for an executable transaction.
3324    /// Returns the lock if the transaction is matching current executed epoch
3325    /// Returns None otherwise
3326    pub fn execution_lock_for_executable_transaction(
3327        &self,
3328        transaction: &VerifiedExecutableTransaction,
3329    ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3330        let lock = self
3331            .execution_lock
3332            .try_read()
3333            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3334        if *lock == transaction.auth_sig().epoch() {
3335            Ok(lock)
3336        } else {
3337            Err(IotaError::WrongEpoch {
3338                expected_epoch: *lock,
3339                actual_epoch: transaction.auth_sig().epoch(),
3340            })
3341        }
3342    }
3343
3344    /// Acquires the execution lock for the duration of a transaction signing
3345    /// request. This prevents reconfiguration from starting until we are
3346    /// finished handling the signing request. Otherwise, in-memory lock
3347    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
3348    /// while we are attempting to acquire locks for the transaction.
3349    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3350        self.execution_lock
3351            .try_read()
3352            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3353    }
3354
3355    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3356        self.execution_lock.write().await
3357    }
3358
    /// Reconfigures this authority for the epoch described by `new_committee`
    /// and `epoch_start_configuration`.
    ///
    /// In order, this method:
    /// 1. checks that the new protocol version is supported (see
    ///    `check_protocol_version`), resets metrics and stores the new
    ///    committee,
    /// 2. takes the execution lock for writing and terminates the tasks of
    ///    the current epoch,
    /// 3. reverts uncommitted transactions, clears end-of-epoch cache state
    ///    and runs the system consistency checks,
    /// 4. stores the new epoch start configuration and, if configured, takes
    ///    a db checkpoint of the ending epoch,
    /// 5. reconfigures the cache, reopens the epoch db for the new epoch and
    ///    updates the transaction manager and the epoch held by the execution
    ///    lock.
    ///
    /// Returns the epoch store of the new epoch.
    ///
    /// # Panics
    ///
    /// Panics if `epoch_last_checkpoint` is smaller than the highest locally
    /// built checkpoint sequence number, or if the reopened epoch store does
    /// not report the epoch of `new_committee`.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        state_hasher: Arc<GlobalStateHasher>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_supply_change: i64,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.metrics.reset_on_reconfigure();
        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        // Defaults to 0 when no checkpoint has been computed locally yet.
        let highest_locally_built_checkpoint_seq = self
            .checkpoint_store
            .get_latest_locally_computed_checkpoint()?
            .map(|c| *c.sequence_number())
            .unwrap_or(0);

        assert!(
            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
        );
        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint
            || self.is_fullnode(cur_epoch_store)
        {
            // if we built the last checkpoint locally (as opposed to receiving it from a
            // peer), then all shared_version_assignments except the one for the
            // ChangeEpoch transaction should have been removed
            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
            // Due to (otherwise harmless) race conditions between CheckpointExecutor and
            // ConsensusHandler, we actually can't guarantee that all
            // shared_version_assignments have been removed. However,
            // typically at most 2 or 3 are left over. We leave this check here in order to
            // catch complete failure of cleanup which would cause a memory
            // leak.
            if num_shared_version_assignments > 10 {
                // If this happens in prod, we have a memory leak, but not a correctness issue.
                debug_fatal!(
                    "all shared_version_assignments should have been removed \
                    (num_shared_version_assignments: {num_shared_version_assignments})"
                );
            }
        }

        // Safe to reconfigure now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
            .await?;
        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(
            cur_epoch_store,
            state_hasher,
            expensive_safety_check_config,
            epoch_supply_change,
        )?;
        self.get_reconfig_api()
            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
        // Optionally snapshot the databases of the ending epoch into
        // `<checkpoint_path>/epoch_<current_epoch>`.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
            if self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
            {
                let checkpoint_indexes = self
                    .db_checkpoint_config
                    .perform_index_db_checkpoints_at_epoch_end
                    .unwrap_or(false);
                let current_epoch = cur_epoch_store.epoch();
                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
                self.checkpoint_all_dbs(
                    &epoch_checkpoint_path,
                    cur_epoch_store,
                    checkpoint_indexes,
                )?;
            }
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        // Read the epoch before `new_committee` is moved into `reopen_epoch_db`.
        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.transaction_manager.reconfigure(new_epoch);
        *execution_lock = new_epoch;
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
3476
3477    /// Advance the epoch store to the next epoch for testing only.
3478    /// This only manually sets all the places where we have the epoch number.
3479    /// It doesn't properly reconfigure the node, hence should be only used for
3480    /// testing.
3481    pub async fn reconfigure_for_testing(&self) {
3482        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3483        let epoch_store = self.epoch_store_for_testing().clone();
3484        let protocol_config = epoch_store.protocol_config().clone();
3485        // The current protocol config used in the epoch store may have been overridden
3486        // and diverged from the protocol config definitions. That override may
3487        // have now been dropped when the initial guard was dropped. We reapply
3488        // the override before creating the new epoch store, to make sure that
3489        // the new epoch store has the same protocol config as the current one.
3490        // Since this is for testing only, we mostly like to keep the protocol config
3491        // the same across epochs.
3492        let _guard =
3493            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3494        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3495            self.get_backing_package_store().clone(),
3496            &self.config.expensive_safety_check_config,
3497            self.checkpoint_store
3498                .get_epoch_last_checkpoint(epoch_store.epoch())
3499                .unwrap()
3500                .map(|c| *c.sequence_number())
3501                .unwrap_or_default(),
3502        );
3503        let new_epoch = new_epoch_store.epoch();
3504        self.transaction_manager.reconfigure(new_epoch);
3505        self.epoch_store.store(new_epoch_store);
3506        epoch_store.epoch_terminated().await;
3507        *execution_lock = new_epoch;
3508    }
3509
3510    #[instrument(level = "error", skip_all)]
3511    fn check_system_consistency(
3512        &self,
3513        cur_epoch_store: &AuthorityPerEpochStore,
3514        state_hasher: Arc<GlobalStateHasher>,
3515        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3516        epoch_supply_change: i64,
3517    ) -> IotaResult<()> {
3518        info!(
3519            "Performing iota conservation consistency check for epoch {}",
3520            cur_epoch_store.epoch()
3521        );
3522
3523        if cfg!(debug_assertions) {
3524            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3525        }
3526
3527        self.get_reconfig_api()
3528            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3529
3530        // check for root state hash consistency with live object set
3531        if expensive_safety_check_config.enable_state_consistency_check() {
3532            info!(
3533                "Performing state consistency check for epoch {}",
3534                cur_epoch_store.epoch()
3535            );
3536            self.expensive_check_is_consistent_state(
3537                state_hasher,
3538                cur_epoch_store,
3539                cfg!(debug_assertions), // panic in debug mode only
3540            );
3541        }
3542
3543        if expensive_safety_check_config.enable_secondary_index_checks() {
3544            if let Some(indexes) = self.indexes.clone() {
3545                verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
3546                    .expect("secondary indexes are inconsistent");
3547            }
3548        }
3549
3550        Ok(())
3551    }
3552
3553    fn expensive_check_is_consistent_state(
3554        &self,
3555        state_hasher: Arc<GlobalStateHasher>,
3556        cur_epoch_store: &AuthorityPerEpochStore,
3557        panic: bool,
3558    ) {
3559        let live_object_set_hash = state_hasher.digest_live_object_set();
3560
3561        let root_state_hash: ECMHLiveObjectSetDigest = self
3562            .get_global_state_hash_store()
3563            .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
3564            .expect("Retrieving root state hash cannot fail")
3565            .expect("Root state hash for epoch must exist")
3566            .1
3567            .digest()
3568            .into();
3569
3570        let is_inconsistent = root_state_hash != live_object_set_hash;
3571        if is_inconsistent {
3572            if panic {
3573                panic!(
3574                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3575                );
3576            } else {
3577                error!(
3578                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3579                    root_state_hash, live_object_set_hash
3580                );
3581            }
3582        } else {
3583            info!("State consistency check passed");
3584        }
3585
3586        if !panic {
3587            state_hasher.set_inconsistent_state(is_inconsistent);
3588        }
3589    }
3590
3591    pub fn current_epoch_for_testing(&self) -> EpochId {
3592        self.epoch_store_for_testing().epoch()
3593    }
3594
3595    #[instrument(level = "error", skip_all)]
3596    pub fn checkpoint_all_dbs(
3597        &self,
3598        checkpoint_path: &Path,
3599        cur_epoch_store: &AuthorityPerEpochStore,
3600        checkpoint_indexes: bool,
3601    ) -> IotaResult {
3602        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3603        let current_epoch = cur_epoch_store.epoch();
3604
3605        if checkpoint_path.exists() {
3606            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3607            return Ok(());
3608        }
3609
3610        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3611        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3612
3613        if checkpoint_path_tmp.exists() {
3614            fs::remove_dir_all(&checkpoint_path_tmp)
3615                .map_err(|e| IotaError::FileIO(e.to_string()))?;
3616        }
3617
3618        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3619        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3620
3621        // NOTE: Do not change the order of invoking these checkpoint calls
3622        // We want to snapshot checkpoint db first to not race with state sync
3623        self.checkpoint_store
3624            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3625
3626        self.get_reconfig_api()
3627            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3628
3629        self.committee_store
3630            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3631
3632        if checkpoint_indexes {
3633            if let Some(indexes) = self.indexes.as_ref() {
3634                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3635            }
3636            if let Some(grpc_indexes_store) = self.grpc_indexes_store.as_ref() {
3637                grpc_indexes_store.checkpoint_db(&checkpoint_path_tmp.join(GRPC_INDEXES_DIR))?;
3638            }
3639        }
3640
3641        fs::rename(checkpoint_path_tmp, checkpoint_path)
3642            .map_err(|e| IotaError::FileIO(e.to_string()))?;
3643        Ok(())
3644    }
3645
    /// Loads the current epoch store.
    ///
    /// The epoch store can change during reconfiguration. To ensure that a
    /// single task never ends up accessing different epoch stores, this must
    /// be called only once per task, and each call site needs to be carefully
    /// audited to ensure that this is the case. This also means the number of
    /// call sites should be kept minimal: only call it when there is no way
    /// to obtain the epoch store from somewhere else.
    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.epoch_store.load()
    }
3655
    /// Loads the epoch store; should be used in tests only.
    ///
    /// Delegates to `load_epoch_store_one_call_per_task`.
    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.load_epoch_store_one_call_per_task()
    }
3660
    /// Returns an owned copy of the committee held by the epoch store obtained
    /// through `epoch_store_for_testing`. Intended for tests, as the name
    /// indicates.
    pub fn clone_committee_for_testing(&self) -> Committee {
        // `Committee::clone` is spelled out so that the result is a
        // `Committee` regardless of the reference type `committee()` returns.
        Committee::clone(self.epoch_store_for_testing().committee())
    }
3664
3665    #[instrument(level = "trace", skip_all)]
3666    pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3667        self.get_object_store()
3668            .try_get_object(object_id)
3669            .map_err(Into::into)
3670    }
3671
3672    /// Non-fallible version of `try_get_object`.
3673    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3674        self.try_get_object(object_id)
3675            .await
3676            .expect("storage access failed")
3677    }
3678
3679    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3680        Ok(self
3681            .try_get_object(&ObjectID::SYSTEM)
3682            .await?
3683            .expect("system package should always exist")
3684            .compute_object_reference())
3685    }
3686
    /// Reads the IOTA system state object through the object cache reader.
    ///
    /// This function is only used for testing.
    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
        self.get_object_cache_reader()
            .try_get_iota_system_state_object_unsafe()
    }
3692
3693    #[instrument(level = "trace", skip_all)]
3694    pub fn get_checkpoint_by_sequence_number(
3695        &self,
3696        sequence_number: CheckpointSequenceNumber,
3697    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3698        Ok(self
3699            .checkpoint_store
3700            .get_checkpoint_by_sequence_number(sequence_number)?)
3701    }
3702
3703    /// Wait for the given transactions to be included in a checkpoint.
3704    ///
3705    /// Returns a mapping from transaction digest to
3706    /// `(checkpoint_sequence_number, checkpoint_timestamp_ms)`.
3707    /// On timeout, returns partial results for any transactions that were
3708    /// already checkpointed.
3709    pub async fn wait_for_checkpoint_inclusion(
3710        &self,
3711        digests: &[TransactionDigest],
3712        timeout: Duration,
3713    ) -> IotaResult<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>> {
3714        let epoch_store = self.load_epoch_store_one_call_per_task();
3715
3716        // Local cache so multiple transactions in the same checkpoint only
3717        // trigger a single checkpoint summary lookup.
3718        let mut checkpoint_timestamp_cache = HashMap::<CheckpointSequenceNumber, u64>::new();
3719
3720        let results = epoch_store
3721            .wait_for_transactions_in_checkpoint_with_timeout(digests, timeout, |seq| {
3722                *checkpoint_timestamp_cache.entry(seq).or_insert_with(|| {
3723                    self.get_checkpoint_by_sequence_number(seq)
3724                        .ok()
3725                        .flatten()
3726                        .map(|c| c.timestamp_ms)
3727                        .unwrap_or(0)
3728                })
3729            })
3730            .await?;
3731
3732        Ok(digests
3733            .iter()
3734            .copied()
3735            .zip(results)
3736            .filter_map(|(digest, opt)| opt.map(|seq_and_ts| (digest, seq_and_ts)))
3737            .collect())
3738    }
3739
3740    #[instrument(level = "trace", skip_all)]
3741    pub fn get_transaction_checkpoint_for_tests(
3742        &self,
3743        digest: &TransactionDigest,
3744        epoch_store: &AuthorityPerEpochStore,
3745    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3746        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3747        let Some(checkpoint) = checkpoint else {
3748            return Ok(None);
3749        };
3750        let checkpoint = self
3751            .checkpoint_store
3752            .get_checkpoint_by_sequence_number(checkpoint)?;
3753        Ok(checkpoint)
3754    }
3755
3756    #[instrument(level = "trace", skip_all)]
3757    pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3758        Ok(
3759            match self
3760                .get_object_cache_reader()
3761                .try_get_latest_object_or_tombstone(*object_id)?
3762            {
3763                Some((_, ObjectOrTombstone::Object(object))) => {
3764                    let layout = self.get_object_layout(&object)?;
3765                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
3766                }
3767                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3768                None => ObjectRead::NotExists(*object_id),
3769            },
3770        )
3771    }
3772
    /// Returns the chain identifier stored on this authority state.
    ///
    /// Chain Identifier is the digest of the genesis checkpoint.
    pub fn get_chain_identifier(&self) -> ChainIdentifier {
        self.chain_identifier
    }
3777
3778    #[instrument(level = "trace", skip_all)]
3779    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3780    where
3781        T: DeserializeOwned,
3782    {
3783        let o = self.get_object_read(object_id)?.into_object()?;
3784        if let Some(move_object) = o.data.as_struct_opt() {
3785            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3786                IotaError::ObjectDeserialization {
3787                    error: format!("{e}"),
3788                }
3789            })?)
3790        } else {
3791            Err(IotaError::ObjectDeserialization {
3792                error: format!("Provided object : [{object_id}] is not a Move object."),
3793            })
3794        }
3795    }
3796
3797    /// This function aims to serve rpc reads on past objects and
3798    /// we don't expect it to be called for other purposes.
3799    /// Depending on the object pruning policies that will be enforced in the
3800    /// future there is no software-level guarantee/SLA to retrieve an object
3801    /// with an old version even if it exists/existed.
3802    #[instrument(level = "trace", skip_all)]
3803    pub fn get_past_object_read(
3804        &self,
3805        object_id: &ObjectID,
3806        version: SequenceNumber,
3807    ) -> IotaResult<PastObjectRead> {
3808        // Firstly we see if the object ever existed by getting its latest data
3809        let Some(obj_ref) = self
3810            .get_object_cache_reader()
3811            .try_get_latest_object_ref_or_tombstone(*object_id)?
3812        else {
3813            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3814        };
3815
3816        if version > obj_ref.version {
3817            return Ok(PastObjectRead::VersionTooHigh {
3818                object_id: *object_id,
3819                asked_version: version,
3820                latest_version: obj_ref.version,
3821            });
3822        }
3823
3824        if version < obj_ref.version {
3825            // Read past objects
3826            return Ok(match self.read_object_at_version(object_id, version)? {
3827                Some((object, layout)) => {
3828                    let obj_ref = object.compute_object_reference();
3829                    PastObjectRead::VersionFound(obj_ref, object, layout)
3830                }
3831
3832                None => PastObjectRead::VersionNotFound(*object_id, version),
3833            });
3834        }
3835
3836        if !obj_ref.digest.is_object_alive() {
3837            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3838        }
3839
3840        match self.read_object_at_version(object_id, obj_ref.version)? {
3841            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3842            None => {
3843                error!(
3844                    "Object with in parent_entry is missing from object store, datastore is \
3845                     inconsistent",
3846                );
3847                Err(UserInputError::ObjectNotFound {
3848                    object_id: *object_id,
3849                    version: Some(obj_ref.version),
3850                }
3851                .into())
3852            }
3853        }
3854    }
3855
3856    #[instrument(level = "trace", skip_all)]
3857    fn read_object_at_version(
3858        &self,
3859        object_id: &ObjectID,
3860        version: SequenceNumber,
3861    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3862        let Some(object) = self
3863            .get_object_cache_reader()
3864            .try_get_object_by_key(object_id, version)?
3865        else {
3866            return Ok(None);
3867        };
3868
3869        let layout = self.get_object_layout(&object)?;
3870        Ok(Some((object, layout)))
3871    }
3872
    /// Resolves the annotated struct layout of `object`.
    ///
    /// Returns `Ok(None)` when `object.data.as_struct_opt()` yields nothing.
    /// Otherwise the layout of the struct tag is obtained from the type layout
    /// resolver of the currently loaded epoch store's executor, reading
    /// packages from the backing package store.
    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
        let layout = object
            .data
            .as_struct_opt()
            // Within the closure `object` refers to the struct data, not to
            // the outer `Object`.
            .map(|object| {
                into_struct_layout(
                    self.load_epoch_store_one_call_per_task()
                        .executor()
                        // TODO(cache) - must read through cache
                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
                        .get_annotated_layout(object.struct_tag())?,
                )
            })
            // Turn `Option<Result<..>>` into `Result<Option<..>>` so that a
            // resolution failure propagates as an error.
            .transpose()?;
        Ok(layout)
    }
3889
3890    fn get_owner_at_version(
3891        &self,
3892        object_id: &ObjectID,
3893        version: SequenceNumber,
3894    ) -> IotaResult<Owner> {
3895        self.get_object_store()
3896            .try_get_object_by_key(object_id, version)?
3897            .ok_or_else(|| {
3898                IotaError::from(UserInputError::ObjectNotFound {
3899                    object_id: *object_id,
3900                    version: Some(version),
3901                })
3902            })
3903            .map(|o| o.owner)
3904    }
3905
3906    #[instrument(level = "trace", skip_all)]
3907    pub fn get_owner_objects(
3908        &self,
3909        owner: IotaAddress,
3910        // If `Some`, the query will start from the next item after the specified cursor
3911        cursor: Option<ObjectID>,
3912        limit: usize,
3913        filter: Option<IotaObjectDataFilter>,
3914    ) -> IotaResult<Vec<ObjectInfo>> {
3915        if let Some(indexes) = &self.indexes {
3916            indexes.get_owner_objects(owner, cursor, limit, filter)
3917        } else {
3918            Err(IotaError::IndexStoreNotAvailable)
3919        }
3920    }
3921
3922    #[instrument(level = "trace", skip_all)]
3923    pub fn get_owned_coins_iterator_with_cursor(
3924        &self,
3925        owner: IotaAddress,
3926        // If `Some`, the query will start from the next item after the specified cursor
3927        cursor: (String, ObjectID),
3928        limit: usize,
3929        one_coin_type_only: bool,
3930    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3931        if let Some(indexes) = &self.indexes {
3932            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3933        } else {
3934            Err(IotaError::IndexStoreNotAvailable)
3935        }
3936    }
3937
3938    #[instrument(level = "trace", skip_all)]
3939    pub fn get_owner_objects_iterator(
3940        &self,
3941        owner: IotaAddress,
3942        // If `Some`, the query will start from the next item after the specified cursor
3943        cursor: Option<ObjectID>,
3944        filter: Option<IotaObjectDataFilter>,
3945    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3946        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3947        if let Some(indexes) = &self.indexes {
3948            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3949        } else {
3950            Err(IotaError::IndexStoreNotAvailable)
3951        }
3952    }
3953
3954    #[instrument(level = "trace", skip_all)]
3955    pub async fn get_move_objects<T>(
3956        &self,
3957        owner: IotaAddress,
3958        tag: StructTag,
3959    ) -> IotaResult<Vec<T>>
3960    where
3961        T: DeserializeOwned,
3962    {
3963        let object_ids = self
3964            .get_owner_objects_iterator(owner, None, None)?
3965            .filter(|o| match &o.type_ {
3966                ObjectType::Struct(s) => *s == tag,
3967                ObjectType::Package => false,
3968            })
3969            .map(|info| ObjectKey(info.object_id, info.version))
3970            .collect::<Vec<_>>();
3971        let mut move_objects = vec![];
3972
3973        let objects = self
3974            .get_object_store()
3975            .try_multi_get_objects_by_key(&object_ids)?;
3976
3977        for (o, id) in objects.into_iter().zip(object_ids) {
3978            let object = o.ok_or_else(|| {
3979                IotaError::from(UserInputError::ObjectNotFound {
3980                    object_id: id.0,
3981                    version: Some(id.1),
3982                })
3983            })?;
3984            let move_object = object.data.as_struct_opt().ok_or_else(|| {
3985                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3986            })?;
3987            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3988                IotaError::ObjectDeserialization {
3989                    error: format!("{e}"),
3990                }
3991            })?);
3992        }
3993        Ok(move_objects)
3994    }
3995
3996    #[instrument(level = "trace", skip_all)]
3997    pub fn get_dynamic_fields(
3998        &self,
3999        owner: ObjectID,
4000        // If `Some`, the query will start from the next item after the specified cursor
4001        cursor: Option<ObjectID>,
4002        limit: usize,
4003    ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4004        Ok(self
4005            .get_dynamic_fields_iterator(owner, cursor)?
4006            .take(limit)
4007            .collect::<Result<Vec<_>, _>>()?)
4008    }
4009
4010    fn get_dynamic_fields_iterator(
4011        &self,
4012        owner: ObjectID,
4013        // If `Some`, the query will start from the next item after the specified cursor
4014        cursor: Option<ObjectID>,
4015    ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4016    {
4017        if let Some(indexes) = &self.indexes {
4018            indexes.get_dynamic_fields_iterator(owner, cursor)
4019        } else {
4020            Err(IotaError::IndexStoreNotAvailable)
4021        }
4022    }
4023
4024    #[instrument(level = "trace", skip_all)]
4025    pub fn get_dynamic_field_object_id(
4026        &self,
4027        owner: ObjectID,
4028        name_type: TypeTag,
4029        name_bcs_bytes: &[u8],
4030    ) -> IotaResult<Option<ObjectID>> {
4031        if let Some(indexes) = &self.indexes {
4032            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4033        } else {
4034            Err(IotaError::IndexStoreNotAvailable)
4035        }
4036    }
4037
4038    #[instrument(level = "trace", skip_all)]
4039    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
4040        Ok(self.get_indexes()?.next_sequence_number())
4041    }
4042
4043    #[instrument(level = "trace", skip_all)]
4044    pub async fn get_executed_transaction_and_effects(
4045        &self,
4046        digest: TransactionDigest,
4047        kv_store: Arc<TransactionKeyValueStore>,
4048    ) -> IotaResult<(Transaction, TransactionEffects)> {
4049        let transaction = kv_store.get_tx(digest).await?;
4050        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4051        Ok((transaction, effects))
4052    }
4053
4054    #[instrument(level = "trace", skip_all)]
4055    pub fn multi_get_checkpoint_by_sequence_number(
4056        &self,
4057        sequence_numbers: &[CheckpointSequenceNumber],
4058    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
4059        Ok(self
4060            .checkpoint_store
4061            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4062    }
4063
4064    #[instrument(level = "trace", skip_all)]
4065    pub fn get_transaction_events(
4066        &self,
4067        digest: &TransactionDigest,
4068    ) -> IotaResult<TransactionEvents> {
4069        self.get_transaction_cache_reader()
4070            .try_get_events(digest)?
4071            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
4072    }
4073
4074    pub fn get_transaction_input_objects(
4075        &self,
4076        effects: &TransactionEffects,
4077    ) -> anyhow::Result<Vec<Object>> {
4078        iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4079            .map_err(Into::into)
4080    }
4081
4082    pub fn get_transaction_output_objects(
4083        &self,
4084        effects: &TransactionEffects,
4085    ) -> anyhow::Result<Vec<Object>> {
4086        iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4087            .map_err(Into::into)
4088    }
4089
4090    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
4091        match &self.indexes {
4092            Some(i) => Ok(i.clone()),
4093            None => Err(IotaError::UnsupportedFeature {
4094                error: "extended object indexing is not enabled on this server".into(),
4095            }),
4096        }
4097    }
4098
4099    pub async fn get_transactions_for_tests(
4100        self: &Arc<Self>,
4101        filter: Option<TransactionFilter>,
4102        cursor: Option<TransactionDigest>,
4103        limit: Option<usize>,
4104        reverse: bool,
4105    ) -> IotaResult<Vec<TransactionDigest>> {
4106        let metrics = KeyValueStoreMetrics::new_for_tests();
4107        let kv_store = Arc::new(TransactionKeyValueStore::new(
4108            "rocksdb",
4109            metrics,
4110            self.clone(),
4111        ));
4112        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4113            .await
4114    }
4115
4116    #[instrument(level = "trace", skip_all)]
4117    pub async fn get_transactions(
4118        &self,
4119        kv_store: &Arc<TransactionKeyValueStore>,
4120        filter: Option<TransactionFilter>,
4121        // If `Some`, the query will start from the next item after the specified cursor
4122        cursor: Option<TransactionDigest>,
4123        limit: Option<usize>,
4124        reverse: bool,
4125    ) -> IotaResult<Vec<TransactionDigest>> {
4126        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4127            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4128            let iter = checkpoint_contents.iter().map(|c| c.transaction);
4129            if reverse {
4130                let iter = iter
4131                    .rev()
4132                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4133                    .skip(usize::from(cursor.is_some()));
4134                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4135            } else {
4136                let iter = iter
4137                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4138                    .skip(usize::from(cursor.is_some()));
4139                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4140            }
4141        }
4142        self.get_indexes()?
4143            .get_transactions(filter, cursor, limit, reverse)
4144    }
4145
    /// Returns a reference to the shared checkpoint store handle held by this
    /// authority state.
    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
        &self.checkpoint_store
    }
4149
4150    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4151        self.get_checkpoint_store()
4152            .get_highest_executed_checkpoint_seq_number()?
4153            .ok_or(IotaError::UserInput {
4154                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4155            })
4156    }
4157
4158    #[cfg(msim)]
4159    pub fn get_highest_pruned_checkpoint_for_testing(
4160        &self,
4161    ) -> IotaResult<CheckpointSequenceNumber> {
4162        self.database_for_testing()
4163            .perpetual_tables
4164            .get_highest_pruned_checkpoint()
4165            .map(|c| c.unwrap_or(0))
4166            .map_err(Into::into)
4167    }
4168
4169    #[instrument(level = "trace", skip_all)]
4170    pub fn get_checkpoint_summary_by_sequence_number(
4171        &self,
4172        sequence_number: CheckpointSequenceNumber,
4173    ) -> IotaResult<CheckpointSummary> {
4174        let verified_checkpoint = self
4175            .get_checkpoint_store()
4176            .get_checkpoint_by_sequence_number(sequence_number)?;
4177        match verified_checkpoint {
4178            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4179            None => Err(IotaError::UserInput {
4180                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4181            }),
4182        }
4183    }
4184
4185    #[instrument(level = "trace", skip_all)]
4186    pub fn get_checkpoint_summary_by_digest(
4187        &self,
4188        digest: CheckpointDigest,
4189    ) -> IotaResult<CheckpointSummary> {
4190        let verified_checkpoint = self
4191            .get_checkpoint_store()
4192            .get_checkpoint_by_digest(&digest)?;
4193        match verified_checkpoint {
4194            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4195            None => Err(IotaError::UserInput {
4196                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4197            }),
4198        }
4199    }
4200
4201    #[instrument(level = "trace", skip_all)]
4202    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
4203        if package_id.is_system_package() {
4204            return self.find_genesis_txn_digest();
4205        }
4206        Ok(self
4207            .get_object_read(&package_id)?
4208            .into_object()?
4209            .previous_transaction)
4210    }
4211
4212    #[instrument(level = "trace", skip_all)]
4213    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4214        let summary = self
4215            .get_verified_checkpoint_by_sequence_number(0)?
4216            .into_message();
4217        let content = self.get_checkpoint_contents(summary.content_digest)?;
4218        let genesis_transaction = content.enumerate_transactions(&summary).next();
4219        Ok(genesis_transaction
4220            .ok_or(IotaError::UserInput {
4221                error: UserInputError::GenesisTransactionNotFound,
4222            })?
4223            .1
4224            .transaction)
4225    }
4226
4227    #[instrument(level = "trace", skip_all)]
4228    pub fn get_verified_checkpoint_by_sequence_number(
4229        &self,
4230        sequence_number: CheckpointSequenceNumber,
4231    ) -> IotaResult<VerifiedCheckpoint> {
4232        let verified_checkpoint = self
4233            .get_checkpoint_store()
4234            .get_checkpoint_by_sequence_number(sequence_number)?;
4235        match verified_checkpoint {
4236            Some(verified_checkpoint) => Ok(verified_checkpoint),
4237            None => Err(IotaError::UserInput {
4238                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4239            }),
4240        }
4241    }
4242
4243    #[instrument(level = "trace", skip_all)]
4244    pub fn get_verified_checkpoint_summary_by_digest(
4245        &self,
4246        digest: CheckpointDigest,
4247    ) -> IotaResult<VerifiedCheckpoint> {
4248        let verified_checkpoint = self
4249            .get_checkpoint_store()
4250            .get_checkpoint_by_digest(&digest)?;
4251        match verified_checkpoint {
4252            Some(verified_checkpoint) => Ok(verified_checkpoint),
4253            None => Err(IotaError::UserInput {
4254                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4255            }),
4256        }
4257    }
4258
4259    #[instrument(level = "trace", skip_all)]
4260    pub fn get_checkpoint_contents(
4261        &self,
4262        digest: CheckpointContentsDigest,
4263    ) -> IotaResult<CheckpointContents> {
4264        self.get_checkpoint_store()
4265            .get_checkpoint_contents(&digest)?
4266            .ok_or(IotaError::UserInput {
4267                error: UserInputError::CheckpointContentsNotFound(digest),
4268            })
4269    }
4270
4271    #[instrument(level = "trace", skip_all)]
4272    pub fn get_checkpoint_contents_by_sequence_number(
4273        &self,
4274        sequence_number: CheckpointSequenceNumber,
4275    ) -> IotaResult<CheckpointContents> {
4276        let verified_checkpoint = self
4277            .get_checkpoint_store()
4278            .get_checkpoint_by_sequence_number(sequence_number)?;
4279        match verified_checkpoint {
4280            Some(verified_checkpoint) => {
4281                let content_digest = verified_checkpoint.into_inner().content_digest;
4282                self.get_checkpoint_contents(content_digest)
4283            }
4284            None => Err(IotaError::UserInput {
4285                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4286            }),
4287        }
4288    }
4289
4290    #[instrument(level = "trace", skip_all)]
4291    pub async fn query_events(
4292        &self,
4293        kv_store: &Arc<TransactionKeyValueStore>,
4294        query: EventFilter,
4295        // If `Some`, the query will start from the next item after the specified cursor
4296        cursor: Option<EventID>,
4297        limit: usize,
4298        descending: bool,
4299    ) -> IotaResult<Vec<IotaEvent>> {
4300        let index_store = self.get_indexes()?;
4301
4302        // Get the tx_num from tx_digest
4303        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4304            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4305                IotaError::TransactionNotFound {
4306                    digest: cursor.tx_digest,
4307                },
4308            )?;
4309            (tx_seq, cursor.event_seq as usize)
4310        } else if descending {
4311            (u64::MAX, usize::MAX)
4312        } else {
4313            (0, 0)
4314        };
4315
4316        let limit = limit + 1;
4317        let mut event_keys = match query {
4318            EventFilter::All(filters) => {
4319                if filters.is_empty() {
4320                    index_store.all_events(tx_num, event_num, limit, descending)?
4321                } else {
4322                    return Err(IotaError::UserInput {
4323                        error: UserInputError::Unsupported(
4324                            "This query type does not currently support filter combinations"
4325                                .to_string(),
4326                        ),
4327                    });
4328                }
4329            }
4330            EventFilter::Transaction(digest) => {
4331                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4332            }
4333            EventFilter::MoveModule { package, module } => {
4334                let module_id = ModuleId::new(
4335                    AccountAddress::new(package.into_bytes()),
4336                    move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4337                );
4338                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4339            }
4340            EventFilter::MoveEventType(struct_name) => index_store
4341                .events_by_move_event_struct_name(
4342                    &struct_name,
4343                    tx_num,
4344                    event_num,
4345                    limit,
4346                    descending,
4347                )?,
4348            EventFilter::Sender(sender) => {
4349                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4350            }
4351            EventFilter::TimeRange {
4352                start_time,
4353                end_time,
4354            } => index_store
4355                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4356            EventFilter::MoveEventModule { package, module } => index_store
4357                .events_by_move_event_module(
4358                    &ModuleId::new(
4359                        AccountAddress::new(package.into_bytes()),
4360                        move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4361                    ),
4362                    tx_num,
4363                    event_num,
4364                    limit,
4365                    descending,
4366                )?,
4367            // not using "_ =>" because we want to make sure we remember to add new variants here
4368            EventFilter::Package(_)
4369            | EventFilter::MoveEventField { .. }
4370            | EventFilter::Any(_)
4371            | EventFilter::And(_, _)
4372            | EventFilter::Or(_, _) => {
4373                return Err(IotaError::UserInput {
4374                    error: UserInputError::Unsupported(
4375                        "This query type is not supported by the full node.".to_string(),
4376                    ),
4377                });
4378            }
4379        };
4380
4381        // skip one event if exclusive cursor is provided,
4382        // otherwise truncate to the original limit.
4383        if cursor.is_some() {
4384            if !event_keys.is_empty() {
4385                event_keys.remove(0);
4386            }
4387        } else {
4388            event_keys.truncate(limit - 1);
4389        }
4390
4391        // get the unique set of digests from the event_keys
4392        let transaction_digests = event_keys
4393            .iter()
4394            .map(|(_, digest, _, _)| *digest)
4395            .collect::<HashSet<_>>()
4396            .into_iter()
4397            .collect::<Vec<_>>();
4398
4399        let events = kv_store
4400            .multi_get_events_by_tx_digests(&transaction_digests)
4401            .await?;
4402
4403        let events_map: HashMap<_, _> =
4404            transaction_digests.iter().zip(events.into_iter()).collect();
4405
4406        let stored_events = event_keys
4407            .into_iter()
4408            .map(|k| {
4409                (
4410                    k,
4411                    events_map
4412                        .get(&k.1)
4413                        .expect("fetched digest is missing")
4414                        .clone()
4415                        .and_then(|e| e.data.get(k.2).cloned()),
4416                )
4417            })
4418            .map(
4419                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4420                    event
4421                        .map(|e| (e, tx_digest, event_seq, timestamp))
4422                        .ok_or(IotaError::TransactionEventsNotFound { digest: tx_digest })
4423                },
4424            )
4425            .collect::<Result<Vec<_>, _>>()?;
4426
4427        let epoch_store = self.load_epoch_store_one_call_per_task();
4428        let backing_store = self.get_backing_package_store().as_ref();
4429        let mut layout_resolver = epoch_store
4430            .executor()
4431            .type_layout_resolver(Box::new(backing_store));
4432        let mut events = vec![];
4433        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4434            events.push(IotaEvent::try_from(
4435                e.clone(),
4436                tx_digest,
4437                event_seq as u64,
4438                Some(timestamp),
4439                layout_resolver.get_annotated_layout(&e.type_)?,
4440            )?)
4441        }
4442        Ok(events)
4443    }
4444
4445    pub async fn insert_genesis_object(&self, object: Object) {
4446        self.get_reconfig_api()
4447            .try_insert_genesis_object(object)
4448            .expect("Cannot insert genesis object")
4449    }
4450
4451    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4452        futures::future::join_all(
4453            objects
4454                .iter()
4455                .map(|o| self.insert_genesis_object(o.clone())),
4456        )
4457        .await;
4458    }
4459
4460    /// Make a status response for a transaction
4461    #[instrument(level = "trace", skip_all)]
4462    pub fn get_transaction_status(
4463        &self,
4464        transaction_digest: &TransactionDigest,
4465        epoch_store: &Arc<AuthorityPerEpochStore>,
4466    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4467        // TODO: In the case of read path, we should not have to re-sign the effects.
4468        if let Some(effects) =
4469            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4470        {
4471            if let Some(transaction) = self
4472                .get_transaction_cache_reader()
4473                .try_get_transaction_block(transaction_digest)?
4474            {
4475                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4476                let events = if effects.events_digest().is_some() {
4477                    self.get_transaction_events(effects.transaction_digest())?
4478                } else {
4479                    TransactionEvents::default()
4480                };
4481                return Ok(Some((
4482                    (*transaction).clone().into_message(),
4483                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4484                )));
4485            } else {
4486                // The read of effects and read of transaction are not atomic. It's possible
4487                // that we reverted the transaction (during epoch change) in
4488                // between the above two reads, and we end up having effects but
4489                // not transaction. In this case, we just fall through.
4490                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4491            }
4492        }
4493        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4494            self.metrics.tx_already_processed.inc();
4495            let (transaction, sig) = signed.into_inner().into_data_and_sig();
4496            Ok(Some((transaction, TransactionStatus::Signed(sig))))
4497        } else {
4498            Ok(None)
4499        }
4500    }
4501
4502    /// Get the signed effects of the given transaction. If the effects was
4503    /// signed in a previous epoch, re-sign it so that the caller is able to
4504    /// form a cert of the effects in the current epoch.
4505    #[instrument(level = "trace", skip_all)]
4506    pub fn get_signed_effects_and_maybe_resign(
4507        &self,
4508        transaction_digest: &TransactionDigest,
4509        epoch_store: &Arc<AuthorityPerEpochStore>,
4510    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4511        let effects = self
4512            .get_transaction_cache_reader()
4513            .try_get_executed_effects(transaction_digest)?;
4514        match effects {
4515            Some(effects) => {
4516                // If the transaction was executed in previous epochs, the validator will
4517                // re-sign the effects with new current epoch so that a client is always able to
4518                // obtain an effects certificate at the current epoch.
4519                //
4520                // Why is this necessary? Consider the following case:
4521                // - assume there are 4 validators
4522                // - Quorum driver gets 2 signed effects before reconfig halt
4523                // - The tx makes it into final checkpoint.
4524                // - 2 validators go away and are replaced in the new epoch.
4525                // - The new epoch begins.
4526                // - The quorum driver cannot complete the partial effects cert from the
4527                //   previous epoch, because it may not be able to reach either of the 2 former
4528                //   validators.
4529                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4530                //   the new epoch, the QD can make a new effects cert and return it to the
4531                //   client.
4532                //
4533                // This is a considered a short-term workaround. Eventually, Quorum Driver
4534                // should be able to return either an effects certificate, -or-
4535                // a proof of inclusion in a checkpoint. In the case above, the
4536                // Quorum Driver would return a proof of inclusion in the final
4537                // checkpoint, and this code would no longer be necessary.
4538                if effects.epoch() != epoch_store.epoch() {
4539                    debug!(
4540                        tx_digest=?transaction_digest,
4541                        effects_epoch=?effects.epoch(),
4542                        epoch=?epoch_store.epoch(),
4543                        "Re-signing the effects with the current epoch"
4544                    );
4545                }
4546                Ok(Some(self.sign_effects(effects, epoch_store)?))
4547            }
4548            None => Ok(None),
4549        }
4550    }
4551
4552    #[instrument(level = "trace", skip_all)]
4553    pub(crate) fn sign_effects(
4554        &self,
4555        effects: TransactionEffects,
4556        epoch_store: &Arc<AuthorityPerEpochStore>,
4557    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4558        let tx_digest = *effects.transaction_digest();
4559        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4560            Some(sig) => {
4561                debug_assert!(sig.epoch == epoch_store.epoch());
4562                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4563            }
4564            _ => {
4565                let sig = AuthoritySignInfo::new(
4566                    epoch_store.epoch(),
4567                    &effects,
4568                    Intent::iota_app(IntentScope::TransactionEffects),
4569                    self.name,
4570                    &*self.secret,
4571                );
4572
4573                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4574
4575                epoch_store.insert_effects_digest_and_signature(
4576                    &tx_digest,
4577                    effects.digest(),
4578                    &sig,
4579                )?;
4580
4581                effects
4582            }
4583        };
4584
4585        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4586            signed_effects,
4587        ))
4588    }
4589
4590    // Returns coin objects for indexing for fullnode if indexing is enabled.
4591    #[instrument(level = "trace", skip_all)]
4592    fn fullnode_only_get_tx_coins_for_indexing(
4593        &self,
4594        effects: &TransactionEffects,
4595        inner_temporary_store: &InnerTemporaryStore,
4596        epoch_store: &Arc<AuthorityPerEpochStore>,
4597    ) -> Option<TxCoins> {
4598        if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4599            return None;
4600        }
4601        let written_coin_objects = inner_temporary_store
4602            .written
4603            .iter()
4604            .filter_map(|(k, v)| {
4605                if v.is_coin() {
4606                    Some((*k, v.clone()))
4607                } else {
4608                    None
4609                }
4610            })
4611            .collect();
4612        let mut input_coin_objects = inner_temporary_store
4613            .input_objects
4614            .iter()
4615            .filter_map(|(k, v)| {
4616                if v.is_coin() {
4617                    Some((*k, v.clone()))
4618                } else {
4619                    None
4620                }
4621            })
4622            .collect::<ObjectMap>();
4623
4624        // Check for receiving objects that were actually used and modified during
4625        // execution. Their updated version will already showup in
4626        // "written_coins" but their input isn't included in the set of input
4627        // objects in a inner_temporary_store.
4628        for (object_id, version) in effects.modified_at_versions() {
4629            if inner_temporary_store
4630                .loaded_runtime_objects
4631                .contains_key(&object_id)
4632            {
4633                if let Some(object) = self
4634                    .get_object_store()
4635                    .get_object_by_key(&object_id, version)
4636                {
4637                    if object.is_coin() {
4638                        input_coin_objects.insert(object_id, object);
4639                    }
4640                }
4641            }
4642        }
4643
4644        Some((input_coin_objects, written_coin_objects))
4645    }
4646
4647    /// Get the TransactionEnvelope that currently locks the given object, if
4648    /// any. Since object locks are only valid for one epoch, we also need
4649    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4650    /// no lock records for the given object can be found.
4651    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4652    /// object record is at a different version.
4653    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4654    /// certain transaction. Returns None if the a lock record is
4655    /// initialized for the given ObjectRef but not yet locked by any
4656    /// transaction,     or cannot find the transaction in transaction
4657    /// table, because of data race etc.
4658    #[instrument(level = "trace", skip_all)]
4659    pub async fn get_transaction_lock(
4660        &self,
4661        object_ref: &ObjectRef,
4662        epoch_store: &AuthorityPerEpochStore,
4663    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4664        let lock_info = self
4665            .get_object_cache_reader()
4666            .try_get_lock(*object_ref, epoch_store)?;
4667        let lock_info = match lock_info {
4668            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4669                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4670                    provided_obj_ref: *object_ref,
4671                    current_version: locked_ref.version,
4672                }
4673                .into());
4674            }
4675            ObjectLockStatus::Initialized => {
4676                return Ok(None);
4677            }
4678            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4679        };
4680
4681        epoch_store.get_signed_transaction(&lock_info)
4682    }
4683
4684    pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4685        self.get_object_cache_reader().try_get_objects(objects)
4686    }
4687
4688    /// Non-fallible version of `try_get_objects`.
4689    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4690        self.try_get_objects(objects)
4691            .await
4692            .expect("storage access failed")
4693    }
4694
4695    pub async fn try_get_object_or_tombstone(
4696        &self,
4697        object_id: ObjectID,
4698    ) -> IotaResult<Option<ObjectRef>> {
4699        self.get_object_cache_reader()
4700            .try_get_latest_object_ref_or_tombstone(object_id)
4701    }
4702
4703    /// Non-fallible version of `try_get_object_or_tombstone`.
4704    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4705        self.try_get_object_or_tombstone(object_id)
4706            .await
4707            .expect("storage access failed")
4708    }
4709
4710    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4711    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4712    /// upgrade.
4713    ///
4714    /// This method can be used to dynamic adjust the amount of buffer. If set
4715    /// to 0, the upgrade will go through with only 2f+1 votes.
4716    ///
4717    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4718    /// should have the same value), or you risk halting the chain.
4719    pub fn set_override_protocol_upgrade_buffer_stake(
4720        &self,
4721        expected_epoch: EpochId,
4722        buffer_stake_bps: u64,
4723    ) -> IotaResult {
4724        let epoch_store = self.load_epoch_store_one_call_per_task();
4725        let actual_epoch = epoch_store.epoch();
4726        if actual_epoch != expected_epoch {
4727            return Err(IotaError::WrongEpoch {
4728                expected_epoch,
4729                actual_epoch,
4730            });
4731        }
4732
4733        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4734    }
4735
4736    pub fn clear_override_protocol_upgrade_buffer_stake(
4737        &self,
4738        expected_epoch: EpochId,
4739    ) -> IotaResult {
4740        let epoch_store = self.load_epoch_store_one_call_per_task();
4741        let actual_epoch = epoch_store.epoch();
4742        if actual_epoch != expected_epoch {
4743            return Err(IotaError::WrongEpoch {
4744                expected_epoch,
4745                actual_epoch,
4746            });
4747        }
4748
4749        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4750    }
4751
4752    /// Get the set of system packages that are compiled in to this build, if
4753    /// those packages are compatible with the current versions of those
4754    /// packages on-chain.
4755    pub async fn get_available_system_packages(
4756        &self,
4757        binary_config: &BinaryConfig,
4758    ) -> Vec<ObjectRef> {
4759        let mut results = vec![];
4760
4761        let system_packages = BuiltInFramework::iter_system_packages();
4762
4763        // Add extra framework packages during simtest
4764        #[cfg(msim)]
4765        let extra_packages = framework_injection::get_extra_packages(self.name);
4766        #[cfg(msim)]
4767        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4768
4769        for system_package in system_packages {
4770            let modules = system_package.modules().to_vec();
4771            // In simtests, we could override the current built-in framework packages.
4772            #[cfg(msim)]
4773            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4774                .unwrap_or(modules);
4775
4776            let Some(obj_ref) = iota_framework::compare_system_package(
4777                &self.get_object_store(),
4778                &system_package.id,
4779                &modules,
4780                system_package.dependencies.to_vec(),
4781                binary_config,
4782            )
4783            .await
4784            else {
4785                return vec![];
4786            };
4787            results.push(obj_ref);
4788        }
4789
4790        results
4791    }
4792
4793    /// Return the new versions, module bytes, and dependencies for the packages
4794    /// that have been committed to for a framework upgrade, in
4795    /// `system_packages`.  Loads the module contents from the binary, and
4796    /// performs the following checks:
4797    ///
4798    /// - Whether its contents matches what is on-chain already, in which case
4799    ///   no upgrade is required, and its contents are omitted from the output.
4800    /// - Whether the contents in the binary can form a package whose digest
4801    ///   matches the input, meaning the framework will be upgraded, and this
4802    ///   authority can satisfy that upgrade, in which case the contents are
4803    ///   included in the output.
4804    ///
4805    /// If the current version of the framework can't be loaded, the binary does
4806    /// not contain the bytes for that framework ID, or the resulting
4807    /// package fails the digest check, `None` is returned indicating that
4808    /// this authority cannot run the upgrade that the network voted on.
4809    async fn get_system_package_bytes(
4810        &self,
4811        system_packages: Vec<ObjectRef>,
4812        binary_config: &BinaryConfig,
4813    ) -> Option<Vec<SystemPackage>> {
4814        let ids: Vec<_> = system_packages
4815            .iter()
4816            .map(|object_ref| object_ref.object_id)
4817            .collect();
4818        let objects = self.get_objects(&ids).await;
4819
4820        let mut res = Vec::with_capacity(system_packages.len());
4821        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4822            let prev_transaction = match object {
4823                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4824                    // Skip this one because it doesn't need to be upgraded.
4825                    info!(
4826                        "Framework {} does not need updating",
4827                        system_package_ref.object_id
4828                    );
4829                    continue;
4830                }
4831
4832                Some(cur_object) => cur_object.previous_transaction,
4833                None => TransactionDigest::GENESIS_MARKER,
4834            };
4835
4836            #[cfg(msim)]
4837            let FrameworkSystemPackage {
4838                id: _,
4839                bytes,
4840                dependencies,
4841            } = framework_injection::get_override_system_package(
4842                &system_package_ref.object_id,
4843                self.name,
4844            )
4845            .unwrap_or_else(|| {
4846                BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone()
4847            });
4848
4849            #[cfg(not(msim))]
4850            let FrameworkSystemPackage {
4851                id: _,
4852                bytes,
4853                dependencies,
4854            } = BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone();
4855
4856            let modules: Vec<_> = bytes
4857                .iter()
4858                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4859                .collect();
4860
4861            let new_object = Object::new_system_package(
4862                &modules,
4863                system_package_ref.version,
4864                dependencies.clone(),
4865                prev_transaction,
4866            );
4867
4868            let new_ref = new_object.compute_object_reference();
4869            if new_ref != system_package_ref {
4870                error!(
4871                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
4872                );
4873                return None;
4874            }
4875
4876            res.push(SystemPackage {
4877                version: system_package_ref.version,
4878                modules: bytes,
4879                dependencies,
4880            });
4881        }
4882
4883        Some(res)
4884    }
4885
4886    /// Returns the new protocol version and system packages that the network
4887    /// has voted to upgrade to. If the proposed protocol version is not
4888    /// supported, None is returned.
4889    fn is_protocol_version_supported_v1(
4890        proposed_protocol_version: ProtocolVersion,
4891        committee: &Committee,
4892        capabilities: Vec<AuthorityCapabilitiesV1>,
4893        mut buffer_stake_bps: u64,
4894    ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4895        if buffer_stake_bps > 10000 {
4896            warn!("clamping buffer_stake_bps to 10000");
4897            buffer_stake_bps = 10000;
4898        }
4899
4900        // For each validator, gather the protocol version and system packages that it
4901        // would like to upgrade to in the next epoch.
4902        let mut desired_upgrades: Vec<_> = capabilities
4903            .into_iter()
4904            .filter_map(|mut cap| {
4905                // A validator that lists no packages is voting against any change at all.
4906                if cap.available_system_packages.is_empty() {
4907                    return None;
4908                }
4909
4910                cap.available_system_packages.sort();
4911
4912                info!(
4913                    "validator {:?} supports {:?} with system packages: {:?}",
4914                    cap.authority.concise(),
4915                    cap.supported_protocol_versions,
4916                    cap.available_system_packages,
4917                );
4918
4919                // A validator that only supports the current protocol version is also voting
4920                // against any change, because framework upgrades always require a protocol
4921                // version bump.
4922                cap.supported_protocol_versions
4923                    .get_version_digest(proposed_protocol_version)
4924                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
4925            })
4926            .collect();
4927
4928        // There can only be one set of votes that have a majority, find one if it
4929        // exists.
4930        desired_upgrades.sort();
4931        desired_upgrades
4932            .into_iter()
4933            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4934            .into_iter()
4935            .find_map(|((digest, packages), group)| {
4936                // should have been filtered out earlier.
4937                assert!(!packages.is_empty());
4938
4939                let mut stake_aggregator: StakeAggregator<(), true> =
4940                    StakeAggregator::new(Arc::new(committee.clone()));
4941
4942                for (_, _, authority) in group {
4943                    stake_aggregator.insert_generic(authority, ());
4944                }
4945
4946                let total_votes = stake_aggregator.total_votes();
4947                let quorum_threshold = committee.quorum_threshold();
4948                let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4949
4950                info!(
4951                    protocol_config_digest = ?digest,
4952                    ?total_votes,
4953                    ?quorum_threshold,
4954                    ?buffer_stake_bps,
4955                    ?effective_threshold,
4956                    ?proposed_protocol_version,
4957                    ?packages,
4958                    "support for upgrade"
4959                );
4960
4961                let has_support = total_votes >= effective_threshold;
4962                has_support.then_some((proposed_protocol_version, digest, packages))
4963            })
4964    }
4965
4966    /// Selects the highest supported protocol version and system packages that
4967    /// the network has voted to upgrade to. If no upgrade is supported,
4968    /// returns the current protocol version and system packages.
4969    fn choose_protocol_version_and_system_packages_v1(
4970        current_protocol_version: ProtocolVersion,
4971        current_protocol_digest: Digest,
4972        committee: &Committee,
4973        capabilities: Vec<AuthorityCapabilitiesV1>,
4974        buffer_stake_bps: u64,
4975    ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4976        let mut next_protocol_version = current_protocol_version;
4977        let mut system_packages = vec![];
4978        let mut protocol_version_digest = current_protocol_digest;
4979
4980        // Finds the highest supported protocol version and system packages by
4981        // incrementing the proposed protocol version by one until no further
4982        // upgrades are supported.
4983        while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4984            next_protocol_version + 1,
4985            committee,
4986            capabilities.clone(),
4987            buffer_stake_bps,
4988        ) {
4989            next_protocol_version = version;
4990            protocol_version_digest = digest;
4991            system_packages = packages;
4992        }
4993
4994        (
4995            next_protocol_version,
4996            protocol_version_digest,
4997            system_packages,
4998        )
4999    }
5000
5001    /// Returns the indices of validators that support the given protocol
5002    /// version and digest. This includes both committee and non-committee
5003    /// validators based on their capabilities. Uses active validators
5004    /// instead of committee indices.
5005    fn get_validators_supporting_protocol_version(
5006        target_protocol_version: ProtocolVersion,
5007        target_digest: Digest,
5008        active_validators: &[AuthorityPublicKey],
5009        capabilities: &[AuthorityCapabilitiesV1],
5010    ) -> Vec<u64> {
5011        let mut eligible_validators = Vec::new();
5012
5013        for capability in capabilities {
5014            // Check if this validator supports the target protocol version and digest
5015            if let Some(digest) = capability
5016                .supported_protocol_versions
5017                .get_version_digest(target_protocol_version)
5018            {
5019                if digest == target_digest {
5020                    // Find the validator's index in the active validators list
5021                    if let Some(index) = active_validators
5022                        .iter()
5023                        .position(|name| AuthorityName::from(name) == capability.authority)
5024                    {
5025                        eligible_validators.push(index as u64);
5026                    }
5027                }
5028            }
5029        }
5030
5031        // Sort indices for deterministic behavior
5032        eligible_validators.sort();
5033        eligible_validators
5034    }
5035
5036    /// Calculates the sum of weights for eligible validators that are part of
5037    /// the committee. Takes the indices from
5038    /// get_validators_supporting_protocol_version and maps them back
5039    /// to committee members to get their weights.
5040    fn calculate_eligible_validators_weight(
5041        eligible_validator_indices: &[u64],
5042        active_validators: &[AuthorityPublicKey],
5043        committee: &Committee,
5044    ) -> u64 {
5045        let mut total_weight = 0u64;
5046
5047        for &index in eligible_validator_indices {
5048            let authority_pubkey = &active_validators[index as usize];
5049            // Check if this validator is in the committee and get their weight
5050            if let Some((_, weight)) = committee
5051                .members()
5052                .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
5053            {
5054                total_weight += weight;
5055            }
5056        }
5057
5058        total_weight
5059    }
5060
5061    /// Creates and execute the advance epoch transaction to effects without
5062    /// committing it to the database. The effects of the change epoch tx
5063    /// are only written to the database after a certified checkpoint has been
5064    /// formed and executed by CheckpointExecutor.
5065    ///
5066    /// When a framework upgraded has been decided on, but the validator does
5067    /// not have the new versions of the packages locally, the validator
5068    /// cannot form the ChangeEpochTx. In this case it returns Err,
5069    /// indicating that the checkpoint builder should give up trying to make the
5070    /// final checkpoint. As long as the network is able to create a certified
5071    /// checkpoint (which should be ensured by the capabilities vote), it
5072    /// will arrive via state sync and be executed by CheckpointExecutor.
    ///
    /// # Errors
    ///
    /// Returns an error when the system packages chosen for the next epoch
    /// cannot be obtained locally, when the transaction has already been
    /// executed, or when any of the fallible steps below fails (the
    /// already-executed lookup, execution lock acquisition, shared object
    /// version assignment, input loading, certificate preparation, or the
    /// write to the state sync store).
    ///
    /// # Panics
    ///
    /// Panics if the authority capabilities cannot be read, if the prepared
    /// transaction did not write the system state object, if no
    /// `SystemEpochInfoEvent` was emitted while the system state is not in
    /// safe mode, or if the resulting effects are not successful.
    #[instrument(level = "error", skip_all)]
    pub async fn create_and_execute_advance_epoch_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        gas_cost_summary: &GasCostSummary,
        checkpoint: CheckpointSequenceNumber,
        epoch_start_timestamp_ms: CheckpointTimestamp,
        scores: Vec<u64>,
    ) -> anyhow::Result<(
        IotaSystemState,
        Option<SystemEpochInfoEvent>,
        TransactionEffects,
    )> {
        // End-of-epoch transaction kinds; exactly one change-epoch kind is pushed by
        // the branches below.
        let mut txns = Vec::new();

        let next_epoch = epoch_store.epoch() + 1;

        // Choose the protocol version, its config digest and the system packages for
        // the next epoch from the capabilities recorded in the current epoch.
        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
        let authority_capabilities = epoch_store
            .get_capabilities_v1()
            .expect("read capabilities from db cannot fail");
        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
            Self::choose_protocol_version_and_system_packages_v1(
                epoch_store.protocol_version(),
                SupportedProtocolVersionsWithHashes::protocol_config_digest(
                    epoch_store.protocol_config(),
                ),
                epoch_store.committee(),
                authority_capabilities.clone(),
                buffer_stake_bps,
            );

        // since system packages are created during the current epoch, they should abide
        // by the rules of the current epoch, including the current epoch's max
        // Move binary format version
        let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);
        let Some(next_epoch_system_package_bytes) = self
            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
            .await
        else {
            error!(
                "upgraded system packages {:?} are not locally available, cannot create \
                ChangeEpochTx. validator binary must be upgraded to the correct version!",
                next_epoch_system_packages
            );
            // the checkpoint builder will keep retrying forever when it hits this error.
            // Eventually, one of two things will happen:
            // - The operator will upgrade this binary to one that has the new packages
            //   locally, and this function will succeed.
            // - The final checkpoint will be certified by other validators, we will receive
            //   it via state sync, and execute it. This will upgrade the framework
            //   packages, reconfigure, and most likely shut down in the new epoch (this
            //   validator likely doesn't support the new protocol version, or else it
            //   should have had the packages.)
            bail!("missing system packages: cannot form ChangeEpochTx");
        };

        // Use ChangeEpochV3 or ChangeEpochV4 when the feature flags are enabled and
        // ChangeEpochV2 requirements are met
        if config.select_committee_from_eligible_validators() {
            // Get the list of eligible validators that support the target protocol version
            let active_validators = epoch_store.epoch_start_state().get_active_validators();

            // By default every active validator is eligible; validators are identified
            // by their index in `active_validators`.
            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();

            // Use validators supporting the target protocol version as eligible validators
            // in the next version if select_committee_supporting_next_epoch_version feature
            // flag is set to true.
            if config.select_committee_supporting_next_epoch_version() {
                eligible_active_validators = Self::get_validators_supporting_protocol_version(
                    next_epoch_protocol_version,
                    next_epoch_protocol_digest,
                    &active_validators,
                    &authority_capabilities,
                );

                // Calculate the total weight of eligible validators in the committee
                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
                    &eligible_active_validators,
                    &active_validators,
                    epoch_store.committee(),
                );

                // Safety check: ensure eligible validators have enough stake
                // Use the same effective threshold calculation that was used to decide the
                // protocol version
                let committee = epoch_store.committee();
                let effective_threshold = committee.effective_threshold(buffer_stake_bps);

                if eligible_validators_weight < effective_threshold {
                    error!(
                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
                    );
                    // Pass all active validator indices as eligible validators
                    // to perform selection among all of them.
                    eligible_active_validators = (0..active_validators.len() as u64).collect();
                }
            }

            // Use ChangeEpochV4 when the pass_validator_scores_to_advance_epoch feature
            // flag is enabled.
            if config.pass_validator_scores_to_advance_epoch() {
                txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
                    next_epoch,
                    next_epoch_protocol_version.as_u64(),
                    gas_cost_summary.storage_cost,
                    gas_cost_summary.computation_cost,
                    gas_cost_summary.computation_cost_burned,
                    gas_cost_summary.storage_rebate,
                    gas_cost_summary.non_refundable_storage_fee,
                    epoch_start_timestamp_ms,
                    next_epoch_system_package_bytes,
                    eligible_active_validators,
                    scores,
                    config.adjust_rewards_by_score(),
                ));
            } else {
                // ChangeEpochV3: same as V4 but without the validator scores.
                txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
                    next_epoch,
                    next_epoch_protocol_version.as_u64(),
                    gas_cost_summary.storage_cost,
                    gas_cost_summary.computation_cost,
                    gas_cost_summary.computation_cost_burned,
                    gas_cost_summary.storage_rebate,
                    gas_cost_summary.non_refundable_storage_fee,
                    epoch_start_timestamp_ms,
                    next_epoch_system_package_bytes,
                    eligible_active_validators,
                ));
            }
        } else if config.protocol_defined_base_fee()
            && config.max_committee_members_count_as_option().is_some()
        {
            // ChangeEpochV2: carries `computation_cost_burned` but no eligible
            // validator list.
            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
                next_epoch,
                next_epoch_protocol_version.as_u64(),
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        } else {
            // Original ChangeEpoch: note that `computation_cost_burned` is not passed.
            txns.push(EndOfEpochTransactionKind::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version.as_u64(),
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        }

        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);

        // The executable form is tied to the current epoch and to `checkpoint`.
        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
            tx.clone(),
            epoch_store.epoch(),
            checkpoint,
        );

        let tx_digest = executable_tx.digest();

        info!(
            ?next_epoch,
            ?next_epoch_protocol_version,
            ?next_epoch_system_packages,
            computation_cost=?gas_cost_summary.computation_cost,
            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
            storage_cost=?gas_cost_summary.storage_cost,
            storage_rebate=?gas_cost_summary.storage_rebate,
            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
            ?tx_digest,
            "Creating advance epoch transaction"
        );

        fail_point_async!("change_epoch_tx_delay");
        // Lock keyed by the transaction digest; it is handed to
        // `read_objects_for_execution` below.
        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

        // The tx could have been executed by state sync already - if so simply return
        // an error. The checkpoint builder will shortly be terminated by
        // reconfiguration anyway.
        if self
            .get_transaction_cache_reader()
            .try_is_tx_already_executed(tx_digest)?
        {
            warn!("change epoch tx has already been executed via state sync");
            bail!("change epoch tx has already been executed via state sync",);
        }

        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;

        // We must manually assign the shared object versions to the transaction before
        // executing it. This is because we do not sequence end-of-epoch
        // transactions through consensus.
        epoch_store.assign_shared_object_versions_idempotent(
            self.get_object_cache_reader().as_ref(),
            std::slice::from_ref(&executable_tx),
        )?;

        let (input_objects, _) =
            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;

        // The execution error, if any, is not inspected here; success is asserted on
        // `effects` at the end of this function.
        let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
            &execution_guard,
            &executable_tx,
            input_objects,
            vec![],
            epoch_store,
        )?;
        let system_obj = get_iota_system_state(&temporary_store.written)
            .expect("change epoch tx must write to system object");
        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
        let system_epoch_info_event = temporary_store
            .events
            .data
            .into_iter()
            .find(|event| event.is_system_epoch_info_event())
            .map(SystemEpochInfoEvent::from);
        // The system epoch info event can be `None` if the `advance_epoch` Move
        // function call failed and the transaction was executed in safe mode.
        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());

        // We must write tx and effects to the state sync tables so that state sync is
        // able to deliver the transaction to CheckpointExecutor after it is
        // included in a certified checkpoint.
        self.get_state_sync_store()
            .try_insert_transaction_and_effects(&tx, &effects)
            // Convert the store error into `anyhow::Error` before propagating it.
            .map_err(|err| {
                let err: anyhow::Error = err.into();
                err
            })?;

        info!(
            "Effects summary of the change epoch transaction: {:?}",
            effects.summary_for_debug()
        );
        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
        // The change epoch transaction cannot fail to execute.
        assert!(effects.status().is_success());
        Ok((system_obj, system_epoch_info_event, effects))
    }
5321
5322    /// This function is called at the very end of the epoch.
5323    /// This step is required before updating new epoch in the db and calling
5324    /// reopen_epoch_db.
5325    #[instrument(level = "error", skip_all)]
5326    async fn revert_uncommitted_epoch_transactions(
5327        &self,
5328        epoch_store: &AuthorityPerEpochStore,
5329    ) -> IotaResult {
5330        {
5331            let state = epoch_store.get_reconfig_state_write_lock_guard();
5332            if state.should_accept_user_certs() {
5333                // Need to change this so that consensus adapter do not accept certificates from
5334                // user. This can happen if our local validator did not initiate
5335                // epoch change locally, but 2f+1 nodes already concluded the
5336                // epoch.
5337                //
5338                // This lock is essentially a barrier for
5339                // `epoch_store.pending_consensus_certificates` table we are reading on the line
5340                // after this block
5341                epoch_store.close_user_certs(state);
5342            }
5343            // lock is dropped here
5344        }
5345        let pending_certificates = epoch_store.pending_consensus_certificates();
5346        info!(
5347            "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5348            pending_certificates.len(),
5349            pending_certificates,
5350        );
5351        for digest in pending_certificates {
5352            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5353                info!(
5354                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5355                    digest
5356                );
5357                continue;
5358            }
5359            info!("Reverting {:?} at the end of epoch", digest);
5360            epoch_store.revert_executed_transaction(&digest)?;
5361            self.get_reconfig_api().try_revert_state_update(&digest)?;
5362        }
5363        info!("All uncommitted local transactions reverted");
5364        Ok(())
5365    }
5366
5367    #[instrument(level = "error", skip_all)]
5368    async fn reopen_epoch_db(
5369        &self,
5370        cur_epoch_store: &AuthorityPerEpochStore,
5371        new_committee: Committee,
5372        epoch_start_configuration: EpochStartConfiguration,
5373        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5374        epoch_last_checkpoint: CheckpointSequenceNumber,
5375    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5376        let new_epoch = new_committee.epoch;
5377        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5378        assert_eq!(
5379            epoch_start_configuration.epoch_start_state().epoch(),
5380            new_committee.epoch
5381        );
5382        fail_point!("before-open-new-epoch-store");
5383        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5384            self.name,
5385            new_committee,
5386            epoch_start_configuration,
5387            self.get_backing_package_store().clone(),
5388            expensive_safety_check_config,
5389            epoch_last_checkpoint,
5390        )?;
5391        self.epoch_store.store(new_epoch_store.clone());
5392        Ok(new_epoch_store)
5393    }
5394
5395    /// Checks if `authenticator` unlocks a valid Move account and returns the
5396    /// account-related `AuthenticatorFunctionRef` object.
5397    fn check_move_account(
5398        &self,
5399        auth_account_object_id: ObjectID,
5400        auth_account_object_seq_number: Option<SequenceNumber>,
5401        auth_account_object_digest: Option<ObjectDigest>,
5402        account_object: ObjectReadResult,
5403        signer: &IotaAddress,
5404    ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5405        let account_object = match account_object.object {
5406            ObjectReadResultKind::Object(object) => Ok(object),
5407            ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5408                Err(UserInputError::AccountObjectDeleted {
5409                    account_id: account_object.id(),
5410                    account_version: version,
5411                    transaction_digest: digest,
5412                })
5413            }
5414            // It is impossible to check the account object because it is used in a canceled
5415            // transaction and is not loaded.
5416            ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5417                Err(UserInputError::AccountObjectInCanceledTransaction {
5418                    account_id: account_object.id(),
5419                    account_version: version,
5420                })
5421            }
5422        }?;
5423
5424        let account_object_addr = IotaAddress::from(auth_account_object_id);
5425
5426        fp_ensure!(
5427            signer == &account_object_addr,
5428            UserInputError::IncorrectUserSignature {
5429                error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5430            }
5431            .into()
5432        );
5433
5434        fp_ensure!(
5435            account_object.is_shared() || account_object.is_immutable(),
5436            UserInputError::AccountObjectNotSupported {
5437                object_id: auth_account_object_id
5438            }
5439            .into()
5440        );
5441
5442        let auth_account_object_seq_number =
5443            if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5444                let account_object_version = account_object.version();
5445
5446                fp_ensure!(
5447                    account_object_version == auth_account_object_seq_number,
5448                    UserInputError::AccountObjectVersionMismatch {
5449                        object_id: auth_account_object_id,
5450                        expected_version: auth_account_object_seq_number,
5451                        actual_version: account_object_version,
5452                    }
5453                    .into()
5454                );
5455
5456                auth_account_object_seq_number
5457            } else {
5458                account_object.version()
5459            };
5460
5461        if let Some(auth_account_object_digest) = auth_account_object_digest {
5462            let expected_digest = account_object.digest();
5463            fp_ensure!(
5464                expected_digest == auth_account_object_digest,
5465                UserInputError::InvalidAccountObjectDigest {
5466                    object_id: auth_account_object_id,
5467                    expected_digest,
5468                    actual_digest: auth_account_object_digest,
5469                }
5470                .into()
5471            );
5472        }
5473
5474        let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5475            auth_account_object_id,
5476            &AuthenticatorFunctionRefV1Key::tag().into(),
5477            &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5478        )
5479        .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5480            account_object_id: auth_account_object_id,
5481        })?;
5482
5483        let authenticator_function_ref_field = self
5484            .get_object_cache_reader()
5485            .try_find_object_lt_or_eq_version(
5486                authenticator_function_ref_field_id,
5487                auth_account_object_seq_number,
5488            )?;
5489
5490        if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5491            let field_move_object = authenticator_function_ref_field_obj
5492                .data
5493                .as_struct_opt()
5494                .expect("dynamic field should never be a package object");
5495
5496            let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5497                field_move_object.to_rust().ok_or(
5498                    UserInputError::InvalidAuthenticatorFunctionRefField {
5499                        account_object_id: auth_account_object_id,
5500                    },
5501                )?;
5502
5503            Ok(AuthenticatorFunctionRefForExecution::new_v1(
5504                field.value,
5505                authenticator_function_ref_field_obj.compute_object_reference(),
5506                authenticator_function_ref_field_obj.owner,
5507                authenticator_function_ref_field_obj.storage_rebate,
5508                authenticator_function_ref_field_obj.previous_transaction,
5509            ))
5510        } else {
5511            Err(UserInputError::MoveAuthenticatorNotFound {
5512                authenticator_function_ref_id: authenticator_function_ref_field_id,
5513                account_object_id: auth_account_object_id,
5514                account_object_version: auth_account_object_seq_number,
5515            }
5516            .into())
5517        }
5518    }
5519
5520    #[allow(clippy::type_complexity)]
5521    fn read_objects_for_signing(
5522        &self,
5523        transaction: &VerifiedTransaction,
5524        epoch: u64,
5525    ) -> IotaResult<(
5526        InputObjects,
5527        ReceivingObjects,
5528        Vec<(InputObjects, ObjectReadResult)>,
5529    )> {
5530        let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5531            Some(transaction.digest()),
5532            &transaction.collect_all_input_object_kind_for_reading()?,
5533            &transaction.data().transaction_data().receiving_objects(),
5534            epoch,
5535        )?;
5536
5537        transaction
5538            .split_input_objects_into_groups_for_reading(input_objects)
5539            .map(|(tx_input_objects, per_authenticator_inputs)| {
5540                (
5541                    tx_input_objects,
5542                    tx_receiving_objects,
5543                    per_authenticator_inputs,
5544                )
5545            })
5546    }
5547
5548    #[allow(clippy::type_complexity)]
5549    fn check_transaction_inputs_for_signing(
5550        &self,
5551        protocol_config: &ProtocolConfig,
5552        reference_gas_price: u64,
5553        tx_data: &TransactionData,
5554        tx_input_objects: InputObjects,
5555        tx_receiving_objects: &ReceivingObjects,
5556        move_authenticators: &Vec<&MoveAuthenticator>,
5557        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
5558    ) -> IotaResult<(
5559        IotaGasStatus,
5560        CheckedInputObjects,
5561        Vec<(CheckedInputObjects, AuthenticatorFunctionRef)>,
5562    )> {
5563        let authenticator_gas_budget = if move_authenticators.is_empty() {
5564            0
5565        } else {
5566            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
5567            // not a part of the transaction data.
5568            protocol_config.max_auth_gas()
5569        };
5570
5571        debug_assert_eq!(
5572            move_authenticators.len(),
5573            per_authenticator_inputs.len(),
5574            "Move authenticators amount must match the number of authenticator inputs"
5575        );
5576
5577        let per_authenticator_checked_inputs = move_authenticators
5578            .iter()
5579            .zip(per_authenticator_inputs)
5580            .map(
5581                |(move_authenticator, (authenticator_input_objects, account_object))| {
5582                    // Check basic `object_to_authenticate` preconditions and get its components.
5583                    let (
5584                        auth_account_object_id,
5585                        auth_account_object_seq_number,
5586                        auth_account_object_digest,
5587                    ) = move_authenticator.object_to_authenticate_components()?;
5588
5589                    let signer = move_authenticator.address()?;
5590
5591                    // Make sure the signer is a Move account.
5592                    let AuthenticatorFunctionRefForExecution {
5593                        authenticator_function_ref,
5594                        ..
5595                    } = self.check_move_account(
5596                        auth_account_object_id,
5597                        auth_account_object_seq_number,
5598                        auth_account_object_digest,
5599                        account_object,
5600                        &signer,
5601                    )?;
5602
5603                    // Check the MoveAuthenticator input objects.
5604                    let authenticator_checked_input_objects =
5605                        iota_transaction_checks::check_move_authenticator_input_for_signing(
5606                            authenticator_input_objects,
5607                        )?;
5608
5609                    Ok((
5610                        authenticator_checked_input_objects,
5611                        authenticator_function_ref,
5612                    ))
5613                },
5614            )
5615            .collect::<IotaResult<Vec<_>>>()?;
5616
5617        // Check the transaction inputs.
5618        let (gas_status, tx_checked_input_objects) =
5619            iota_transaction_checks::check_transaction_input(
5620                protocol_config,
5621                reference_gas_price,
5622                tx_data,
5623                tx_input_objects,
5624                tx_receiving_objects,
5625                &self.metrics.bytecode_verifier_metrics,
5626                &self.config.verifier_signing_config,
5627                authenticator_gas_budget,
5628            )?;
5629
5630        Ok((
5631            gas_status,
5632            tx_checked_input_objects,
5633            per_authenticator_checked_inputs,
5634        ))
5635    }
5636
5637    #[cfg(test)]
5638    pub(crate) fn iter_live_object_set_for_testing(
5639        &self,
5640    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5641        self.get_global_state_hash_store()
5642            .iter_cached_live_object_set_for_testing()
5643    }
5644
5645    #[cfg(test)]
5646    pub(crate) fn shutdown_execution_for_test(&self) {
5647        self.tx_execution_shutdown
5648            .lock()
5649            .take()
5650            .unwrap()
5651            .send(())
5652            .unwrap();
5653    }
5654
5655    /// NOTE: this function is only to be used for fuzzing and testing. Never
5656    /// use in prod
5657    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5658        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5659        self.get_object_cache_reader()
5660            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5661        self.get_reconfig_api()
5662            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5663    }
5664}
5665
/// Consumes randomness delivered over a channel and, for each item, creates
/// and schedules a randomness state update transaction (see
/// `handle_new_randomness`).
pub struct RandomnessRoundReceiver {
    /// Authority state used to persist, enqueue and await the created
    /// transactions.
    authority_state: Arc<AuthorityState>,
    /// Channel delivering `(epoch, round, bytes)` tuples; the event loop ends
    /// when it is closed.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
5670
5671impl RandomnessRoundReceiver {
5672    pub fn spawn(
5673        authority_state: Arc<AuthorityState>,
5674        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5675    ) -> JoinHandle<()> {
5676        let rrr = RandomnessRoundReceiver {
5677            authority_state,
5678            randomness_rx,
5679        };
5680        spawn_monitored_task!(rrr.run())
5681    }
5682
5683    async fn run(mut self) {
5684        info!("RandomnessRoundReceiver event loop started");
5685
5686        loop {
5687            tokio::select! {
5688                maybe_recv = self.randomness_rx.recv() => {
5689                    if let Some((epoch, round, bytes)) = maybe_recv {
5690                        self.handle_new_randomness(epoch, round, bytes);
5691                    } else {
5692                        break;
5693                    }
5694                },
5695            }
5696        }
5697
5698        info!("RandomnessRoundReceiver event loop ended");
5699    }
5700
5701    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5702    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5703        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5704        if epoch_store.epoch() != epoch {
5705            warn!(
5706                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5707                epoch_store.epoch()
5708            );
5709            return;
5710        }
5711        let transaction = VerifiedTransaction::new_randomness_state_update(
5712            epoch,
5713            round,
5714            bytes,
5715            epoch_store
5716                .epoch_start_config()
5717                .randomness_obj_initial_shared_version(),
5718        );
5719        debug!(
5720            "created randomness state update transaction with digest: {:?}",
5721            transaction.digest()
5722        );
5723        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5724        let digest = *transaction.digest();
5725
5726        // Randomness state updates contain the full bls signature for the random round,
5727        // which cannot necessarily be reconstructed again later. Therefore we must
5728        // immediately persist this transaction. If we crash before its outputs
5729        // are committed, this ensures we will be able to re-execute it.
5730        self.authority_state
5731            .get_cache_commit()
5732            .persist_transaction(&transaction);
5733
5734        // Send transaction to TransactionManager for execution.
5735        self.authority_state
5736            .transaction_manager()
5737            .enqueue(vec![transaction], &epoch_store);
5738
5739        let authority_state = self.authority_state.clone();
5740        spawn_monitored_task!(async move {
5741            // Wait for transaction execution in a separate task, to avoid deadlock in case
5742            // of out-of-order randomness generation. (Each
5743            // RandomnessStateUpdate depends on the output of the
5744            // RandomnessStateUpdate from the previous round.)
5745            //
5746            // We set a very long timeout so that in case this gets stuck for some reason,
5747            // the validator will eventually crash rather than continuing in a
5748            // zombie mode.
5749            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5750            let result = tokio::time::timeout(
5751                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5752                authority_state
5753                    .get_transaction_cache_reader()
5754                    .try_notify_read_executed_effects(&[digest]),
5755            )
5756            .await;
5757            let result = match result {
5758                Ok(result) => result,
5759                Err(_) => {
5760                    if cfg!(debug_assertions) {
5761                        // Crash on randomness update execution timeout in debug builds.
5762                        panic!(
5763                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5764                        );
5765                    }
5766                    warn!(
5767                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5768                    );
5769                    // Continue waiting as long as necessary in non-debug builds.
5770                    authority_state
5771                        .get_transaction_cache_reader()
5772                        .try_notify_read_executed_effects(&[digest])
5773                        .await
5774                }
5775            };
5776
5777            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5778            let effects = effects.pop().expect("should return effects");
5779            if *effects.status() != ExecutionStatus::Success {
5780                fatal!(
5781                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5782                );
5783            }
5784            debug!(
5785                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5786            );
5787        });
5788    }
5789}
5790
5791#[async_trait]
5792impl TransactionKeyValueStoreTrait for AuthorityState {
5793    async fn multi_get(
5794        &self,
5795        transaction_keys: &[TransactionDigest],
5796        effects_keys: &[TransactionDigest],
5797    ) -> IotaResult<KVStoreTransactionData> {
5798        let txns = if !transaction_keys.is_empty() {
5799            self.get_transaction_cache_reader()
5800                .try_multi_get_transaction_blocks(transaction_keys)?
5801                .into_iter()
5802                .map(|t| t.map(|t| (*t).clone().into_inner()))
5803                .collect()
5804        } else {
5805            vec![]
5806        };
5807
5808        let fx = if !effects_keys.is_empty() {
5809            self.get_transaction_cache_reader()
5810                .try_multi_get_executed_effects(effects_keys)?
5811        } else {
5812            vec![]
5813        };
5814
5815        Ok((txns, fx))
5816    }
5817
5818    async fn multi_get_checkpoints(
5819        &self,
5820        checkpoint_summaries: &[CheckpointSequenceNumber],
5821        checkpoint_contents: &[CheckpointSequenceNumber],
5822        checkpoint_summaries_by_digest: &[CheckpointDigest],
5823    ) -> IotaResult<(
5824        Vec<Option<CertifiedCheckpointSummary>>,
5825        Vec<Option<CheckpointContents>>,
5826        Vec<Option<CertifiedCheckpointSummary>>,
5827    )> {
5828        // TODO: use multi-get methods if it ever becomes important (unlikely)
5829        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5830        let store = self.get_checkpoint_store();
5831        for seq in checkpoint_summaries {
5832            let checkpoint = store
5833                .get_checkpoint_by_sequence_number(*seq)?
5834                .map(|c| c.into_inner());
5835
5836            summaries.push(checkpoint);
5837        }
5838
5839        let mut contents = Vec::with_capacity(checkpoint_contents.len());
5840        for seq in checkpoint_contents {
5841            let checkpoint = store
5842                .get_checkpoint_by_sequence_number(*seq)?
5843                .and_then(|summary| {
5844                    store
5845                        .get_checkpoint_contents(&summary.content_digest)
5846                        .expect("db read cannot fail")
5847                });
5848            contents.push(checkpoint);
5849        }
5850
5851        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5852        for digest in checkpoint_summaries_by_digest {
5853            let checkpoint = store
5854                .get_checkpoint_by_digest(digest)?
5855                .map(|c| c.into_inner());
5856            summaries_by_digest.push(checkpoint);
5857        }
5858
5859        Ok((summaries, contents, summaries_by_digest))
5860    }
5861
5862    async fn get_transaction_perpetual_checkpoint(
5863        &self,
5864        digest: TransactionDigest,
5865    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5866        self.get_checkpoint_cache()
5867            .try_get_transaction_perpetual_checkpoint(&digest)
5868            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5869    }
5870
5871    async fn get_object(
5872        &self,
5873        object_id: ObjectID,
5874        version: VersionNumber,
5875    ) -> IotaResult<Option<Object>> {
5876        self.get_object_cache_reader()
5877            .try_get_object_by_key(&object_id, version)
5878    }
5879
5880    #[instrument(skip_all)]
5881    async fn multi_get_objects(
5882        &self,
5883        object_keys: &[ObjectKey],
5884    ) -> IotaResult<Vec<Option<Object>>> {
5885        Ok(self
5886            .get_object_cache_reader()
5887            .multi_get_objects_by_key(object_keys))
5888    }
5889
5890    async fn multi_get_transactions_perpetual_checkpoints(
5891        &self,
5892        digests: &[TransactionDigest],
5893    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5894        let res = self
5895            .get_checkpoint_cache()
5896            .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5897
5898        Ok(res
5899            .into_iter()
5900            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5901            .collect())
5902    }
5903
5904    #[instrument(skip(self, digests), fields(digests = digests.iter().map(|d| d.to_string()).collect::<Vec<String>>().join(", ")))]
5905    async fn multi_get_events_by_tx_digests(
5906        &self,
5907        digests: &[TransactionDigest],
5908    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5909        if digests.is_empty() {
5910            return Ok(vec![]);
5911        }
5912
5913        Ok(self
5914            .get_transaction_cache_reader()
5915            .multi_get_events(digests))
5916    }
5917}
5918
/// Simulator-only (`cfg(msim)`) registry of framework package overrides.
///
/// Callers register replacement modules for a package id, either for every
/// validator or through a per-validator callback, and the accessors return
/// them as compiled modules, serialized bytes, or a `SystemPackage`.
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_types::base_types::{AuthorityName, ObjectID};
    use move_binary_format::CompiledModule;

    /// The compiled modules that make up one injected package.
    type Framework = Vec<CompiledModule>;

    /// Callback that decides, per validator, which modules (if any) to inject.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How the override for a package is resolved for a given validator.
    enum PackageOverrideConfig {
        /// Same modules for every validator.
        Global(Framework),
        /// Modules chosen by a callback keyed on the validator's name.
        PerValidator(PackageUpgradeCallback),
    }

    /// Registered overrides, keyed by package id.
    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    /// Serializes each module at its own declared version.
    ///
    /// Panics if a module fails to serialize.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Registers `modules` as the override for `package_id` on every
    /// validator, replacing any previous registration for that id.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        let entry = PackageOverrideConfig::Global(modules);
        OVERRIDE.with(|registry| registry.borrow_mut().insert(package_id, entry));
    }

    /// Registers `func` as the per-validator override for `package_id`,
    /// replacing any previous registration for that id.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        let entry = PackageOverrideConfig::PerValidator(func);
        OVERRIDE.with(|registry| registry.borrow_mut().insert(package_id, entry));
    }

    /// Returns the serialized override modules for `package_id` as seen by
    /// validator `name`, or `None` when nothing is registered or the
    /// per-validator callback declines.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|registry| {
            let registry = registry.borrow();
            match registry.get(package_id) {
                Some(PackageOverrideConfig::Global(modules)) => {
                    Some(compiled_modules_to_bytes(modules))
                }
                Some(PackageOverrideConfig::PerValidator(callback)) => {
                    callback(name).map(|modules| compiled_modules_to_bytes(&modules))
                }
                None => None,
            }
        })
    }

    /// Same lookup as [`get_override_bytes`], but returns the compiled
    /// modules themselves instead of their serialized form.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|registry| {
            let registry = registry.borrow();
            match registry.get(package_id) {
                Some(PackageOverrideConfig::Global(modules)) => Some(modules.clone()),
                Some(PackageOverrideConfig::PerValidator(callback)) => callback(name),
                None => None,
            }
        })
    }

    /// Builds a [`SystemPackage`] from the override registered for
    /// `package_id`, if validator `name` has one.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        get_override_bytes(package_id, name).map(|bytes| {
            let dependencies = if !package_id.is_system_package() {
                // Assume that entirely new injected packages depend on all existing system
                // packages.
                BuiltInFramework::all_package_ids()
            } else {
                // Existing system packages keep their built-in dependency list.
                BuiltInFramework::get_package_by_id(package_id)
                    .dependencies
                    .to_vec()
            };
            SystemPackage {
                id: *package_id,
                bytes,
                dependencies,
            }
        })
    }

    /// Returns every registered override whose id is not a built-in
    /// framework package, each depending on all built-in packages.
    ///
    /// Panics if a registered per-validator callback yields no modules for
    /// `name`.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();

        // Copy the ids out first so the registry borrow is released before the
        // per-package lookups below borrow it again.
        let injected_ids: Vec<ObjectID> = OVERRIDE.with(|registry| {
            registry
                .borrow()
                .keys()
                .filter(|id| !built_in.contains(*id))
                .copied()
                .collect()
        });

        let mut packages = Vec::with_capacity(injected_ids.len());
        for id in injected_ids {
            packages.push(SystemPackage {
                id,
                bytes: get_override_bytes(&id, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        packages
    }
}
6037
/// Serializable record that stores an [`Object`] together with the id,
/// version and digest of its computed object reference.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// Object id taken from the object's computed reference.
    pub id: ObjectID,
    /// Version taken from the object's computed reference.
    pub version: VersionNumber,
    /// Digest taken from the object's computed reference.
    pub digest: ObjectDigest,
    /// The full object the reference was computed from.
    pub object: Object,
}
6045
6046impl ObjDumpFormat {
6047    fn new(object: Object) -> Self {
6048        let oref = object.compute_object_reference();
6049        Self {
6050            id: oref.object_id,
6051            version: oref.version,
6052            digest: oref.digest,
6053            object,
6054        }
6055    }
6056}
6057
/// Serializable snapshot of one transaction's execution context: epoch
/// parameters, the signed transaction, its computed effects, the expected
/// effects digest, and the objects involved. Built by [`NodeStateDump::new`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the transaction this dump describes.
    pub tx_digest: TransactionDigest,
    /// Sender-signed transaction data extracted from the certificate.
    pub sender_signed_data: SenderSignedData,
    /// Epoch reported by the epoch store when the dump was built.
    pub executed_epoch: u64,
    /// Reference gas price reported by the epoch store.
    pub reference_gas_price: u64,
    /// Protocol version reported by the epoch store, as a `u64`.
    pub protocol_version: u64,
    /// Epoch start timestamp from the epoch start configuration.
    pub epoch_start_timestamp_ms: u64,
    /// Copy of the effects passed to [`NodeStateDump::new`].
    pub computed_effects: TransactionEffects,
    /// Effects digest the caller expected for this transaction.
    pub expected_effects_digest: TransactionEffectsDigest,
    /// Built-in framework packages that were found in the object store.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Mutated or read-only shared inputs, at the versions named in the
    /// effects, that were found in the object store.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Objects listed in the temporary store's `loaded_runtime_objects` that
    /// were found in the object store.
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects at the versions reported by the effects'
    /// `modified_at_versions()` that were found in the object store.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages from the temporary store's `runtime_packages_loaded_from_db`.
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// Objects from the temporary store's `input_objects`.
    pub input_objects: Vec<ObjDumpFormat>,
}
6075
6076impl NodeStateDump {
6077    pub fn new(
6078        tx_digest: &TransactionDigest,
6079        effects: &TransactionEffects,
6080        expected_effects_digest: TransactionEffectsDigest,
6081        object_store: &dyn ObjectStore,
6082        epoch_store: &Arc<AuthorityPerEpochStore>,
6083        inner_temporary_store: &InnerTemporaryStore,
6084        certificate: &VerifiedExecutableTransaction,
6085    ) -> IotaResult<Self> {
6086        // Epoch info
6087        let executed_epoch = epoch_store.epoch();
6088        let reference_gas_price = epoch_store.reference_gas_price();
6089        let epoch_start_config = epoch_store.epoch_start_config();
6090        let protocol_version = epoch_store.protocol_version().as_u64();
6091        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6092
6093        // Record all system packages at this version
6094        let mut relevant_system_packages = Vec::new();
6095        for sys_package_id in BuiltInFramework::all_package_ids() {
6096            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
6097                relevant_system_packages.push(ObjDumpFormat::new(w))
6098            }
6099        }
6100
6101        // Record all the shared objects
6102        let mut shared_objects = Vec::new();
6103        for kind in effects.input_shared_objects() {
6104            match kind {
6105                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
6106                    if let Some(w) =
6107                        object_store.try_get_object_by_key(&obj_ref.object_id, obj_ref.version)?
6108                    {
6109                        shared_objects.push(ObjDumpFormat::new(w))
6110                    }
6111                }
6112                InputSharedObject::ReadDeleted(..)
6113                | InputSharedObject::MutateDeleted(..)
6114                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
6115                                                           * objects. */
6116            }
6117        }
6118
6119        // Record all loaded child objects
6120        // Child objects which are read but not mutated are not tracked anywhere else
6121        let mut loaded_child_objects = Vec::new();
6122        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6123            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
6124                loaded_child_objects.push(ObjDumpFormat::new(w))
6125            }
6126        }
6127
6128        // Record all modified objects
6129        let mut modified_at_versions = Vec::new();
6130        for (id, ver) in effects.modified_at_versions() {
6131            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
6132                modified_at_versions.push(ObjDumpFormat::new(w))
6133            }
6134        }
6135
6136        // Packages read at runtime, which were not previously loaded into the temoorary
6137        // store Some packages may be fetched at runtime and wont show up in
6138        // input objects
6139        let mut runtime_reads = Vec::new();
6140        for obj in inner_temporary_store
6141            .runtime_packages_loaded_from_db
6142            .values()
6143        {
6144            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6145        }
6146
6147        // All other input objects should already be in `inner_temporary_store.objects`
6148
6149        Ok(Self {
6150            tx_digest: *tx_digest,
6151            executed_epoch,
6152            reference_gas_price,
6153            epoch_start_timestamp_ms,
6154            protocol_version,
6155            relevant_system_packages,
6156            shared_objects,
6157            loaded_child_objects,
6158            modified_at_versions,
6159            runtime_reads,
6160            sender_signed_data: certificate.clone().into_message(),
6161            input_objects: inner_temporary_store
6162                .input_objects
6163                .values()
6164                .map(|o| ObjDumpFormat::new(o.clone()))
6165                .collect(),
6166            computed_effects: effects.clone(),
6167            expected_effects_digest,
6168        })
6169    }
6170
6171    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6172        let mut objects = Vec::new();
6173        objects.extend(self.relevant_system_packages.clone());
6174        objects.extend(self.shared_objects.clone());
6175        objects.extend(self.loaded_child_objects.clone());
6176        objects.extend(self.modified_at_versions.clone());
6177        objects.extend(self.runtime_reads.clone());
6178        objects.extend(self.input_objects.clone());
6179        objects
6180    }
6181
6182    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6183        let file_name = format!(
6184            "{}_{}_NODE_DUMP.json",
6185            self.tx_digest,
6186            AuthorityState::unixtime_now_ms()
6187        );
6188        let mut path = path.to_path_buf();
6189        path.push(&file_name);
6190        let mut file = File::create(path.clone())?;
6191        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6192        Ok(path)
6193    }
6194
6195    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6196        let file = File::open(path)?;
6197        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6198    }
6199}
6200
6201/// Returns the [`MoveAuthenticator`]s to execute during the pre-consensus
6202/// phase.
6203///
6204/// When `pre_consensus_sponsor_only_move_authentication` is enabled:
6205/// - For sponsored transactions: only the sponsor's [`MoveAuthenticator`] is
6206///   returned (empty if the sponsor does not use one).
6207/// - For non-sponsored transactions: all [`MoveAuthenticator`]s are returned
6208///   (currently only the sender's).
6209///
6210/// When the flag is not set, all [`MoveAuthenticator`]s are returned for
6211/// compatibility.
6212fn pre_consensus_move_authenticators<'a>(
6213    tx: &'a VerifiedTransaction,
6214    protocol_config: &ProtocolConfig,
6215) -> Vec<&'a MoveAuthenticator> {
6216    if protocol_config.pre_consensus_sponsor_only_move_authentication() {
6217        if tx.transaction_data().is_sponsored_tx() {
6218            if let Some(sponsor_move_authenticator) = tx.sponsor_move_authenticator() {
6219                vec![sponsor_move_authenticator]
6220            } else {
6221                vec![]
6222            }
6223        } else {
6224            tx.move_authenticators()
6225        }
6226    } else {
6227        tx.move_authenticators()
6228    }
6229}