iota_core/
authority.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::{BTreeMap, HashMap, HashSet},
8    fs,
9    fs::File,
10    io::Write,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, atomic::Ordering},
14    time::{Duration, SystemTime, UNIX_EPOCH},
15    vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24    encoding::{Base58, Encoding},
25    hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30    NodeConfig,
31    genesis::Genesis,
32    node::{
33        AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34        StateDebugDumpConfig,
35    },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39    DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40    IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41    IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45    TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49    key_value_store::{
50        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51    },
52    key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57    IOTA_SYSTEM_ADDRESS, TypeTag,
58    account_abstraction::{
59        account::AuthenticatorFunctionRefV1Key,
60        authenticator_function::{
61            AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
62            AuthenticatorFunctionRefV1,
63        },
64    },
65    authenticator_state::get_authenticator_state,
66    base_types::*,
67    committee::{Committee, EpochId, ProtocolVersion},
68    crypto::{
69        AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
70        default_hash,
71    },
72    deny_list_v1::check_coin_deny_list_v1_during_signing,
73    digests::{ChainIdentifier, Digest},
74    dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
75    effects::{
76        InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
77        TransactionEvents, VerifiedSignedTransactionEffects,
78    },
79    error::{ExecutionError, IotaError, IotaResult, UserInputError},
80    event::{Event, EventID, SystemEpochInfoEvent},
81    executable_transaction::VerifiedExecutableTransaction,
82    execution_config_utils::to_binary_config,
83    execution_status::ExecutionStatus,
84    fp_ensure,
85    gas::{GasCostSummary, IotaGasStatus},
86    gas_coin::NANOS_PER_IOTA,
87    inner_temporary_store::{
88        InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
89        WrittenObjects,
90    },
91    iota_system_state::{
92        IotaSystemState, IotaSystemStateTrait,
93        epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
94    },
95    is_system_package,
96    layout_resolver::{LayoutResolver, into_struct_layout},
97    message_envelope::Message,
98    messages_checkpoint::{
99        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
100        CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
101        CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
102        CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
103    },
104    messages_consensus::AuthorityCapabilitiesV1,
105    messages_grpc::{
106        HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
107        ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
108        TransactionStatus,
109    },
110    metrics::{BytecodeVerifierMetrics, LimitsMetrics},
111    move_authenticator::MoveAuthenticator,
112    object::{
113        MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
114        bounded_visitor::BoundedVisitor,
115    },
116    storage::{
117        BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
118    },
119    supported_protocol_versions::{
120        ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
121    },
122    transaction::*,
123    transaction_executor::{SimulateTransactionResult, VmChecks},
124};
125use itertools::Itertools;
126use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
127use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
128use parking_lot::Mutex;
129use prometheus::{
130    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
131    register_histogram_vec_with_registry, register_histogram_with_registry,
132    register_int_counter_vec_with_registry, register_int_counter_with_registry,
133    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
134};
135use serde::{Deserialize, Serialize, de::DeserializeOwned};
136use tap::TapFallible;
137use tokio::{
138    sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
139    task::JoinHandle,
140};
141use tracing::{debug, error, info, instrument, trace, warn};
142use typed_store::TypedStoreError;
143
144use self::{
145    authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
146};
147#[cfg(msim)]
148pub use crate::checkpoints::checkpoint_executor::utils::{
149    CheckpointTimeoutConfig, init_checkpoint_timeout_config,
150};
151use crate::{
152    authority::{
153        authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
154        authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
155        authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
156        authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
157        authority_store_tables::AuthorityPrunerTables,
158        epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
159    },
160    authority_client::NetworkAuthorityClient,
161    checkpoints::CheckpointStore,
162    congestion_tracker::CongestionTracker,
163    consensus_adapter::ConsensusAdapter,
164    epoch::committee_store::CommitteeStore,
165    execution_cache::{
166        CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
167        ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
168        TransactionCacheRead,
169    },
170    execution_driver::execution_process,
171    grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
172    jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
173    metrics::{LatencyObserver, RateTracker},
174    module_cache_metrics::ResolverMetrics,
175    overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
176    stake_aggregator::StakeAggregator,
177    state_accumulator::{AccumulatorStore, StateAccumulator},
178    subscription_handler::SubscriptionHandler,
179    transaction_input_loader::TransactionInputLoader,
180    transaction_manager::TransactionManager,
181    transaction_outputs::TransactionOutputs,
182    validator_tx_finalizer::ValidatorTxFinalizer,
183    verify_indexes::verify_indexes,
184};
185
// Unit tests live under `unit_tests/` and are spliced into this module tree
// via `#[path]`; they are compiled only for test builds.
#[cfg(test)]
#[path = "unit_tests/authority_tests.rs"]
pub mod authority_tests;

#[cfg(test)]
#[path = "unit_tests/transaction_tests.rs"]
pub mod transaction_tests;

#[cfg(test)]
#[path = "unit_tests/batch_transaction_tests.rs"]
mod batch_transaction_tests;

#[cfg(test)]
#[path = "unit_tests/move_integration_tests.rs"]
pub mod move_integration_tests;

#[cfg(test)]
#[path = "unit_tests/gas_tests.rs"]
mod gas_tests;

#[cfg(test)]
#[path = "unit_tests/batch_verification_tests.rs"]
mod batch_verification_tests;

#[cfg(test)]
#[path = "unit_tests/coin_deny_list_tests.rs"]
mod coin_deny_list_tests;

#[cfg(test)]
#[path = "unit_tests/auth_unit_test_utils.rs"]
pub mod auth_unit_test_utils;

// Test-support utilities; `pub` (not cfg(test)) so they remain usable from
// other crates' tests as well.
pub mod authority_test_utils;

// Production submodules of the authority.
pub mod authority_per_epoch_store;
pub mod authority_per_epoch_store_pruner;

mod authority_store_migrations;
pub mod authority_store_pruner;
pub mod authority_store_tables;
pub mod authority_store_types;
pub mod epoch_start_configuration;
pub mod shared_object_congestion_tracker;
pub mod shared_object_version_manager;
pub mod suggested_gas_price_calculator;
pub mod test_authority_builder;
pub mod transaction_deferral;

pub(crate) mod authority_store;
pub mod backpressure;
236
/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
pub struct AuthorityMetrics {
    // Total number of transaction orders received.
    tx_orders: IntCounter,
    // Total number of transaction certificates handled.
    total_certs: IntCounter,
    // Number of calls to handle_certificate, including failed attempts.
    total_cert_attempts: IntCounter,
    // Total number of transaction effects produced
    // (total_effects == total transactions finished).
    total_effects: IntCounter,
    /// Number of transactions involving shared objects.
    pub shared_obj_tx: IntCounter,
    // Number of sponsored transactions.
    sponsored_tx: IntCounter,
    // Number of transaction orders already processed previously.
    tx_already_processed: IntCounter,
    // Distribution of the number of input objects per transaction.
    num_input_objs: Histogram,
    // Number of shared input objects per transaction.
    num_shared_objects: Histogram,
    // Distribution of transaction batch sizes.
    batch_size: Histogram,

    // Latency of handling transactions (signing path).
    authority_state_handle_transaction_latency: Histogram,

    // Latency of executing certificates, including waiting for inputs;
    // the two fields are the single-writer and shared-object label views of
    // the same `authority_state_execute_certificate_latency` histogram vec.
    execute_certificate_latency_single_writer: Histogram,
    execute_certificate_latency_shared_object: Histogram,

    // Latency of actual certificate executions.
    internal_execution_latency: Histogram,
    // Latency of loading input objects for execution.
    execution_load_input_objects_latency: Histogram,
    // Latency of executing certificates, before committing the results.
    prepare_certificate_latency: Histogram,
    // Latency of committing certificate execution results.
    commit_certificate_latency: Histogram,
    // Latency of checkpointing dbs.
    db_checkpoint_latency: Histogram,

    // TransactionManager queue sizes (sliced by enqueue `result` where
    // applicable) and the effectiveness of its object/package
    // availability caches.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    pub(crate) transaction_manager_num_missing_objects: IntGauge,
    // Certificates pending with at least 1 missing input object.
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    // Executing certificates, including queued and actually running ones.
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    pub(crate) transaction_manager_num_ready: IntGauge,
    pub(crate) transaction_manager_object_cache_size: IntGauge,
    pub(crate) transaction_manager_object_cache_hits: IntCounter,
    pub(crate) transaction_manager_object_cache_misses: IntCounter,
    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
    pub(crate) transaction_manager_package_cache_size: IntGauge,
    pub(crate) transaction_manager_package_cache_hits: IntCounter,
    pub(crate) transaction_manager_package_cache_misses: IntCounter,
    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
    // Time a transaction spends waiting in the TransactionManager queue.
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    // Cumulative number of transactions executed by the execution driver.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    // Number of transactions pending in the execution driver dispatch queue.
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    // Queueing delay between a transaction being ready for execution and it
    // starting to execute.
    pub(crate) execution_queueing_delay_s: Histogram,
    // Ratio of computation gas divided by VM execution latency.
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    // Ratio of computation gas divided by certificate execution latency,
    // including committing the certificate.
    pub(crate) execution_gas_latency_ratio: Histogram,

    // Consensus transactions skipped, and how many of those skips were
    // decided from a local cache hit.
    pub(crate) skipped_consensus_txns: IntCounter,
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,

    // Overload protection: whether the authority is currently in load
    // shedding mode, and the percentage of transactions being shed.
    pub(crate) authority_overload_status: IntGauge,
    pub(crate) authority_load_shedding_percentage: IntGauge,

    // Number of times each `source` indicated transaction overload.
    pub(crate) transaction_overload_sources: IntCounterVec,

    /// Post processing metrics: events emitted, transactions indexed and
    /// event-processed, and failures encountered after execution.
    post_processing_total_events_emitted: IntCounter,
    post_processing_total_tx_indexed: IntCounter,
    post_processing_total_tx_had_event_processed: IntCounter,
    post_processing_total_failures: IntCounter,

    /// Consensus handler metrics: processed/deferred/congested/cancelled
    /// transactions, reputation scores, committed subdags/messages/user
    /// transactions (sliced by authority), and congestion-control object
    /// costs. The `consensus_calculated_throughput*` gauges are registered
    /// further down in `new()` (not shown in this chunk).
    pub consensus_handler_processed: IntCounterVec,
    pub consensus_handler_transaction_sizes: HistogramVec,
    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    pub consensus_handler_scores: IntGaugeVec,
    pub consensus_handler_deferred_transactions: IntCounter,
    pub consensus_handler_congested_transactions: IntCounter,
    pub consensus_handler_cancelled_transactions: IntCounter,
    pub consensus_handler_max_object_costs: IntGaugeVec,
    pub consensus_committed_subdags: IntCounterVec,
    pub consensus_committed_messages: IntGaugeVec,
    pub consensus_committed_user_transactions: IntGaugeVec,
    // Leader round of the consensus output currently being processed.
    pub consensus_handler_leader_round: IntGauge,
    pub consensus_calculated_throughput: IntGauge,
    pub consensus_calculated_throughput_profile: IntGauge,

    /// Shared limits metrics (registered once, reused by execution).
    pub limits_metrics: Arc<LimitsMetrics>,

    /// bytecode verifier metrics for tracking timeouts
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zklogin signatures
    pub zklogin_sig_count: IntCounter,
    /// Count of multisig signatures
    pub multisig_sig_count: IntCounter,

    // Tracks recent average txn queueing delay between when it is ready for execution
    // until it starts executing.
    pub execution_queueing_latency: LatencyObserver,

    // Tracks the rate of transactions become ready for execution in transaction manager.
    // The need for the Mutex is that the tracker is updated in transaction manager and read
    // in the overload_monitor. There should be low mutex contention because
    // transaction manager is single threaded and the read rate in overload_monitor is
    // low. In the case where transaction manager becomes multi-threaded, we can
    // create one rate tracker per thread.
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    // Tracks the rate of transactions starts execution in execution driver.
    // Similar reason for using a Mutex here as to `txn_ready_rate_tracker`.
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
338
339// Override default Prom buckets for positive numbers in 0-10M range
340const POSITIVE_INT_BUCKETS: &[f64] = &[
341    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
342    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
343    7000000., 10000000.,
344];
345
/// Default latency histogram buckets, in seconds, from 0.5 ms up to 90 s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, // sub-10ms
    0.01, 0.025, 0.05, // 10-100ms
    0.1, 0.25, 0.5, // sub-second
    1., 2., 3., 4., 5., 6., 7., 8., 9., // single-digit seconds
    10., 20., 30., 60., 90., // long tail
];
350
/// Latency buckets for very fast samples, in seconds. Starts at 10 us and
/// extends to 100 s with a 1 / 2 / 5 progression per decade.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, // 10us decade
    0.0001, 0.0002, 0.0005, // 100us decade
    0.001, 0.002, 0.005, // 1ms decade
    0.01, 0.02, 0.05, // 10ms decade
    0.1, 0.2, 0.5, // 100ms decade
    1., 2., 5., // seconds
    10., 20., 50., 100., // long tail
];
356
/// Buckets for the ratio of computation gas divided by execution latency
/// (used by `prepare_cert_gas_latency_ratio` / `execution_gas_latency_ratio`).
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, // low ratios
    100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, // hundreds
    1_000.0, 2_000.0, 3_000.0, 4_000.0, 5_000.0, 6_000.0, 7_000.0, 8_000.0,
    9_000.0, // thousands
    10_000.0, 50_000.0, 100_000.0, 1_000_000.0, // long tail
];
361
/// Gas coin value used in dev-inspect and dry-runs if no gas coin was provided.
// NOTE(review): assumes `1_000_000_000 * NANOS_PER_IOTA` fits in u64 — holds
// as long as NANOS_PER_IOTA <= ~1.8e10 (it is 1e9 for 9-decimal IOTA; confirm).
pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; // 1B IOTA
364
365impl AuthorityMetrics {
366    pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
367        let execute_certificate_latency = register_histogram_vec_with_registry!(
368            "authority_state_execute_certificate_latency",
369            "Latency of executing certificates, including waiting for inputs",
370            &["tx_type"],
371            LATENCY_SEC_BUCKETS.to_vec(),
372            registry,
373        )
374        .unwrap();
375
376        let execute_certificate_latency_single_writer =
377            execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
378        let execute_certificate_latency_shared_object =
379            execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
380
381        Self {
382            tx_orders: register_int_counter_with_registry!(
383                "total_transaction_orders",
384                "Total number of transaction orders",
385                registry,
386            )
387            .unwrap(),
388            total_certs: register_int_counter_with_registry!(
389                "total_transaction_certificates",
390                "Total number of transaction certificates handled",
391                registry,
392            )
393            .unwrap(),
394            total_cert_attempts: register_int_counter_with_registry!(
395                "total_handle_certificate_attempts",
396                "Number of calls to handle_certificate",
397                registry,
398            )
399            .unwrap(),
400            // total_effects == total transactions finished
401            total_effects: register_int_counter_with_registry!(
402                "total_transaction_effects",
403                "Total number of transaction effects produced",
404                registry,
405            )
406            .unwrap(),
407
408            shared_obj_tx: register_int_counter_with_registry!(
409                "num_shared_obj_tx",
410                "Number of transactions involving shared objects",
411                registry,
412            )
413            .unwrap(),
414
415            sponsored_tx: register_int_counter_with_registry!(
416                "num_sponsored_tx",
417                "Number of sponsored transactions",
418                registry,
419            )
420            .unwrap(),
421
422            tx_already_processed: register_int_counter_with_registry!(
423                "num_tx_already_processed",
424                "Number of transaction orders already processed previously",
425                registry,
426            )
427            .unwrap(),
428            num_input_objs: register_histogram_with_registry!(
429                "num_input_objects",
430                "Distribution of number of input TX objects per TX",
431                POSITIVE_INT_BUCKETS.to_vec(),
432                registry,
433            )
434            .unwrap(),
435            num_shared_objects: register_histogram_with_registry!(
436                "num_shared_objects",
437                "Number of shared input objects per TX",
438                POSITIVE_INT_BUCKETS.to_vec(),
439                registry,
440            )
441            .unwrap(),
442            batch_size: register_histogram_with_registry!(
443                "batch_size",
444                "Distribution of size of transaction batch",
445                POSITIVE_INT_BUCKETS.to_vec(),
446                registry,
447            )
448            .unwrap(),
449            authority_state_handle_transaction_latency: register_histogram_with_registry!(
450                "authority_state_handle_transaction_latency",
451                "Latency of handling transactions",
452                LATENCY_SEC_BUCKETS.to_vec(),
453                registry,
454            )
455            .unwrap(),
456            execute_certificate_latency_single_writer,
457            execute_certificate_latency_shared_object,
458            internal_execution_latency: register_histogram_with_registry!(
459                "authority_state_internal_execution_latency",
460                "Latency of actual certificate executions",
461                LATENCY_SEC_BUCKETS.to_vec(),
462                registry,
463            )
464            .unwrap(),
465            execution_load_input_objects_latency: register_histogram_with_registry!(
466                "authority_state_execution_load_input_objects_latency",
467                "Latency of loading input objects for execution",
468                LOW_LATENCY_SEC_BUCKETS.to_vec(),
469                registry,
470            )
471            .unwrap(),
472            prepare_certificate_latency: register_histogram_with_registry!(
473                "authority_state_prepare_certificate_latency",
474                "Latency of executing certificates, before committing the results",
475                LATENCY_SEC_BUCKETS.to_vec(),
476                registry,
477            )
478            .unwrap(),
479            commit_certificate_latency: register_histogram_with_registry!(
480                "authority_state_commit_certificate_latency",
481                "Latency of committing certificate execution results",
482                LATENCY_SEC_BUCKETS.to_vec(),
483                registry,
484            )
485            .unwrap(),
486            db_checkpoint_latency: register_histogram_with_registry!(
487                "db_checkpoint_latency",
488                "Latency of checkpointing dbs",
489                LATENCY_SEC_BUCKETS.to_vec(),
490                registry,
491            ).unwrap(),
492            transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
493                "transaction_manager_num_enqueued_certificates",
494                "Current number of certificates enqueued to TransactionManager",
495                &["result"],
496                registry,
497            )
498            .unwrap(),
499            transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
500                "transaction_manager_num_missing_objects",
501                "Current number of missing objects in TransactionManager",
502                registry,
503            )
504            .unwrap(),
505            transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
506                "transaction_manager_num_pending_certificates",
507                "Number of certificates pending in TransactionManager, with at least 1 missing input object",
508                registry,
509            )
510            .unwrap(),
511            transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
512                "transaction_manager_num_executing_certificates",
513                "Number of executing certificates, including queued and actually running certificates",
514                registry,
515            )
516            .unwrap(),
517            transaction_manager_num_ready: register_int_gauge_with_registry!(
518                "transaction_manager_num_ready",
519                "Number of ready transactions in TransactionManager",
520                registry,
521            )
522            .unwrap(),
523            transaction_manager_object_cache_size: register_int_gauge_with_registry!(
524                "transaction_manager_object_cache_size",
525                "Current size of object-availability cache in TransactionManager",
526                registry,
527            )
528            .unwrap(),
529            transaction_manager_object_cache_hits: register_int_counter_with_registry!(
530                "transaction_manager_object_cache_hits",
531                "Number of object-availability cache hits in TransactionManager",
532                registry,
533            )
534            .unwrap(),
535            authority_overload_status: register_int_gauge_with_registry!(
536                "authority_overload_status",
537                "Whether authority is current experiencing overload and enters load shedding mode.",
538                registry)
539            .unwrap(),
540            authority_load_shedding_percentage: register_int_gauge_with_registry!(
541                "authority_load_shedding_percentage",
542                "The percentage of transactions is shed when the authority is in load shedding mode.",
543                registry)
544            .unwrap(),
545            transaction_manager_object_cache_misses: register_int_counter_with_registry!(
546                "transaction_manager_object_cache_misses",
547                "Number of object-availability cache misses in TransactionManager",
548                registry,
549            )
550            .unwrap(),
551            transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
552                "transaction_manager_object_cache_evictions",
553                "Number of object-availability cache evictions in TransactionManager",
554                registry,
555            )
556            .unwrap(),
557            transaction_manager_package_cache_size: register_int_gauge_with_registry!(
558                "transaction_manager_package_cache_size",
559                "Current size of package-availability cache in TransactionManager",
560                registry,
561            )
562            .unwrap(),
563            transaction_manager_package_cache_hits: register_int_counter_with_registry!(
564                "transaction_manager_package_cache_hits",
565                "Number of package-availability cache hits in TransactionManager",
566                registry,
567            )
568            .unwrap(),
569            transaction_manager_package_cache_misses: register_int_counter_with_registry!(
570                "transaction_manager_package_cache_misses",
571                "Number of package-availability cache misses in TransactionManager",
572                registry,
573            )
574            .unwrap(),
575            transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
576                "transaction_manager_package_cache_evictions",
577                "Number of package-availability cache evictions in TransactionManager",
578                registry,
579            )
580            .unwrap(),
581            transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
582                "transaction_manager_transaction_queue_age_s",
583                "Time spent in waiting for transaction in the queue",
584                LATENCY_SEC_BUCKETS.to_vec(),
585                registry,
586            )
587            .unwrap(),
588            transaction_overload_sources: register_int_counter_vec_with_registry!(
589                "transaction_overload_sources",
590                "Number of times each source indicates transaction overload.",
591                &["source"],
592                registry)
593            .unwrap(),
594            execution_driver_executed_transactions: register_int_counter_with_registry!(
595                "execution_driver_executed_transactions",
596                "Cumulative number of transaction executed by execution driver",
597                registry,
598            )
599            .unwrap(),
600            execution_driver_dispatch_queue: register_int_gauge_with_registry!(
601                "execution_driver_dispatch_queue",
602                "Number of transaction pending in execution driver dispatch queue",
603                registry,
604            )
605            .unwrap(),
606            execution_queueing_delay_s: register_histogram_with_registry!(
607                "execution_queueing_delay_s",
608                "Queueing delay between a transaction is ready for execution until it starts executing.",
609                LATENCY_SEC_BUCKETS.to_vec(),
610                registry
611            )
612            .unwrap(),
613            prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
614                "prepare_cert_gas_latency_ratio",
615                "The ratio of computation gas divided by VM execution latency.",
616                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
617                registry
618            )
619            .unwrap(),
620            execution_gas_latency_ratio: register_histogram_with_registry!(
621                "execution_gas_latency_ratio",
622                "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
623                GAS_LATENCY_RATIO_BUCKETS.to_vec(),
624                registry
625            )
626            .unwrap(),
627            skipped_consensus_txns: register_int_counter_with_registry!(
628                "skipped_consensus_txns",
629                "Total number of consensus transactions skipped",
630                registry,
631            )
632            .unwrap(),
633            skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
634                "skipped_consensus_txns_cache_hit",
635                "Total number of consensus transactions skipped because of local cache hit",
636                registry,
637            )
638            .unwrap(),
639            post_processing_total_events_emitted: register_int_counter_with_registry!(
640                "post_processing_total_events_emitted",
641                "Total number of events emitted in post processing",
642                registry,
643            )
644            .unwrap(),
645            post_processing_total_tx_indexed: register_int_counter_with_registry!(
646                "post_processing_total_tx_indexed",
647                "Total number of txes indexed in post processing",
648                registry,
649            )
650            .unwrap(),
651            post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
652                "post_processing_total_tx_had_event_processed",
653                "Total number of txes finished event processing in post processing",
654                registry,
655            )
656            .unwrap(),
657            post_processing_total_failures: register_int_counter_with_registry!(
658                "post_processing_total_failures",
659                "Total number of failure in post processing",
660                registry,
661            )
662            .unwrap(),
663            consensus_handler_processed: register_int_counter_vec_with_registry!(
664                "consensus_handler_processed",
665                "Number of transactions processed by consensus handler",
666                &["class"],
667                registry
668            ).unwrap(),
669            consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
670                "consensus_handler_transaction_sizes",
671                "Sizes of each type of transactions processed by consensus handler",
672                &["class"],
673                POSITIVE_INT_BUCKETS.to_vec(),
674                registry
675            ).unwrap(),
676            consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
677                "consensus_handler_num_low_scoring_authorities",
678                "Number of low scoring authorities based on reputation scores from consensus",
679                registry
680            ).unwrap(),
681            consensus_handler_scores: register_int_gauge_vec_with_registry!(
682                "consensus_handler_scores",
683                "scores from consensus for each authority",
684                &["authority"],
685                registry,
686            ).unwrap(),
687            consensus_handler_deferred_transactions: register_int_counter_with_registry!(
688                "consensus_handler_deferred_transactions",
689                "Number of transactions deferred by consensus handler",
690                registry,
691            ).unwrap(),
692            consensus_handler_congested_transactions: register_int_counter_with_registry!(
693                "consensus_handler_congested_transactions",
694                "Number of transactions deferred by consensus handler due to congestion",
695                registry,
696            ).unwrap(),
697            consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
698                "consensus_handler_cancelled_transactions",
699                "Number of transactions cancelled by consensus handler",
700                registry,
701            ).unwrap(),
702            consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
703                "consensus_handler_max_congestion_control_object_costs",
704                "Max object costs for congestion control in the current consensus commit",
705                &["commit_type"],
706                registry,
707            ).unwrap(),
708            consensus_committed_subdags: register_int_counter_vec_with_registry!(
709                "consensus_committed_subdags",
710                "Number of committed subdags, sliced by leader",
711                &["authority"],
712                registry,
713            ).unwrap(),
714            consensus_committed_messages: register_int_gauge_vec_with_registry!(
715                "consensus_committed_messages",
716                "Total number of committed consensus messages, sliced by author",
717                &["authority"],
718                registry,
719            ).unwrap(),
720            consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
721                "consensus_committed_user_transactions",
722                "Number of committed user transactions, sliced by submitter",
723                &["authority"],
724                registry,
725            ).unwrap(),
726            consensus_handler_leader_round: register_int_gauge_with_registry!(
727                "consensus_handler_leader_round",
728                "The leader round of the current consensus output being processed in the consensus handler",
729                registry,
730            ).unwrap(),
731            limits_metrics: Arc::new(LimitsMetrics::new(registry)),
732            bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
733            zklogin_sig_count: register_int_counter_with_registry!(
734                "zklogin_sig_count",
735                "Count of zkLogin signatures",
736                registry,
737            )
738            .unwrap(),
739            multisig_sig_count: register_int_counter_with_registry!(
740                "multisig_sig_count",
741                "Count of zkLogin signatures",
742                registry,
743            )
744            .unwrap(),
745            consensus_calculated_throughput: register_int_gauge_with_registry!(
746                "consensus_calculated_throughput",
747                "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
748                registry,
749            ).unwrap(),
750            consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
751                "consensus_calculated_throughput_profile",
752                "The current active calculated throughput profile",
753                registry
754            ).unwrap(),
755            execution_queueing_latency: LatencyObserver::new(),
756            txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
757            execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
758        }
759    }
760
761    /// Reset metrics that contain `hostname` as one of the labels. This is
762    /// needed to avoid retaining metrics for long-gone committee members and
763    /// only exposing metrics for the committee in the current epoch.
764    pub fn reset_on_reconfigure(&self) {
765        self.consensus_committed_messages.reset();
766        self.consensus_handler_scores.reset();
767        self.consensus_committed_user_transactions.reset();
768    }
769}
770
/// A trait object for `Signer` that is:
/// - `Pin`, i.e. confined to one place in memory (we don't want to copy
///   private keys).
/// - `Send + Sync`, i.e. can be safely shared between threads.
///
/// Typically instantiated with `Arc::pin(keypair)` where `keypair` is a
/// `KeyPair` (the pointer type here is `Arc`, not `Box`).
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
778
pub struct AuthorityState {
    // Fixed size, static, identity of the authority
    /// The name of this authority.
    pub name: AuthorityName,
    /// The signature key of the authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads transaction input objects for signing and execution.
    input_loader: TransactionInputLoader,
    /// Bundle of trait objects giving access to the execution caches
    /// (object/transaction cache readers, writers, backing stores).
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// Current per-epoch store; swapped atomically at reconfiguration.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// This lock denotes current 'execution epoch'.
    /// Execution acquires read lock, checks certificate epoch and holds it
    /// until all writes are complete. Reconfiguration acquires write lock,
    /// changes the epoch and reverts all transactions from the previous epoch
    /// that were executed but did not make it into the checkpoint.
    execution_lock: RwLock<EpochId>,

    /// Optional secondary index store (present on fullnodes with indexing).
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional index store backing the gRPC read APIs.
    pub grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,

    /// Handler used to publish transaction events to subscribers.
    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Store for checkpoint contents and metadata.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Store of committees across epochs.
    committee_store: Arc<CommitteeStore>,

    /// Manages pending certificates and their missing input objects.
    transaction_manager: Arc<TransactionManager>,

    /// Shuts down the execution task. Used only in testing.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Prometheus metrics for this authority.
    pub metrics: Arc<AuthorityMetrics>,
    // Held for their background pruning side effects; never read directly.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Take db checkpoints of different dbs
    db_checkpoint_config: DBCheckpointConfig,

    /// Node configuration this authority was started with.
    pub config: NodeConfig,

    /// Current overload status in this authority. Updated periodically.
    pub overload_info: AuthorityOverloadInfo,

    /// Optional helper that tracks signed transactions to drive them to
    /// finality (used on committee validators; see `handle_transaction`).
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// The chain identifier is derived from the digest of the genesis
    /// checkpoint.
    chain_identifier: ChainIdentifier,

    /// Tracks congestion state for congestion control.
    pub(crate) congestion_tracker: Arc<CongestionTracker>,
}
834
835/// The authority state encapsulates all state, drives execution, and ensures
836/// safety.
837///
838/// Note the authority operations can be accessed through a read ref (&) and do
839/// not require &mut. Internally a database is synchronized through a mutex
840/// lock.
841///
842/// Repeating valid commands should produce no changes and return no error.
843impl AuthorityState {
844    pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
845        epoch_store.committee().authority_exists(&self.name)
846    }
847
848    pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
849        epoch_store
850            .active_validators()
851            .iter()
852            .any(|a| AuthorityName::from(a) == self.name)
853    }
854
855    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
856        !self.is_committee_validator(epoch_store)
857    }
858
859    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
860        &self.committee_store
861    }
862
863    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
864        self.committee_store.clone()
865    }
866
867    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
868        &self.config.authority_overload_config
869    }
870
871    pub fn get_epoch_state_commitments(
872        &self,
873        epoch: EpochId,
874    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
875        self.checkpoint_store.get_epoch_state_commitments(epoch)
876    }
877
    /// This is a private method and should be kept that way. It doesn't check
    /// whether the provided transaction is a system transaction, and hence
    /// can only be called internally.
    ///
    /// High-level flow:
    /// 1. Hold the execution (read) lock so reconfiguration cannot race the
    ///    signing.
    /// 2. Run the transaction deny checks, then load and check all input
    ///    objects (including authenticator inputs, if any).
    /// 3. If the transaction carries `MoveAuthenticator` signatures, execute
    ///    the authenticators and reject the transaction if any of them fails.
    /// 4. Sign the transaction and acquire locks on its owned input objects.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
    async fn handle_transaction_impl(
        &self,
        transaction: VerifiedTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<VerifiedSignedTransaction> {
        // Ensure that validator cannot reconfigure while we are signing the tx
        let _execution_lock = self.execution_lock_for_signing()?;

        let protocol_config = epoch_store.protocol_config();
        let reference_gas_price = epoch_store.reference_gas_price();

        let epoch = epoch_store.epoch();

        let tx_data = transaction.data().transaction_data();

        // Note: the deny checks may do redundant package loads but:
        // - they only load packages when there is an active package deny map
        // - the loads are cached anyway
        iota_transaction_checks::deny::check_transaction_for_signing(
            tx_data,
            transaction.tx_signatures(),
            &transaction.input_objects()?,
            &tx_data.receiving_objects(),
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        // Load all transaction-related input objects.
        // Authenticator input objects and the account objects are loaded in the same
        // call if there are `MoveAuthenticator` signatures present in the transaction.
        let (tx_input_objects, tx_receiving_objects, per_authenticator_inputs) =
            self.read_objects_for_signing(&transaction, epoch)?;

        // Get the `MoveAuthenticator`s, if any.
        let move_authenticators = transaction.move_authenticators();

        // Check the inputs for signing.
        // If there are `MoveAuthenticator` signatures, their input objects and the
        // account objects are also checked and must be provided.
        // It is also checked if there is enough gas to execute the transaction and its
        // authenticators.
        let (gas_status, tx_checked_input_objects, per_authenticator_checked_inputs) = self
            .check_transaction_inputs_for_signing(
                protocol_config,
                reference_gas_price,
                tx_data,
                tx_input_objects,
                &tx_receiving_objects,
                &move_authenticators,
                per_authenticator_inputs,
            )?;

        // Get the input objects for the authenticators, if there are
        // `MoveAuthenticator`s.
        let per_authenticator_checked_input_objects = per_authenticator_checked_inputs
            .iter()
            .map(|i| &i.0)
            .collect();

        // Check if any of the sender, the transaction input objects, the receiving
        // objects and the authenticator input objects are in the coin deny
        // list, which would prevent the transaction from being signed.
        check_coin_deny_list_v1_during_signing(
            tx_data.sender(),
            &tx_checked_input_objects,
            &tx_receiving_objects,
            &per_authenticator_checked_input_objects,
            &self.get_object_store(),
        )?;

        // If there are `MoveAuthenticator` signatures, execute them and check if they
        // all succeed.
        if !move_authenticators.is_empty() {
            let aggregated_authenticator_input_objects =
                iota_transaction_checks::aggregate_authenticator_input_objects(
                    &per_authenticator_checked_input_objects,
                )?;

            debug_assert_eq!(
                move_authenticators.len(),
                per_authenticator_checked_inputs.len(),
                "Move authenticators amount must match the number of checked authenticator inputs"
            );

            // Pair each authenticator with its function reference and checked
            // inputs, in the shape the executor expects.
            let move_authenticators = move_authenticators
                .into_iter()
                .zip(per_authenticator_checked_inputs)
                .map(
                    |(
                        move_authenticator,
                        (authenticator_checked_input_objects, authenticator_function_ref),
                    )| {
                        (
                            move_authenticator.to_owned(),
                            authenticator_function_ref,
                            authenticator_checked_input_objects,
                        )
                    },
                )
                .collect();

            // It is supposed that `MoveAuthenticator` availability is checked in
            // `SenderSignedData::validity_check`.

            // Serialize the TransactionData for the auth context before decomposing.
            let tx_data_bytes =
                bcs::to_bytes(&tx_data).expect("TransactionData serialization cannot fail");

            let (kind, signer, gas_data) = tx_data.execution_parts();

            // Execute the Move authenticators.
            let validation_result = epoch_store.executor().authenticate_transaction(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                gas_data,
                gas_status,
                move_authenticators,
                aggregated_authenticator_input_objects,
                kind,
                signer,
                transaction.digest().to_owned(),
                tx_data_bytes,
                &mut None,
            );

            // A single failing authenticator rejects the whole transaction.
            if let Err(validation_error) = validation_result {
                return Err(IotaError::MoveAuthenticatorExecutionFailure {
                    error: validation_error.to_string(),
                });
            }
        }

        let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();

        let signed_transaction =
            VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);

        // Check and write locks, to signed transaction, into the database
        // The call to self.set_transaction_lock checks the lock is not conflicting,
        // and returns ConflictingTransaction error in case there is a lock on a
        // different existing transaction.
        self.get_cache_writer().try_acquire_transaction_locks(
            epoch_store,
            &owned_objects,
            signed_transaction.clone(),
        )?;

        Ok(signed_transaction)
    }
1037
    /// Initiate a new transaction.
    ///
    /// The answer is idempotent: if a status (signed or executed) is already
    /// recorded for this digest, it is returned as-is. On a committee
    /// validator with a configured `ValidatorTxFinalizer`, a freshly signed
    /// transaction is additionally tracked in a background task for as long
    /// as the current epoch is alive.
    #[instrument(name = "handle_transaction", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = transaction.data().transaction_data().gas_owner().to_string()))]
    pub async fn handle_transaction(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction: VerifiedTransaction,
    ) -> IotaResult<HandleTransactionResponse> {
        let tx_digest = *transaction.digest();
        debug!("handle_transaction");

        // Ensure an idempotent answer.
        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
            return Ok(HandleTransactionResponse { status });
        }

        let _metrics_guard = self
            .metrics
            .authority_state_handle_transaction_latency
            .start_timer();
        self.metrics.tx_orders.inc();

        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
        match signed {
            Ok(s) => {
                if self.is_committee_validator(epoch_store) {
                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
                        let tx = s.clone();
                        let validator_tx_finalizer = validator_tx_finalizer.clone();
                        let cache_reader = self.get_transaction_cache_reader().clone();
                        let epoch_store = epoch_store.clone();
                        // Track the signed tx in the background; the task is
                        // bounded by the lifetime of the current epoch.
                        spawn_monitored_task!(epoch_store.within_alive_epoch(
                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
                        ));
                    }
                }
                Ok(HandleTransactionResponse {
                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
                })
            }
            // It happens frequently that while we are checking the validity of the transaction, it
            // has just been executed.
            // In that case, we could still return Ok to avoid showing confusing errors.
            Err(err) => Ok(HandleTransactionResponse {
                status: self
                    .get_transaction_status(&tx_digest, epoch_store)?
                    .ok_or(err)?
                    .1,
            }),
        }
    }
1088
1089    pub fn check_system_overload_at_signing(&self) -> bool {
1090        self.config
1091            .authority_overload_config
1092            .check_system_overload_at_signing
1093    }
1094
1095    pub fn check_system_overload_at_execution(&self) -> bool {
1096        self.config
1097            .authority_overload_config
1098            .check_system_overload_at_execution
1099    }
1100
1101    pub(crate) fn check_system_overload(
1102        &self,
1103        consensus_adapter: &Arc<ConsensusAdapter>,
1104        tx_data: &SenderSignedData,
1105        do_authority_overload_check: bool,
1106    ) -> IotaResult {
1107        if do_authority_overload_check {
1108            self.check_authority_overload(tx_data).tap_err(|_| {
1109                self.update_overload_metrics("execution_queue");
1110            })?;
1111        }
1112        self.transaction_manager
1113            .check_execution_overload(self.overload_config(), tx_data)
1114            .tap_err(|_| {
1115                self.update_overload_metrics("execution_pending");
1116            })?;
1117        consensus_adapter.check_consensus_overload().tap_err(|_| {
1118            self.update_overload_metrics("consensus");
1119        })?;
1120
1121        let pending_tx_count = self
1122            .get_cache_commit()
1123            .approximate_pending_transaction_count();
1124        if pending_tx_count
1125            > self
1126                .config
1127                .execution_cache_config
1128                .writeback_cache
1129                .backpressure_threshold_for_rpc()
1130        {
1131            return Err(IotaError::ValidatorOverloadedRetryAfter {
1132                retry_after_secs: 10,
1133            });
1134        }
1135
1136        Ok(())
1137    }
1138
1139    fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1140        if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1141            return Ok(());
1142        }
1143
1144        let load_shedding_percentage = self
1145            .overload_info
1146            .load_shedding_percentage
1147            .load(Ordering::Relaxed);
1148        overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1149    }
1150
1151    fn update_overload_metrics(&self, source: &str) {
1152        self.metrics
1153            .transaction_overload_sources
1154            .with_label_values(&[source])
1155            .inc();
1156    }
1157
1158    /// Executes a certificate for its effects.
1159    #[instrument(level = "trace", skip_all)]
1160    pub async fn execute_certificate(
1161        &self,
1162        certificate: &VerifiedCertificate,
1163        epoch_store: &Arc<AuthorityPerEpochStore>,
1164    ) -> IotaResult<TransactionEffects> {
1165        let _metrics_guard = if certificate.contains_shared_object() {
1166            self.metrics
1167                .execute_certificate_latency_shared_object
1168                .start_timer()
1169        } else {
1170            self.metrics
1171                .execute_certificate_latency_single_writer
1172                .start_timer()
1173        };
1174        trace!("execute_certificate");
1175
1176        self.metrics.total_cert_attempts.inc();
1177
1178        if !certificate.contains_shared_object() {
1179            // Shared object transactions need to be sequenced by the consensus before
1180            // enqueueing for execution, done in
1181            // AuthorityPerEpochStore::handle_consensus_transaction(). For owned
1182            // object transactions, they can be enqueued for execution immediately.
1183            self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1184        }
1185
1186        // tx could be reverted when epoch ends, so we must be careful not to return a
1187        // result here after the epoch ends.
1188        epoch_store
1189            .within_alive_epoch(self.notify_read_effects(certificate))
1190            .await
1191            .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1192            .and_then(|r| r)
1193    }
1194
    /// Internal logic to execute a certificate.
    ///
    /// Guarantees that
    /// - If input objects are available, return no permanent failure.
    /// - Execution and output commit are atomic, i.e. outputs are only
    ///   written to storage on successful execution; crashed execution has no
    ///   observable effect and can be retried.
    ///
    /// It is caller's responsibility to ensure input objects are available and
    /// locks are set. If this cannot be satisfied by the caller,
    /// execute_certificate() should be called instead.
    ///
    /// Should only be called within iota-core.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
    pub fn try_execute_immediately(
        &self,
        certificate: &VerifiedExecutableTransaction,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
        let _scope = monitored_scope("Execution::try_execute_immediately");
        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();

        let tx_digest = certificate.digest();

        // Acquire a lock to prevent concurrent executions of the same transaction.
        let tx_guard = epoch_store.acquire_tx_guard(certificate)?;

        // The cert could have been processed by a concurrent attempt of the same cert,
        // so check if the effects have already been written.
        if let Some(effects) = self
            .get_transaction_cache_reader()
            .try_get_executed_effects(tx_digest)?
        {
            // If the caller expected specific effects, a mismatch here would
            // indicate an effects fork — assert rather than continue.
            if let Some(expected_effects_digest_inner) = expected_effects_digest {
                assert_eq!(
                    effects.digest(),
                    expected_effects_digest_inner,
                    "Unexpected effects digest for transaction {tx_digest:?}"
                );
            }
            tx_guard.release();
            return Ok((effects, None));
        }

        // Load the transaction's input objects (and per-authenticator inputs)
        // while holding the tx guard.
        let (tx_input_objects, per_authenticator_inputs) =
            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;

        // If no expected_effects_digest was provided, try to get it from storage.
        // We could be re-executing a previously executed but uncommitted transaction,
        // perhaps after restarting with a new binary. In this situation, if
        // we have published an effects signature, we must be sure not to
        // equivocate. TODO: read from cache instead of DB
        let expected_effects_digest =
            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);

        self.process_certificate(
            tx_guard,
            certificate,
            tx_input_objects,
            per_authenticator_inputs,
            expected_effects_digest,
            epoch_store,
        )
        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
        .tap_ok(
            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
        )
    }
1265
1266    pub fn read_objects_for_execution(
1267        &self,
1268        tx_lock: &CertLockGuard,
1269        certificate: &VerifiedExecutableTransaction,
1270        epoch_store: &Arc<AuthorityPerEpochStore>,
1271    ) -> IotaResult<(InputObjects, Vec<(InputObjects, ObjectReadResult)>)> {
1272        let _scope = monitored_scope("Execution::load_input_objects");
1273        let _metrics_guard = self
1274            .metrics
1275            .execution_load_input_objects_latency
1276            .start_timer();
1277
1278        let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1279
1280        let input_objects = self.input_loader.read_objects_for_execution(
1281            epoch_store,
1282            &certificate.key(),
1283            tx_lock,
1284            &input_objects,
1285            epoch_store.epoch(),
1286        )?;
1287
1288        certificate.split_input_objects_into_groups_for_reading(input_objects)
1289    }
1290
1291    /// Test only wrapper for `try_execute_immediately()` above, useful for
1292    /// checking errors if the pre-conditions are not satisfied, and
1293    /// executing change epoch transactions.
1294    pub fn try_execute_for_test(
1295        &self,
1296        certificate: &VerifiedCertificate,
1297    ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1298        let epoch_store = self.epoch_store_for_testing();
1299        let (effects, execution_error_opt) = self.try_execute_immediately(
1300            &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1301            None,
1302            &epoch_store,
1303        )?;
1304        let signed_effects = self.sign_effects(effects, &epoch_store)?;
1305        Ok((signed_effects, execution_error_opt))
1306    }
1307
1308    /// Non-fallible version of `try_execute_for_test()`.
1309    pub fn execute_for_test(
1310        &self,
1311        certificate: &VerifiedCertificate,
1312    ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1313        self.try_execute_for_test(certificate)
1314            .expect("try_execute_for_test should not fail")
1315    }
1316
1317    pub async fn notify_read_effects(
1318        &self,
1319        certificate: &VerifiedCertificate,
1320    ) -> IotaResult<TransactionEffects> {
1321        self.get_transaction_cache_reader()
1322            .try_notify_read_executed_effects(&[*certificate.digest()])
1323            .await
1324            .map(|mut r| r.pop().expect("must return correct number of effects"))
1325    }
1326
1327    fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1328        self.get_object_cache_reader()
1329            .try_check_owned_objects_are_live(owned_object_refs)
1330    }
1331
1332    /// This function captures the required state to debug a forked transaction.
1333    /// The dump is written to a file in dir `path`, with name prefixed by the
1334    /// transaction digest. NOTE: Since this info escapes the validator
1335    /// context, make sure not to leak any private info here
1336    pub(crate) fn debug_dump_transaction_state(
1337        &self,
1338        tx_digest: &TransactionDigest,
1339        effects: &TransactionEffects,
1340        expected_effects_digest: TransactionEffectsDigest,
1341        inner_temporary_store: &InnerTemporaryStore,
1342        certificate: &VerifiedExecutableTransaction,
1343        debug_dump_config: &StateDebugDumpConfig,
1344    ) -> IotaResult<PathBuf> {
1345        // Fall back to the OS temp directory if no dump directory is configured.
1346        // This is safe: dump files are named by transaction digest, so no collisions.
1347        let dump_dir = debug_dump_config
1348            .dump_file_directory
1349            .as_ref()
1350            .cloned()
1351            .unwrap_or(std::env::temp_dir());
1352        let epoch_store = self.load_epoch_store_one_call_per_task();
1353
1354        NodeStateDump::new(
1355            tx_digest,
1356            effects,
1357            expected_effects_digest,
1358            self.get_object_store().as_ref(),
1359            &epoch_store,
1360            inner_temporary_store,
1361            certificate,
1362        )?
1363        .write_to_file(&dump_dir)
1364        .map_err(|e| IotaError::FileIO(e.to_string()))
1365    }
1366
1367    #[instrument(name = "process_certificate", level = "trace", skip_all, fields(tx_digest = ?certificate.digest(), sender = ?certificate.data().transaction_data().gas_owner().to_string()))]
1368    pub(crate) fn process_certificate(
1369        &self,
1370        tx_guard: CertTxGuard,
1371        certificate: &VerifiedExecutableTransaction,
1372        tx_input_objects: InputObjects,
1373        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1374        expected_effects_digest: Option<TransactionEffectsDigest>,
1375        epoch_store: &Arc<AuthorityPerEpochStore>,
1376    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1377        let process_certificate_start_time = tokio::time::Instant::now();
1378        let digest = *certificate.digest();
1379
1380        let _scope = monitored_scope("Execution::process_certificate");
1381
1382        fail_point_if!("correlated-crash-process-certificate", || {
1383            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1384                iota_simulator::task::kill_current_node(None);
1385            }
1386        });
1387
1388        let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1389        // Any caller that verifies the signatures on the certificate will have already
1390        // checked the epoch. But paths that don't verify sigs (e.g. execution
1391        // from checkpoint, reading from db) present the possibility of an epoch
1392        // mismatch. If this cert is not finalzied in previous epoch, then it's
1393        // invalid.
1394        let execution_guard = match execution_guard {
1395            Ok(execution_guard) => execution_guard,
1396            Err(err) => {
1397                tx_guard.release();
1398                return Err(err);
1399            }
1400        };
1401        // Since we obtain a reference to the epoch store before taking the execution
1402        // lock, it's possible that reconfiguration has happened and they no
1403        // longer match.
1404        if *execution_guard != epoch_store.epoch() {
1405            tx_guard.release();
1406            info!("The epoch of the execution_guard doesn't match the epoch store");
1407            return Err(IotaError::WrongEpoch {
1408                expected_epoch: epoch_store.epoch(),
1409                actual_epoch: *execution_guard,
1410            });
1411        }
1412
1413        // Errors originating from prepare_certificate may be transient (failure to read
1414        // locks) or non-transient (transaction input is invalid, move vm
1415        // errors). However, all errors from this function occur before we have
1416        // written anything to the db, so we commit the tx guard and rely on the
1417        // client to retry the tx (if it was transient).
1418        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1419            &execution_guard,
1420            certificate,
1421            tx_input_objects,
1422            per_authenticator_inputs,
1423            epoch_store,
1424        ) {
1425            Err(e) => {
1426                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1427                tx_guard.release();
1428                return Err(e);
1429            }
1430            Ok(res) => res,
1431        };
1432
1433        if let Some(expected_effects_digest) = expected_effects_digest {
1434            if effects.digest() != expected_effects_digest {
                // We don't want to mask the original error, so we log it and continue.
1436                match self.debug_dump_transaction_state(
1437                    &digest,
1438                    &effects,
1439                    expected_effects_digest,
1440                    &inner_temporary_store,
1441                    certificate,
1442                    &self.config.state_debug_dump_config,
1443                ) {
1444                    Ok(out_path) => {
1445                        info!(
1446                            "Dumped node state for transaction {} to {}",
1447                            digest,
1448                            out_path.as_path().display().to_string()
1449                        );
1450                    }
1451                    Err(e) => {
1452                        error!("Error dumping state for transaction {}: {e}", digest);
1453                    }
1454                }
1455                error!(
1456                    tx_digest = ?digest,
1457                    ?expected_effects_digest,
1458                    actual_effects = ?effects,
1459                    "fork detected!"
1460                );
1461                panic!(
1462                    "Transaction {} is expected to have effects digest {}, but got {}!",
1463                    digest,
1464                    expected_effects_digest,
1465                    effects.digest(),
1466                );
1467            }
1468        }
1469
1470        fail_point!("crash");
1471
1472        self.commit_certificate(
1473            certificate,
1474            inner_temporary_store,
1475            &effects,
1476            tx_guard,
1477            execution_guard,
1478            epoch_store,
1479        )?;
1480
1481        if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1482            certificate.data().transaction_data().kind()
1483        {
1484            if let Some(err) = &execution_error_opt {
1485                debug_fatal!("Authenticator state update failed: {:?}", err);
1486            }
1487            epoch_store.update_authenticator_state(auth_state);
1488
1489            // double check that the signature verifier always matches the authenticator
1490            // state
1491            if cfg!(debug_assertions) {
1492                let authenticator_state = get_authenticator_state(self.get_object_store())
1493                    .expect("Read cannot fail")
1494                    .expect("Authenticator state must exist");
1495
1496                let mut sys_jwks: Vec<_> = authenticator_state
1497                    .active_jwks
1498                    .into_iter()
1499                    .map(|jwk| (jwk.jwk_id, jwk.jwk))
1500                    .collect();
1501                let mut active_jwks: Vec<_> = epoch_store
1502                    .signature_verifier
1503                    .get_jwks()
1504                    .into_iter()
1505                    .collect();
1506                sys_jwks.sort();
1507                active_jwks.sort();
1508
1509                assert_eq!(sys_jwks, active_jwks);
1510            }
1511        }
1512
1513        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1514        if elapsed > 0.0 {
1515            self.metrics
1516                .execution_gas_latency_ratio
1517                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1518        };
1519        Ok((effects, execution_error_opt))
1520    }
1521
    /// Commits the results of an executed certificate to the store: indexes
    /// the transaction, records its key/digest mapping in the epoch store,
    /// writes the transaction outputs through the cache writer, and notifies
    /// the transaction manager so dependent transactions can start executing.
    ///
    /// `_execution_guard` is the execution-lock read guard held across the
    /// commit; the caller has already checked that it matches the epoch of
    /// `epoch_store`.
    #[instrument(level = "trace", skip_all)]
    fn commit_certificate(
        &self,
        certificate: &VerifiedExecutableTransaction,
        inner_temporary_store: InnerTemporaryStore,
        effects: &TransactionEffects,
        tx_guard: CertTxGuard,
        _execution_guard: ExecutionLockReadGuard<'_>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        let tx_key = certificate.key();
        let tx_digest = certificate.digest();
        let input_object_count = inner_temporary_store.input_objects.len();
        let shared_object_count = effects.input_shared_objects().len();

        // Keys of the objects this transaction wrote; handed to the
        // transaction manager after the commit below.
        let output_keys = inner_temporary_store.get_output_keys(effects);

        // index certificate
        // Post-processing failures are tolerated: they are counted and logged
        // but do not abort the commit.
        let _ = self
            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
            .tap_err(|e| {
                self.metrics.post_processing_total_failures.inc();
                error!(?tx_digest, "tx post processing failed: {e}");
            });

        // The insertion to epoch_store is not atomic with the insertion to the
        // perpetual store. This is OK because we insert to the epoch store
        // first. And during lookups we always look up in the perpetual store first.
        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;

        // Allow testing what happens if we crash here.
        fail_point!("crash");

        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
            certificate.clone().into_unsigned(),
            effects.clone(),
            inner_temporary_store,
        );
        self.get_cache_writer()
            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;

        if certificate.transaction_data().is_end_of_epoch_tx() {
            // At the end of epoch, since system packages may have been upgraded, force
            // reload them in the cache.
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        // commit_certificate finished, the tx is fully committed to the store.
        tx_guard.commit_tx();

        // Notifies transaction manager about transaction and output objects committed.
        // This provides necessary information to transaction manager to start executing
        // additional ready transactions.
        self.transaction_manager
            .notify_commit(tx_digest, output_keys, epoch_store);

        self.update_metrics(certificate, input_object_count, shared_object_count);

        Ok(())
    }
1587
1588    fn update_metrics(
1589        &self,
1590        certificate: &VerifiedExecutableTransaction,
1591        input_object_count: usize,
1592        shared_object_count: usize,
1593    ) {
1594        // count signature by scheme, for zklogin and multisig
1595        if certificate.has_zklogin_sig() {
1596            self.metrics.zklogin_sig_count.inc();
1597        } else if certificate.has_upgraded_multisig() {
1598            self.metrics.multisig_sig_count.inc();
1599        }
1600
1601        self.metrics.total_effects.inc();
1602        self.metrics.total_certs.inc();
1603
1604        if shared_object_count > 0 {
1605            self.metrics.shared_obj_tx.inc();
1606        }
1607
1608        if certificate.is_sponsored_tx() {
1609            self.metrics.sponsored_tx.inc();
1610        }
1611
1612        self.metrics
1613            .num_input_objs
1614            .observe(input_object_count as f64);
1615        self.metrics
1616            .num_shared_objects
1617            .observe(shared_object_count as f64);
1618        self.metrics.batch_size.observe(
1619            certificate
1620                .data()
1621                .intent_message()
1622                .value
1623                .kind()
1624                .num_commands() as f64,
1625        );
1626    }
1627
    /// prepare_certificate validates the transaction input, and executes the
    /// certificate, returning effects, output objects, events, etc.
    ///
    /// It reads state from the db (both owned and shared locks), but it has no
    /// side effects.
    ///
    /// It can be generally understood that a failure of prepare_certificate
    /// indicates a non-transient error, e.g. the transaction input is
    /// somehow invalid, the correct locks are not held, etc. However, this
    /// is not entirely true, as a transient db read error may also cause
    /// this function to fail.
    ///
    /// Returns the inner temporary store holding all writes, the computed
    /// transaction effects, and the execution error, if any; a transaction
    /// can fail execution and still produce effects.
    #[instrument(level = "trace", skip_all)]
    fn prepare_certificate(
        &self,
        _execution_guard: &ExecutionLockReadGuard<'_>,
        certificate: &VerifiedExecutableTransaction,
        tx_input_objects: InputObjects,
        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(
        InnerTemporaryStore,
        TransactionEffects,
        Option<ExecutionError>,
    )> {
        let _scope = monitored_scope("Execution::prepare_certificate");
        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
        let prepare_certificate_start_time = tokio::time::Instant::now();

        let protocol_config = epoch_store.protocol_config();

        let reference_gas_price = epoch_store.reference_gas_price();

        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
        let epoch_start_timestamp = epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp();

        let backing_store = self.get_backing_store().as_ref();

        let tx_digest = *certificate.digest();

        // TODO: We need to move this to a more appropriate place to avoid redundant
        // checks.
        let tx_data = certificate.data().transaction_data();
        tx_data.validity_check(protocol_config)?;

        let (kind, signer, gas_data) = tx_data.execution_parts();

        // The presence of `MoveAuthenticator` signatures selects which of the
        // two execution paths below is taken.
        let move_authenticators = certificate.move_authenticators();

        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
        let (inner_temp_store, _, mut effects, execution_error_opt) = if move_authenticators
            .is_empty()
        {
            // No Move authentication required, proceed to execute the transaction directly.

            // The cost of partially re-auditing a transaction before execution is
            // tolerated.
            let (tx_gas_status, tx_checked_input_objects) =
                iota_transaction_checks::check_certificate_input(
                    certificate,
                    tx_input_objects,
                    protocol_config,
                    reference_gas_price,
                )?;

            // Check the locks on owned input objects before executing.
            let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
            self.check_owned_locks(&owned_object_refs)?;
            epoch_store.executor().execute_transaction_to_effects(
                backing_store,
                protocol_config,
                self.metrics.limits_metrics.clone(),
                // TODO: would be nice to pass the whole NodeConfig here, but it creates a
                // cyclic dependency w/ iota-adapter
                self.config
                    .expensive_safety_check_config
                    .enable_deep_per_tx_iota_conservation_check(),
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_id,
                epoch_start_timestamp,
                tx_checked_input_objects,
                gas_data,
                tx_gas_status,
                kind,
                signer,
                tx_digest,
                &mut None,
            )
        } else {
            // One or more `MoveAuthenticator` signatures present — authenticate each and
            // then execute the transaction.
            // It is supposed that `MoveAuthenticator` availability is checked in
            // `SenderSignedData::validity_check`.

            debug_assert_eq!(
                move_authenticators.len(),
                per_authenticator_inputs.len(),
                "Move authenticators amount must match the number of authenticator inputs"
            );

            let per_authenticator_inputs = move_authenticators
                .iter()
                .zip(per_authenticator_inputs)
                .map(
                    |(move_authenticator, (authenticator_input_objects, account_object))| {
                        // Check basic `object_to_authenticate` preconditions and get its
                        // components.
                        let (
                            auth_account_object_id,
                            auth_account_object_seq_number,
                            auth_account_object_digest,
                        ) = move_authenticator.object_to_authenticate_components()?;

                        let signer = move_authenticator.address()?;

                        let authenticator_function_ref_for_execution = self.check_move_account(
                            auth_account_object_id,
                            auth_account_object_seq_number,
                            auth_account_object_digest,
                            account_object,
                            &signer,
                        )?;

                        Ok((
                            authenticator_input_objects,
                            authenticator_function_ref_for_execution,
                        ))
                    },
                )
                .collect::<IotaResult<Vec<_>>>()?;

            let per_authenticator_input_objects = per_authenticator_inputs
                .iter()
                .map(|(authenticator_input_objects, _)| authenticator_input_objects.clone())
                .collect::<Vec<_>>();

            // Serialize the TransactionData for the auth context.
            let tx_data_bytes =
                bcs::to_bytes(tx_data).expect("TransactionData serialization cannot fail");

            // Check the `MoveAuthenticator` input objects.
            // The `MoveAuthenticator` receiving objects are checked on the signing step.
            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
            // not a part of the transaction data.
            let authenticator_gas_budget = protocol_config.max_auth_gas();
            let (
                gas_status,
                per_authenticator_checked_input_objects,
                authenticator_and_tx_checked_input_objects,
            ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
                certificate,
                tx_input_objects,
                per_authenticator_input_objects,
                authenticator_gas_budget,
                protocol_config,
                reference_gas_price,
            )?;

            debug_assert_eq!(
                move_authenticators.len(),
                per_authenticator_checked_input_objects.len(),
                "Move authenticators amount must match the number of checked authenticator inputs"
            );

            // Pair each authenticator with its resolved function ref and its
            // checked input objects for execution.
            let move_authenticators = move_authenticators
                .into_iter()
                .zip(per_authenticator_inputs)
                .zip(per_authenticator_checked_input_objects)
                .map(
                    |(
                        (move_authenticator, (_, authenticator_function_ref_for_execution)),
                        authenticator_checked_input_objects,
                    )| {
                        (
                            move_authenticator.to_owned(),
                            authenticator_function_ref_for_execution,
                            authenticator_checked_input_objects,
                        )
                    },
                )
                .collect::<Vec<_>>();

            let owned_object_refs = authenticator_and_tx_checked_input_objects
                .inner()
                .filter_owned_objects();
            self.check_owned_locks(&owned_object_refs)?;

            epoch_store
                .executor()
                .authenticate_then_execute_transaction_to_effects(
                    backing_store,
                    protocol_config,
                    self.metrics.limits_metrics.clone(),
                    self.config
                        .expensive_safety_check_config
                        .enable_deep_per_tx_iota_conservation_check(),
                    self.config.certificate_deny_config.certificate_deny_set(),
                    &epoch_id,
                    epoch_start_timestamp,
                    gas_data,
                    gas_status,
                    move_authenticators,
                    authenticator_and_tx_checked_input_objects,
                    kind,
                    signer,
                    tx_digest,
                    tx_data_bytes,
                    &mut None,
                )
        };

        fail_point_if!("cp_execution_nondeterminism", || {
            #[cfg(msim)]
            self.create_fail_state(certificate, epoch_store, &mut effects);
        });

        // Observe the ratio of computation gas cost to preparation time,
        // guarding against division by zero for very fast executions.
        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics
                .prepare_cert_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
        }

        Ok((inner_temp_store, effects, execution_error_opt.err()))
    }
1854
1855    pub fn prepare_certificate_for_benchmark(
1856        &self,
1857        certificate: &VerifiedExecutableTransaction,
1858        input_objects: InputObjects,
1859        epoch_store: &Arc<AuthorityPerEpochStore>,
1860    ) -> IotaResult<(
1861        InnerTemporaryStore,
1862        TransactionEffects,
1863        Option<ExecutionError>,
1864    )> {
1865        let lock = RwLock::new(epoch_store.epoch());
1866        let execution_guard = lock.try_read().unwrap();
1867
1868        self.prepare_certificate(
1869            &execution_guard,
1870            certificate,
1871            input_objects,
1872            vec![],
1873            epoch_store,
1874        )
1875    }
1876
1877    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
1878    /// `VmChecks::Enabled` instead.
1879    #[instrument("dry_exec_tx", level = "trace", skip_all)]
1880    #[allow(clippy::type_complexity)]
1881    pub fn dry_exec_transaction(
1882        &self,
1883        transaction: TransactionData,
1884        transaction_digest: TransactionDigest,
1885    ) -> IotaResult<(
1886        DryRunTransactionBlockResponse,
1887        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1888        TransactionEffects,
1889        Option<ObjectID>,
1890    )> {
1891        let epoch_store = self.load_epoch_store_one_call_per_task();
1892        if !self.is_fullnode(&epoch_store) {
1893            return Err(IotaError::UnsupportedFeature {
1894                error: "dry-exec is only supported on fullnodes".to_string(),
1895            });
1896        }
1897
1898        if transaction.kind().is_system_tx() {
1899            return Err(IotaError::UnsupportedFeature {
1900                error: "dry-exec does not support system transactions".to_string(),
1901            });
1902        }
1903
1904        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1905    }
1906
1907    #[allow(clippy::type_complexity)]
1908    pub fn dry_exec_transaction_for_benchmark(
1909        &self,
1910        transaction: TransactionData,
1911        transaction_digest: TransactionDigest,
1912    ) -> IotaResult<(
1913        DryRunTransactionBlockResponse,
1914        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1915        TransactionEffects,
1916        Option<ObjectID>,
1917    )> {
1918        let epoch_store = self.load_epoch_store_one_call_per_task();
1919        self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1920    }
1921
    /// Shared implementation of dry execution: runs `transaction` against
    /// current state without committing anything, returning the dry-run
    /// response, every written object tagged with its write kind, the raw
    /// effects, and the id of the mock gas object if one was synthesized.
    #[instrument(level = "trace", skip_all)]
    #[allow(clippy::type_complexity)]
    fn dry_exec_transaction_impl(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        transaction: TransactionData,
        transaction_digest: TransactionDigest,
    ) -> IotaResult<(
        DryRunTransactionBlockResponse,
        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
        TransactionEffects,
        Option<ObjectID>,
    )> {
        // Cheap validity checks for a transaction, including input size limits.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Check the transaction against the deny config, as at signing time.
        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dry run.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // make a gas object if one was not provided
        let mut transaction = transaction;
        let reference_gas_price = epoch_store.reference_gas_price();
        let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
            let sender = transaction.gas_owner();
            let gas_object_id = ObjectID::random();
            let gas_object = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    gas_object_id,
                    SIMULATION_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(sender),
                TransactionDigest::genesis_marker(),
            );
            let gas_object_ref = gas_object.compute_object_reference();
            // Add gas object to transaction gas payment
            transaction.gas_data_mut().payment = vec![gas_object_ref];
            (
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    receiving_objects,
                    gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?,
                Some(gas_object_id),
            )
        } else {
            // `MoveAuthenticator`s are not supported in dry runs, so we set the
            // `authenticator_gas_budget` to 0.
            let authenticator_gas_budget = 0;

            (
                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                    authenticator_gas_budget,
                )?,
                None,
            )
        };

        let protocol_config = epoch_store.protocol_config();
        let (kind, signer, gas_data) = transaction.execution_parts();

        let silent = true;
        let executor = iota_execution::executor(protocol_config, silent, None)
            .expect("Creating an executor should not fail here");

        // Expensive safety checks are disabled for dry execution.
        let expensive_checks = false;
        let (inner_temp_store, _, effects, execution_error) = executor
            .execute_transaction_to_effects(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                expensive_checks,
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                checked_input_objects,
                gas_data,
                gas_status,
                kind,
                signer,
                transaction_digest,
                &mut None,
            );
        let tx_digest = *effects.transaction_digest();

        let module_cache =
            TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());

        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));
        // Returning empty vector here because we recalculate changes in the rpc layer.
        let object_changes = Vec::new();

        // Returning empty vector here because we recalculate changes in the rpc layer.
        let balance_changes = Vec::new();

        // Tag every written object reference with how it was written:
        // created, unwrapped, or mutated.
        let written_with_kind = effects
            .created()
            .into_iter()
            .map(|(oref, _)| (oref, WriteKind::Create))
            .chain(
                effects
                    .unwrapped()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Unwrap)),
            )
            .chain(
                effects
                    .mutated()
                    .into_iter()
                    .map(|(oref, _)| (oref, WriteKind::Mutate)),
            )
            .map(|(oref, kind)| {
                let obj = inner_temp_store.written.get(&oref.0).unwrap();
                // TODO: Avoid clones.
                (oref.0, (oref, obj.clone(), kind))
            })
            .collect();

        // Surface the source of the execution error (if any) as a string.
        let execution_error_source = execution_error
            .as_ref()
            .err()
            .and_then(|e| e.source().as_ref().map(|e| e.to_string()));

        Ok((
            DryRunTransactionBlockResponse {
                // to avoid cloning `transaction`, fields are populated in this order
                suggested_gas_price: self
                    .congestion_tracker
                    .get_prediction_suggested_gas_price(&transaction),
                input: IotaTransactionBlockData::try_from_with_module_cache(
                    transaction,
                    &module_cache,
                    tx_digest,
                )
                .map_err(|e| IotaError::TransactionSerialization {
                    error: format!(
                        "Failed to convert transaction to IotaTransactionBlockData: {e}",
                    ),
                })?, // TODO: replace the underlying try_from to IotaError. This one goes deep
                effects: effects.clone().try_into()?,
                events: IotaTransactionBlockEvents::try_from(
                    inner_temp_store.events.clone(),
                    tx_digest,
                    None,
                    layout_resolver.as_mut(),
                )?,
                object_changes,
                balance_changes,
                execution_error_source,
            },
            written_with_kind,
            effects,
            mock_gas,
        ))
    }
2115
    /// Simulates execution of `transaction` without committing any effects.
    ///
    /// Only supported on fullnodes; system transactions are rejected. When
    /// `checks` is enabled this behaves like a dry-run (full transaction
    /// input checks); when disabled it behaves like a dev-inspect with
    /// relaxed Move VM checks. If the transaction carries no gas payment, a
    /// mock gas coin is injected and its ID is returned in the result's
    /// `mock_gas_id`.
    ///
    /// # Errors
    /// Returns `IotaError::UnsupportedFeature` for system transactions or
    /// when called on a validator, and propagates validity-, deny- and
    /// input-check failures.
    pub fn simulate_transaction(
        &self,
        mut transaction: TransactionData,
        checks: VmChecks,
    ) -> IotaResult<SimulateTransactionResult> {
        if transaction.kind().is_system_tx() {
            return Err(IotaError::UnsupportedFeature {
                error: "simulate does not support system transactions".to_string(),
            });
        }

        let epoch_store = self.load_epoch_store_one_call_per_task();
        if !self.is_fullnode(&epoch_store) {
            return Err(IotaError::UnsupportedFeature {
                error: "simulate is only supported on fullnodes".to_string(),
            });
        }

        // Cheap validity checks for a transaction, including input size limits.
        // This does not check if gas objects are missing since we may create a
        // mock gas object. It checks for other transaction input validity.
        transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        // Since we need to simulate a validator signing the transaction, the first step
        // is to check if some transaction elements are denied.
        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        // Load input and receiving objects
        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a simulation.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        // Create a mock gas object if one was not provided. The mock coin is
        // owned by the declared gas owner and appended to the input objects so
        // the input checks below can resolve it.
        let mock_gas_id = if transaction.gas().is_empty() {
            let mock_gas_object = Object::new_move(
                MoveObject::new_gas_coin(
                    OBJECT_START_VERSION,
                    ObjectID::MAX,
                    SIMULATION_GAS_COIN_VALUE,
                ),
                Owner::AddressOwner(transaction.gas_data().owner),
                TransactionDigest::genesis_marker(),
            );
            let mock_gas_object_ref = mock_gas_object.compute_object_reference();
            transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
            input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
            Some(mock_gas_object.id())
        } else {
            None
        };

        let protocol_config = epoch_store.protocol_config();

        // `MoveAuthenticator`s are not supported in simulation, so we set the
        // `authenticator_gas_budget` to 0.
        let authenticator_gas_budget = 0;

        // Checks enabled -> DRY-RUN, it means we are simulating a real TX
        // Checks disabled -> DEV-INSPECT, more relaxed Move VM checks
        let (gas_status, checked_input_objects) = if checks.enabled() {
            iota_transaction_checks::check_transaction_input(
                protocol_config,
                epoch_store.reference_gas_price(),
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
                authenticator_gas_budget,
            )?
        } else {
            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
                protocol_config,
                transaction.kind(),
                input_objects,
                receiving_objects,
            )?;
            let gas_status = IotaGasStatus::new(
                transaction.gas_budget(),
                transaction.gas_price(),
                epoch_store.reference_gas_price(),
                protocol_config,
            )?;

            (gas_status, checked_input_objects)
        };

        // Create a new executor for the simulation
        let executor = iota_execution::executor(
            protocol_config,
            true, // silent
            None,
        )
        .expect("Creating an executor should not fail here");

        // Execute the simulation
        let (kind, signer, gas_data) = transaction.execution_parts();
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            false, // expensive_checks
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_data,
            gas_status,
            kind,
            signer,
            transaction.digest(),
            checks.disabled(),
        );

        // In the case of a dev inspect, the execution_result could be filled with some
        // values. Else, execution_result is empty in the case of a dry run.
        Ok(SimulateTransactionResult {
            input_objects: inner_temp_store.input_objects,
            output_objects: inner_temp_store.written,
            // `events` is only populated when the effects actually recorded an
            // events digest.
            events: effects.events_digest().map(|_| inner_temp_store.events),
            effects,
            execution_result,
            suggested_gas_price: self
                .congestion_tracker
                .get_prediction_suggested_gas_price(&transaction),
            mock_gas_id,
        })
    }
2261
    /// TO BE DEPRECATED SOON: Use `simulate_transaction` with
    /// `VmChecks::DISABLED` instead.
    ///
    /// Executes `transaction_kind` in dev-inspect mode without committing any
    /// effects. Gas parameters are optional: a missing price defaults to the
    /// reference gas price, a missing budget to the protocol's max gas, a
    /// missing sponsor to `sender`, and a missing payment list to empty (in
    /// which case a dummy gas coin is created). The object ID for gas can be
    /// any object ID, even for an uncreated object.
    ///
    /// `skip_checks` (default `true`) selects lightweight dev-inspect input
    /// checks instead of full transaction input checks;
    /// `show_raw_txn_data_and_effects` (default `false`) additionally returns
    /// BCS-serialized transaction data and effects.
    ///
    /// # Errors
    /// Returns `IotaError::UnsupportedFeature` on validators or for system
    /// transactions, and propagates validity-, deny- and input-check
    /// failures.
    #[instrument("dev_inspect_tx", level = "trace", skip_all)]
    pub async fn dev_inspect_transaction_block(
        &self,
        sender: IotaAddress,
        transaction_kind: TransactionKind,
        gas_price: Option<u64>,
        gas_budget: Option<u64>,
        gas_sponsor: Option<IotaAddress>,
        gas_objects: Option<Vec<ObjectRef>>,
        show_raw_txn_data_and_effects: Option<bool>,
        skip_checks: Option<bool>,
    ) -> IotaResult<DevInspectResults> {
        let epoch_store = self.load_epoch_store_one_call_per_task();

        if !self.is_fullnode(&epoch_store) {
            return Err(IotaError::UnsupportedFeature {
                error: "dev-inspect is only supported on fullnodes".to_string(),
            });
        }

        if transaction_kind.is_system_tx() {
            return Err(IotaError::UnsupportedFeature {
                error: "system transactions are not supported".to_string(),
            });
        }

        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
        let skip_checks = skip_checks.unwrap_or(true);
        let reference_gas_price = epoch_store.reference_gas_price();
        let protocol_config = epoch_store.protocol_config();
        let max_tx_gas = protocol_config.max_tx_gas();

        let price = gas_price.unwrap_or(reference_gas_price);
        let budget = gas_budget.unwrap_or(max_tx_gas);
        let owner = gas_sponsor.unwrap_or(sender);
        // Payment might be empty here, but it's fine we'll have to deal with it later
        // after reading all the input objects.
        let payment = gas_objects.unwrap_or_default();
        // Assemble a full TransactionData so the standard validity/deny/input
        // checks can be reused.
        let mut transaction = TransactionData::V1(TransactionDataV1 {
            kind: transaction_kind.clone(),
            sender,
            gas_data: GasData {
                payment,
                owner,
                price,
                budget,
            },
            expiration: TransactionExpiration::None,
        });

        // Serialize the transaction *before* the gas payment may be rewritten
        // with a dummy gas object below.
        let raw_txn_data = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        transaction.validity_check_no_gas_check(protocol_config)?;

        let input_object_kinds = transaction.input_objects()?;
        let receiving_object_refs = transaction.receiving_objects();

        iota_transaction_checks::deny::check_transaction_for_signing(
            &transaction,
            &[],
            &input_object_kinds,
            &receiving_object_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            // We don't want to cache this transaction since it's a dev inspect.
            None,
            &input_object_kinds,
            &receiving_object_refs,
            epoch_store.epoch(),
        )?;

        let (gas_status, checked_input_objects) = if skip_checks {
            // If we are skipping checks, then we call the check_dev_inspect_input function
            // which will perform only lightweight checks on the transaction
            // input. And if the gas field is empty, that means we will
            // use the dummy gas object so we need to add it to the input objects vector.
            if transaction.gas().is_empty() {
                // Create and use a dummy gas object if there is no gas object provided.
                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
                    SIMULATION_GAS_COIN_VALUE,
                    transaction.gas_owner(),
                );
                let gas_object_ref = dummy_gas_object.compute_object_reference();
                transaction.gas_data_mut().payment = vec![gas_object_ref];
                input_objects.push(ObjectReadResult::new(
                    InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
                    dummy_gas_object.into(),
                ));
            }
            let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
                protocol_config,
                &transaction_kind,
                input_objects,
                receiving_objects,
            )?;
            // Note: the gas budget here is the protocol max, not the
            // transaction's own budget.
            let gas_status = IotaGasStatus::new(
                max_tx_gas,
                transaction.gas_price(),
                reference_gas_price,
                protocol_config,
            )?;

            (gas_status, checked_input_objects)
        } else {
            // If we are not skipping checks, then we call the check_transaction_input
            // function and its dummy gas variant which will perform full
            // fledged checks just like a real transaction execution.
            if transaction.gas().is_empty() {
                // Create and use a dummy gas object if there is no gas object provided.
                let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
                    SIMULATION_GAS_COIN_VALUE,
                    transaction.gas_owner(),
                );
                let gas_object_ref = dummy_gas_object.compute_object_reference();
                transaction.gas_data_mut().payment = vec![gas_object_ref];
                iota_transaction_checks::check_transaction_input_with_given_gas(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    receiving_objects,
                    dummy_gas_object,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                )?
            } else {
                // `MoveAuthenticator`s are not supported in dev inspects, so we set the
                // `authenticator_gas_budget` to 0.
                let authenticator_gas_budget = 0;

                iota_transaction_checks::check_transaction_input(
                    epoch_store.protocol_config(),
                    reference_gas_price,
                    &transaction,
                    input_objects,
                    &receiving_objects,
                    &self.metrics.bytecode_verifier_metrics,
                    &self.config.verifier_signing_config,
                    authenticator_gas_budget,
                )?
            }
        };

        let executor = iota_execution::executor(protocol_config, /* silent */ true, None)
            .expect("Creating an executor should not fail here");
        let gas_data = transaction.gas_data().clone();
        // Compute the digest over the signing intent message, mirroring how a
        // real transaction's digest is derived.
        let intent_msg = IntentMessage::new(
            Intent {
                version: IntentVersion::V0,
                scope: IntentScope::TransactionData,
                app_id: IntentAppId::Iota,
            },
            transaction,
        );
        let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
        let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            // expensive checks
            false,
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_data,
            gas_status,
            transaction_kind,
            sender,
            transaction_digest,
            skip_checks,
        );

        let raw_effects = if show_raw_txn_data_and_effects {
            bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
                error: "Failed to serialize transaction effects during dev inspect".to_string(),
            })?
        } else {
            vec![]
        };

        // Resolve Move type layouts against the freshly written packages first,
        // falling back to the backing package store.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    &inner_temp_store,
                    self.get_backing_package_store(),
                )));

        DevInspectResults::new(
            effects,
            inner_temp_store.events.clone(),
            execution_result,
            raw_txn_data,
            raw_effects,
            layout_resolver.as_mut(),
        )
    }
2476
2477    // Only used for testing because of how epoch store is loaded.
2478    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2479        let epoch_store = self.epoch_store_for_testing();
2480        Ok(epoch_store.reference_gas_price())
2481    }
2482
2483    #[instrument(level = "trace", skip_all)]
2484    pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2485        self.get_transaction_cache_reader()
2486            .try_is_tx_already_executed(digest)
2487    }
2488
2489    /// Non-fallible version of `try_is_tx_already_executed`.
2490    pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2491        self.try_is_tx_already_executed(digest)
2492            .expect("storage access failed")
2493    }
2494
2495    /// Indexes a transaction by updating various indexes in the `IndexStore`.
2496    #[instrument(level = "debug", skip_all, err)]
2497    fn index_tx(
2498        &self,
2499        indexes: &IndexStore,
2500        digest: &TransactionDigest,
2501        // TODO: index_tx really just need the transaction data here.
2502        cert: &VerifiedExecutableTransaction,
2503        effects: &TransactionEffects,
2504        events: &TransactionEvents,
2505        timestamp_ms: u64,
2506        tx_coins: Option<TxCoins>,
2507        written: &WrittenObjects,
2508        inner_temporary_store: &InnerTemporaryStore,
2509    ) -> IotaResult<u64> {
2510        let changes = self
2511            .process_object_index(effects, written, inner_temporary_store)
2512            .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2513
2514        indexes.index_tx(
2515            cert.data().intent_message().value.sender(),
2516            cert.data()
2517                .intent_message()
2518                .value
2519                .input_objects()?
2520                .iter()
2521                .map(|o| o.object_id()),
2522            effects
2523                .all_changed_objects()
2524                .into_iter()
2525                .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2526            cert.data()
2527                .intent_message()
2528                .value
2529                .move_calls()
2530                .into_iter()
2531                .map(|(package, module, function)| {
2532                    (*package, module.to_owned(), function.to_owned())
2533                }),
2534            events,
2535            changes,
2536            digest,
2537            timestamp_ms,
2538            tx_coins,
2539        )
2540    }
2541
    /// Simulation-only (msim) failure injection for checkpoint execution
    /// testing: perturbs the effects of non-system transactions on a subset
    /// of validators.
    ///
    /// A thread-local `(accumulated stake, failing validator names)` pair is
    /// used: validators add their own stake and name to the failing set until
    /// the accumulated stake reaches the committee's validity threshold. Any
    /// validator in the set then bumps the computation cost in `effects`,
    /// making its effects diverge from the rest of the committee.
    #[cfg(msim)]
    fn create_fail_state(
        &self,
        certificate: &VerifiedExecutableTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        effects: &mut TransactionEffects,
    ) {
        use std::cell::RefCell;
        thread_local! {
            // (total stake of failing validators, names of failing validators)
            static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
        }
        if !certificate.data().intent_message().value.is_system_tx() {
            let committee = epoch_store.committee();
            let cur_stake = (**committee).weight(&self.name);
            if cur_stake > 0 {
                FAIL_STATE.with_borrow_mut(|fail_state| {
                    // Keep enrolling validators into the failing set until the
                    // accumulated stake reaches the validity threshold.
                    if fail_state.0 < committee.validity_threshold() {
                        fail_state.0 += cur_stake;
                        fail_state.1.insert(self.name);
                    }

                    if fail_state.1.contains(&self.name) {
                        info!("cp_exec failing tx");
                        effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
                    }
                });
            }
        }
    }
2572
    /// Computes the owner-index and dynamic-field-index changes produced by a
    /// transaction, for consumption by the `IndexStore`.
    ///
    /// Deleted and wrapped objects yield owner/dynamic-field removals; every
    /// changed object yields a removal when its owner changed plus an
    /// addition for its new address owner or parent object (dynamic field).
    ///
    /// # Panics
    /// Panics if a modified object's prior version or prior owner cannot be
    /// found: the index is processed before the latest objects are written,
    /// so the old versions must still be present in the store.
    fn process_object_index(
        &self,
        effects: &TransactionEffects,
        written: &WrittenObjects,
        inner_temporary_store: &InnerTemporaryStore,
    ) -> IotaResult<ObjectIndexChanges> {
        let epoch_store = self.load_epoch_store_one_call_per_task();
        // Resolve Move type layouts against the freshly written packages
        // first, falling back to the backing package store.
        let mut layout_resolver =
            epoch_store
                .executor()
                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                    inner_temporary_store,
                    self.get_backing_package_store(),
                )));

        // Map object ID -> version it had before this transaction modified it.
        let modified_at_version = effects
            .modified_at_versions()
            .into_iter()
            .collect::<HashMap<_, _>>();

        let tx_digest = effects.transaction_digest();
        let mut deleted_owners = vec![];
        let mut deleted_dynamic_fields = vec![];
        // Deleted and wrapped objects are removed from the owner / dynamic
        // field indexes based on their previous owner.
        for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
            let old_version = modified_at_version.get(&id).unwrap();
            // When we process the index, the latest object hasn't been written yet so
            // the old object must be present.
            match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
                |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
            ) {
                Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
                Owner::ObjectOwner(object_id) => {
                    deleted_dynamic_fields.push((ObjectID::from(object_id), id))
                }
                _ => {}
            }
        }

        let mut new_owners = vec![];
        let mut new_dynamic_fields = vec![];

        for (oref, owner, kind) in effects.all_changed_objects() {
            let id = &oref.0;
            // For mutated objects, retrieve old owner and delete old index if there is a
            // owner change.
            if let WriteKind::Mutate = kind {
                let Some(old_version) = modified_at_version.get(id) else {
                    panic!(
                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
                    );
                };
                // When we process the index, the latest object hasn't been written yet so
                // the old object must be present.
                let Some(old_object) = self
                    .get_object_store()
                    .try_get_object_by_key(id, *old_version)?
                else {
                    panic!(
                        "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
                    );
                };
                if old_object.owner != owner {
                    match old_object.owner {
                        Owner::AddressOwner(addr) => {
                            deleted_owners.push((addr, *id));
                        }
                        Owner::ObjectOwner(object_id) => {
                            deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
                        }
                        _ => {}
                    }
                }
            }

            // Record the new index entry based on the object's new owner.
            match owner {
                Owner::AddressOwner(addr) => {
                    // TODO: We can remove the object fetching after we added ObjectType to
                    // TransactionEffects
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
                    );
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    let type_ = new_object
                        .type_()
                        .map(|type_| ObjectType::Struct(type_.clone()))
                        .unwrap_or(ObjectType::Package);

                    new_owners.push((
                        (addr, *id),
                        ObjectInfo {
                            object_id: *id,
                            version: oref.1,
                            digest: oref.2,
                            type_,
                            owner,
                            previous_transaction: *effects.transaction_digest(),
                        },
                    ));
                }
                Owner::ObjectOwner(owner) => {
                    let new_object = written.get(id).unwrap_or_else(
                        || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
                    );
                    assert_eq!(
                        new_object.version(),
                        oref.1,
                        "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                        tx_digest,
                        id,
                        new_object.version(),
                        oref.1
                    );

                    let Some(df_info) = self
                        .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
                        .unwrap_or_else(|e| {
                            error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
                            None
                        }
                    )
                        else {
                            // Skip indexing for non dynamic field objects.
                            continue;
                        };
                    new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
                }
                _ => {}
            }
        }

        Ok(ObjectIndexChanges {
            deleted_owners,
            deleted_dynamic_fields,
            new_owners,
            new_dynamic_fields,
        })
    }
2719
    /// Builds a `DynamicFieldInfo` for `o` if it is a dynamic-field Move
    /// object; returns `Ok(None)` for packages and non-dynamic-field objects.
    ///
    /// The field's layout is resolved via `resolver` and its name/value are
    /// deserialized from the object contents. For dynamic *object* fields,
    /// the wrapped object is looked up first in `written`, then in the object
    /// store.
    ///
    /// # Errors
    /// Returns an error if layout resolution or deserialization fails, or if
    /// a dynamic object field's target object cannot be found.
    fn try_create_dynamic_field_info(
        &self,
        o: &Object,
        written: &WrittenObjects,
        resolver: &mut dyn LayoutResolver,
    ) -> IotaResult<Option<DynamicFieldInfo>> {
        // Skip if not a move object
        let Some(move_object) = o.data.try_as_move().cloned() else {
            return Ok(None);
        };

        // We only index dynamic field objects
        if !move_object.type_().is_dynamic_field() {
            return Ok(None);
        }

        let layout = resolver
            .get_annotated_layout(&move_object.type_().clone().into())?
            .into_layout();

        // Deserialize the raw Field<Name, Value> wrapper to access its parts.
        let field =
            DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
                IotaError::ObjectDeserialization {
                    error: e.to_string(),
                }
            })?;

        let type_ = field.kind;
        let name_type: TypeTag = field.name_layout.into();
        let bcs_name = field.name_bytes.to_owned();

        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
            .map_err(|e| {
                warn!("{e}");
                IotaError::ObjectDeserialization {
                    error: e.to_string(),
                }
            })?;

        let name = DynamicFieldName {
            type_: name_type,
            value: IotaMoveValue::from(name_value).to_json_value(),
        };

        let value_metadata = field.value_metadata().map_err(|e| {
            warn!("{e}");
            IotaError::ObjectDeserialization {
                error: e.to_string(),
            }
        })?;

        Ok(Some(match value_metadata {
            // Plain dynamic field: the value lives inside this object.
            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
                name,
                bcs_name,
                type_,
                object_type: object_type.to_canonical_string(/* with_prefix */ true),
                object_id: o.id(),
                version: o.version(),
                digest: o.digest(),
            },

            DFV::ValueMetadata::DynamicObjectField(object_id) => {
                // Find the actual object from storage using the object id obtained from the
                // wrapper.

                // Try to find the object in the written objects first.
                let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
                    (
                        object.version(),
                        object.digest(),
                        object.data.type_().unwrap().clone(),
                    )
                } else {
                    // If not found, try to find it in the database.
                    let object = self
                        .get_object_store()
                        .try_get_object_by_key(&object_id, o.version())?
                        .ok_or_else(|| UserInputError::ObjectNotFound {
                            object_id,
                            version: Some(o.version()),
                        })?;
                    let version = object.version();
                    let digest = object.digest();
                    let object_type = object.data.type_().unwrap().clone();
                    (version, digest, object_type)
                };

                DynamicFieldInfo {
                    name,
                    bcs_name,
                    type_,
                    object_type: object_type.to_string(),
                    object_id,
                    version,
                    digest,
                }
            }
        }))
    }
2820
2821    #[instrument(level = "trace", skip_all, err)]
2822    fn post_process_one_tx(
2823        &self,
2824        certificate: &VerifiedExecutableTransaction,
2825        effects: &TransactionEffects,
2826        inner_temporary_store: &InnerTemporaryStore,
2827        epoch_store: &Arc<AuthorityPerEpochStore>,
2828    ) -> IotaResult {
2829        if self.indexes.is_none() {
2830            return Ok(());
2831        }
2832
2833        let _scope = monitored_scope("Execution::post_process_one_tx");
2834
2835        let tx_digest = certificate.digest();
2836        let timestamp_ms = Self::unixtime_now_ms();
2837        let events = &inner_temporary_store.events;
2838        let written = &inner_temporary_store.written;
2839        let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
2840            effects,
2841            inner_temporary_store,
2842            epoch_store,
2843        );
2844
2845        // Index tx
2846        if let Some(indexes) = &self.indexes {
2847            let _ = self
2848                .index_tx(
2849                    indexes.as_ref(),
2850                    tx_digest,
2851                    certificate,
2852                    effects,
2853                    events,
2854                    timestamp_ms,
2855                    tx_coins,
2856                    written,
2857                    inner_temporary_store,
2858                )
2859                .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2860                .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2861                .expect("Indexing tx should not fail");
2862
2863            let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2864            let events = self.make_transaction_block_events(
2865                events.clone(),
2866                *tx_digest,
2867                timestamp_ms,
2868                epoch_store,
2869                inner_temporary_store,
2870            )?;
2871            // Emit events
2872            self.subscription_handler
2873                .process_tx(certificate.data().transaction_data(), &effects, &events)
2874                .tap_ok(|_| {
2875                    self.metrics
2876                        .post_processing_total_tx_had_event_processed
2877                        .inc()
2878                })
2879                .tap_err(|e| {
2880                    warn!(
2881                        ?tx_digest,
2882                        "Post processing - Couldn't process events for tx: {}", e
2883                    )
2884                })?;
2885
2886            self.metrics
2887                .post_processing_total_events_emitted
2888                .inc_by(events.data.len() as u64);
2889        };
2890        Ok(())
2891    }
2892
2893    fn make_transaction_block_events(
2894        &self,
2895        transaction_events: TransactionEvents,
2896        digest: TransactionDigest,
2897        timestamp_ms: u64,
2898        epoch_store: &Arc<AuthorityPerEpochStore>,
2899        inner_temporary_store: &InnerTemporaryStore,
2900    ) -> IotaResult<IotaTransactionBlockEvents> {
2901        let mut layout_resolver =
2902            epoch_store
2903                .executor()
2904                .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2905                    inner_temporary_store,
2906                    self.get_backing_package_store(),
2907                )));
2908        IotaTransactionBlockEvents::try_from(
2909            transaction_events,
2910            digest,
2911            Some(timestamp_ms),
2912            layout_resolver.as_mut(),
2913        )
2914    }
2915
2916    pub fn unixtime_now_ms() -> u64 {
2917        let now = SystemTime::now()
2918            .duration_since(UNIX_EPOCH)
2919            .expect("Time went backwards")
2920            .as_millis();
2921        u64::try_from(now).expect("Travelling in time machine")
2922    }
2923
2924    #[instrument(level = "trace", skip_all)]
2925    pub async fn handle_transaction_info_request(
2926        &self,
2927        request: TransactionInfoRequest,
2928    ) -> IotaResult<TransactionInfoResponse> {
2929        let epoch_store = self.load_epoch_store_one_call_per_task();
2930        let (transaction, status) = self
2931            .get_transaction_status(&request.transaction_digest, &epoch_store)?
2932            .ok_or(IotaError::TransactionNotFound {
2933                digest: request.transaction_digest,
2934            })?;
2935        Ok(TransactionInfoResponse {
2936            transaction,
2937            status,
2938        })
2939    }
2940
    /// Serves object information requests: fetches the requested version of
    /// an object (latest, or an explicit past version for debugging),
    /// optionally generating its Move struct layout, and — for address-owned
    /// objects — including the current transaction lock for debugging.
    #[instrument(level = "trace", skip_all)]
    pub async fn handle_object_info_request(
        &self,
        request: ObjectInfoRequest,
    ) -> IotaResult<ObjectInfoResponse> {
        let epoch_store = self.load_epoch_store_one_call_per_task();

        // Resolve which sequence number (version) is being asked for.
        let requested_object_seq = match request.request_kind {
            ObjectInfoRequestKind::LatestObjectInfo => {
                // Latest: look up the current object reference (which may be
                // a tombstone) and take its sequence number.
                let (_, seq, _) = self
                    .try_get_object_or_tombstone(request.object_id)
                    .await?
                    .ok_or_else(|| {
                        IotaError::from(UserInputError::ObjectNotFound {
                            object_id: request.object_id,
                            version: None,
                        })
                    })?;
                seq
            }
            ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
        };

        let object = self
            .get_object_store()
            .try_get_object_by_key(&request.object_id, requested_object_seq)?
            .ok_or_else(|| {
                IotaError::from(UserInputError::ObjectNotFound {
                    object_id: request.object_id,
                    version: Some(requested_object_seq),
                })
            })?;

        // Generate a layout only when requested AND the object carries Move
        // data (`try_as_move()` returns `None` otherwise).
        let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
            (request.generate_layout, object.data.try_as_move())
        {
            Some(into_struct_layout(
                epoch_store
                    .executor()
                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
                    .get_annotated_layout(&move_obj.type_().clone().into())?,
            )?)
        } else {
            None
        };

        let lock = if !object.is_address_owned() {
            // Only address owned objects have locks.
            None
        } else {
            self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
                .await?
                .map(|s| s.into_inner())
        };

        Ok(ObjectInfoResponse {
            object,
            layout,
            lock_for_debugging: lock,
        })
    }
3002
3003    #[instrument(level = "trace", skip_all)]
3004    pub fn handle_checkpoint_request(
3005        &self,
3006        request: &CheckpointRequest,
3007    ) -> IotaResult<CheckpointResponse> {
3008        let summary = if request.certified {
3009            let summary = match request.sequence_number {
3010                Some(seq) => self
3011                    .checkpoint_store
3012                    .get_checkpoint_by_sequence_number(seq)?,
3013                None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3014            }
3015            .map(|v| v.into_inner());
3016            summary.map(CheckpointSummaryResponse::Certified)
3017        } else {
3018            let summary = match request.sequence_number {
3019                Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3020                None => self
3021                    .checkpoint_store
3022                    .get_latest_locally_computed_checkpoint()?,
3023            };
3024            summary.map(CheckpointSummaryResponse::Pending)
3025        };
3026        let contents = match &summary {
3027            Some(s) => self
3028                .checkpoint_store
3029                .get_checkpoint_contents(&s.content_digest())?,
3030            None => None,
3031        };
3032        Ok(CheckpointResponse {
3033            checkpoint: summary,
3034            contents,
3035        })
3036    }
3037
3038    fn check_protocol_version(
3039        supported_protocol_versions: SupportedProtocolVersions,
3040        current_version: ProtocolVersion,
3041    ) {
3042        info!("current protocol version is now {:?}", current_version);
3043        info!("supported versions are: {:?}", supported_protocol_versions);
3044        if !supported_protocol_versions.is_version_supported(current_version) {
3045            let msg = format!(
3046                "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
3047            );
3048
3049            error!("{}", msg);
3050            eprintln!("{msg}");
3051
3052            #[cfg(not(msim))]
3053            std::process::exit(1);
3054
3055            #[cfg(msim)]
3056            iota_simulator::task::shutdown_current_node();
3057        }
3058    }
3059
    /// Constructs an `AuthorityState` and spawns its background tasks.
    ///
    /// Wires together the stores, caches, transaction manager and pruners,
    /// spawns the certificate execution loop and the event migration task,
    /// and builds the owner index from genesis objects when the index store
    /// is still empty. Shuts the node down (via `check_protocol_version`) if
    /// `supported_protocol_versions` does not include the epoch's current
    /// protocol version.
    #[expect(clippy::disallowed_methods)] // allow unbounded_channel()
    pub async fn new(
        name: AuthorityName,
        secret: StableSyncAuthoritySigner,
        supported_protocol_versions: SupportedProtocolVersions,
        store: Arc<AuthorityStore>,
        execution_cache_trait_pointers: ExecutionCacheTraitPointers,
        epoch_store: Arc<AuthorityPerEpochStore>,
        committee_store: Arc<CommitteeStore>,
        indexes: Option<Arc<IndexStore>>,
        grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
        checkpoint_store: Arc<CheckpointStore>,
        prometheus_registry: &Registry,
        genesis_objects: &[Object],
        db_checkpoint_config: &DBCheckpointConfig,
        config: NodeConfig,
        archive_readers: ArchiveReaderBalancer,
        validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
        chain_identifier: ChainIdentifier,
        pruner_db: Option<Arc<AuthorityPrunerTables>>,
    ) -> Arc<Self> {
        Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

        let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
        // Channel over which the transaction manager hands ready certificates
        // to the execution loop spawned below.
        let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
        let transaction_manager = Arc::new(TransactionManager::new(
            execution_cache_trait_pointers.object_cache_reader.clone(),
            execution_cache_trait_pointers
                .transaction_cache_reader
                .clone(),
            &epoch_store,
            tx_ready_certificates,
            metrics.clone(),
        ));
        let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

        // Pruner handles are stored on the state below (`_pruner`,
        // `_authority_per_epoch_pruner`) so they live as long as the node.
        let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
            epoch_store.get_parent_path(),
            &config.authority_store_pruning_config,
        );
        let _pruner = AuthorityStorePruner::new(
            store.perpetual_tables.clone(),
            checkpoint_store.clone(),
            grpc_indexes_store.clone(),
            indexes.clone(),
            config.authority_store_pruning_config.clone(),
            epoch_store.committee().authority_exists(&name),
            epoch_store.epoch_start_state().epoch_duration_ms(),
            prometheus_registry,
            archive_readers,
            pruner_db,
        );
        let input_loader =
            TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
        let epoch = epoch_store.epoch();
        let rgp = epoch_store.reference_gas_price();
        let state = Arc::new(AuthorityState {
            name,
            secret,
            execution_lock: RwLock::new(epoch),
            epoch_store: ArcSwap::new(epoch_store.clone()),
            input_loader,
            execution_cache_trait_pointers,
            indexes,
            grpc_indexes_store,
            subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
            checkpoint_store,
            committee_store,
            transaction_manager,
            tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
            metrics,
            _pruner,
            _authority_per_epoch_pruner,
            db_checkpoint_config: db_checkpoint_config.clone(),
            config,
            overload_info: AuthorityOverloadInfo::default(),
            validator_tx_finalizer,
            chain_identifier,
            congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
        });

        // Start a task to execute ready certificates.
        // A weak reference is used so the task does not keep the state alive
        // forever.
        let authority_state = Arc::downgrade(&state);
        spawn_monitored_task!(execution_process(
            authority_state,
            rx_ready_certificates,
            rx_execution_shutdown,
        ));
        spawn_monitored_task!(authority_store_migrations::migrate_events(store));

        // TODO: This doesn't belong to the constructor of AuthorityState.
        state
            .create_owner_index_if_empty(genesis_objects, &epoch_store)
            .expect("Error indexing genesis objects.");

        state
    }
3157
    // TODO: Consolidate our traits to reduce the number of methods here.
    /// Read access to the object cache.
    pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
        &self.execution_cache_trait_pointers.object_cache_reader
    }

    /// Read access to the transaction cache.
    pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
        &self.execution_cache_trait_pointers.transaction_cache_reader
    }

    /// Write access to the execution cache.
    pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
        &self.execution_cache_trait_pointers.cache_writer
    }

    /// Combined backing store used during transaction execution.
    pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_store
    }

    /// Store for resolving Move packages.
    pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
        &self.execution_cache_trait_pointers.backing_package_store
    }

    /// Plain object store view of the execution cache.
    pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
        &self.execution_cache_trait_pointers.object_store
    }

    /// API used during end-of-epoch reconfiguration.
    pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
        &self.execution_cache_trait_pointers.reconfig_api
    }

    /// Store holding state accumulator values.
    pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
        &self.execution_cache_trait_pointers.accumulator_store
    }

    /// Cache used by checkpoint processing.
    pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
        &self.execution_cache_trait_pointers.checkpoint_cache
    }

    /// Store used by state sync.
    pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
        &self.execution_cache_trait_pointers.state_sync_store
    }

    /// API for committing executed transactions from the cache.
    pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
        &self.execution_cache_trait_pointers.cache_commit
    }

    /// Test-only access to the underlying `AuthorityStore`.
    pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
        self.execution_cache_trait_pointers
            .testing_api
            .database_for_testing()
    }
3208
    /// Test-only helper: runs checkpoint pruning for all epochs eligible
    /// under `config`'s pruning settings, reporting into `metrics`.
    pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
        &self,
        config: NodeConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        // Use a throwaway registry so test runs do not collide with the
        // node's real metrics registry.
        let archive_readers =
            ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
        AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
            &self.database_for_testing().perpetual_tables,
            &self.checkpoint_store,
            self.grpc_indexes_store.as_deref(),
            None,
            config.authority_store_pruning_config,
            metrics,
            archive_readers,
            EPOCH_DURATION_MS_FOR_TESTING,
        )
        .await
    }
3228
    /// The transaction manager that schedules transactions for execution.
    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
        &self.transaction_manager
    }

    /// Adds transactions / certificates to transaction manager for ordered
    /// execution.
    pub fn enqueue_transactions_for_execution(
        &self,
        txns: Vec<VerifiedExecutableTransaction>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        self.transaction_manager.enqueue(txns, epoch_store)
    }

    /// Adds certificates to transaction manager for ordered execution.
    pub fn enqueue_certificates_for_execution(
        &self,
        certs: Vec<VerifiedCertificate>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        self.transaction_manager
            .enqueue_certificates(certs, epoch_store)
    }

    /// Enqueues transactions paired with the effects digest each one is
    /// expected to produce.
    pub fn enqueue_with_expected_effects_digest(
        &self,
        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        self.transaction_manager
            .enqueue_with_expected_effects_digest(certs, epoch_store)
    }
3261
    /// Builds the initial owner and dynamic-field indexes from the genesis
    /// objects.
    ///
    /// No-op when this node has no index store, or when the index store
    /// already contains data (i.e. any start after the first).
    fn create_owner_index_if_empty(
        &self,
        genesis_objects: &[Object],
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        let Some(index_store) = &self.indexes else {
            return Ok(());
        };
        if !index_store.is_empty() {
            return Ok(());
        }

        let mut new_owners = vec![];
        let mut new_dynamic_fields = vec![];
        let mut layout_resolver = epoch_store
            .executor()
            .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
        for o in genesis_objects.iter() {
            match o.owner {
                // Address-owned objects go into the owner index.
                Owner::AddressOwner(addr) => new_owners.push((
                    (addr, o.id()),
                    ObjectInfo::new(&o.compute_object_reference(), o),
                )),
                // Object-owned objects are recorded as dynamic fields of
                // their parent object.
                Owner::ObjectOwner(object_id) => {
                    let id = o.id();
                    let info = match self.try_create_dynamic_field_info(
                        o,
                        &BTreeMap::new(),
                        layout_resolver.as_mut(),
                    ) {
                        Ok(Some(info)) => info,
                        Ok(None) => continue,
                        // Tolerate dangling references in genesis: log and
                        // skip the field rather than failing node startup.
                        Err(IotaError::UserInput {
                            error:
                                UserInputError::ObjectNotFound {
                                    object_id: not_found_id,
                                    version,
                                },
                        }) => {
                            warn!(
                                ?not_found_id,
                                ?version,
                                object_owner=?object_id,
                                field=?id,
                                "Skipping dynamic field: referenced genesis object not found"
                            );
                            continue;
                        }
                        Err(e) => return Err(e),
                    };
                    new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
                }
                // Other ownership kinds are not indexed here.
                _ => {}
            }
        }

        index_store.insert_genesis_objects(ObjectIndexChanges {
            deleted_owners: vec![],
            deleted_dynamic_fields: vec![],
            new_owners,
            new_dynamic_fields,
        })
    }
3325
3326    /// Attempts to acquire execution lock for an executable transaction.
3327    /// Returns the lock if the transaction is matching current executed epoch
3328    /// Returns None otherwise
3329    pub fn execution_lock_for_executable_transaction(
3330        &self,
3331        transaction: &VerifiedExecutableTransaction,
3332    ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3333        let lock = self
3334            .execution_lock
3335            .try_read()
3336            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3337        if *lock == transaction.auth_sig().epoch() {
3338            Ok(lock)
3339        } else {
3340            Err(IotaError::WrongEpoch {
3341                expected_epoch: *lock,
3342                actual_epoch: transaction.auth_sig().epoch(),
3343            })
3344        }
3345    }
3346
    /// Acquires the execution lock for the duration of a transaction signing
    /// request. This prevents reconfiguration from starting until we are
    /// finished handling the signing request. Otherwise, in-memory lock
    /// state could be cleared (by `ObjectLocks::clear_cached_locks`)
    /// while we are attempting to acquire locks for the transaction.
    pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
        self.execution_lock
            .try_read()
            .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
    }

    /// Acquires the execution write lock, waiting until all outstanding read
    /// guards (execution / signing) are released. Held during
    /// reconfiguration to quiesce transaction execution.
    pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
        self.execution_lock.write().await
    }
3361
    /// Reconfigures this authority from `cur_epoch_store`'s epoch into the
    /// epoch of `new_committee`, returning the new per-epoch store.
    ///
    /// Quiesces execution (via the execution write lock), terminates
    /// epoch-scoped tasks, reverts uncommitted transactions, runs system
    /// consistency checks, optionally checkpoints all databases, reconfigures
    /// the cache and reopens the epoch db. The execution lock is only bumped
    /// to the new epoch after the epoch store has been swapped.
    #[instrument(level = "error", skip_all)]
    pub async fn reconfigure(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        supported_protocol_versions: SupportedProtocolVersions,
        new_committee: Committee,
        epoch_start_configuration: EpochStartConfiguration,
        accumulator: Arc<StateAccumulator>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_supply_change: i64,
        epoch_last_checkpoint: CheckpointSequenceNumber,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        // Shut down early if the new epoch's protocol version is not
        // supported by this binary.
        Self::check_protocol_version(
            supported_protocol_versions,
            epoch_start_configuration
                .epoch_start_state()
                .protocol_version(),
        );
        self.metrics.reset_on_reconfigure();
        self.committee_store.insert_new_committee(&new_committee)?;

        // Wait until no transactions are being executed.
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;

        // Terminate all epoch-specific tasks (those started with within_alive_epoch).
        cur_epoch_store.epoch_terminated().await;

        let highest_locally_built_checkpoint_seq = self
            .checkpoint_store
            .get_latest_locally_computed_checkpoint()?
            .map(|c| *c.sequence_number())
            .unwrap_or(0);

        // We can never have locally built a checkpoint beyond the epoch's
        // final checkpoint.
        assert!(
            epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
            "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
        );
        if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
            // if we built the last checkpoint locally (as opposed to receiving it from a
            // peer), then all shared_version_assignments except the one for the
            // ChangeEpoch transaction should have been removed
            let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
            // Note that while 1 is the typical value, 0 is possible if the node restarts
            // after committing the last checkpoint but before reconfiguring.
            if num_shared_version_assignments > 1 {
                // If this happens in prod, we have a memory leak, but not a correctness issue.
                debug_fatal!(
                    "all shared_version_assignments should have been removed \
                    (num_shared_version_assignments: {num_shared_version_assignments})"
                );
            }
        }

        // Safe to reconfigure now. No transactions are being executed,
        // and no epoch-specific tasks are running.

        // TODO: revert_uncommitted_epoch_transactions will soon be unnecessary -
        // clear_state_end_of_epoch() can simply drop all uncommitted transactions
        self.revert_uncommitted_epoch_transactions(cur_epoch_store)
            .await?;
        self.get_reconfig_api()
            .clear_state_end_of_epoch(&execution_lock);
        self.check_system_consistency(
            cur_epoch_store,
            accumulator,
            expensive_safety_check_config,
            epoch_supply_change,
        )?;
        self.get_reconfig_api()
            .try_set_epoch_start_configuration(&epoch_start_configuration)?;
        // Optionally snapshot all databases at the epoch boundary.
        if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
            if self
                .db_checkpoint_config
                .perform_db_checkpoints_at_epoch_end
            {
                let checkpoint_indexes = self
                    .db_checkpoint_config
                    .perform_index_db_checkpoints_at_epoch_end
                    .unwrap_or(false);
                let current_epoch = cur_epoch_store.epoch();
                let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
                self.checkpoint_all_dbs(
                    &epoch_checkpoint_path,
                    cur_epoch_store,
                    checkpoint_indexes,
                )?;
            }
        }

        self.get_reconfig_api()
            .reconfigure_cache(&epoch_start_configuration)
            .await;

        let new_epoch = new_committee.epoch;
        let new_epoch_store = self
            .reopen_epoch_db(
                cur_epoch_store,
                new_committee,
                epoch_start_configuration,
                expensive_safety_check_config,
                epoch_last_checkpoint,
            )
            .await?;
        assert_eq!(new_epoch_store.epoch(), new_epoch);
        self.transaction_manager.reconfigure(new_epoch);
        *execution_lock = new_epoch;
        // drop execution_lock after epoch store was updated
        // see also assert in AuthorityState::process_certificate
        // on the epoch store and execution lock epoch match
        Ok(new_epoch_store)
    }
3473
    /// Advance the epoch store to the next epoch for testing only.
    /// This only manually sets all the places where we have the epoch number.
    /// It doesn't properly reconfigure the node, hence should be only used for
    /// testing.
    pub async fn reconfigure_for_testing(&self) {
        let mut execution_lock = self.execution_lock_for_reconfiguration().await;
        let epoch_store = self.epoch_store_for_testing().clone();
        let protocol_config = epoch_store.protocol_config().clone();
        // The current protocol config used in the epoch store may have been overridden
        // and diverged from the protocol config definitions. That override may
        // have now been dropped when the initial guard was dropped. We reapply
        // the override before creating the new epoch store, to make sure that
        // the new epoch store has the same protocol config as the current one.
        // Since this is for testing only, we mostly like to keep the protocol config
        // the same across epochs.
        let _guard =
            ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
        let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
            self.get_backing_package_store().clone(),
            self.get_object_store().clone(),
            &self.config.expensive_safety_check_config,
            // Seed the new store with the sequence number of the outgoing
            // epoch's last checkpoint (default 0 if none was recorded).
            self.checkpoint_store
                .get_epoch_last_checkpoint(epoch_store.epoch())
                .unwrap()
                .map(|c| *c.sequence_number())
                .unwrap_or_default(),
        );
        let new_epoch = new_epoch_store.epoch();
        self.transaction_manager.reconfigure(new_epoch);
        self.epoch_store.store(new_epoch_store);
        epoch_store.epoch_terminated().await;
        // Update the execution lock last, so the epoch store and the lock
        // agree by the time the write guard is released.
        *execution_lock = new_epoch;
    }
3507
    /// Runs end-of-epoch system consistency checks for `cur_epoch_store`'s
    /// epoch: IOTA conservation (with the expected `epoch_supply_change`),
    /// and — depending on `expensive_safety_check_config` — root state hash
    /// vs. live object set consistency and secondary index verification.
    #[instrument(level = "error", skip_all)]
    fn check_system_consistency(
        &self,
        cur_epoch_store: &AuthorityPerEpochStore,
        accumulator: Arc<StateAccumulator>,
        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
        epoch_supply_change: i64,
    ) -> IotaResult<()> {
        info!(
            "Performing iota conservation consistency check for epoch {}",
            cur_epoch_store.epoch()
        );

        // Debug builds additionally run the per-epoch
        // executed-transactions-in-checkpoint check.
        if cfg!(debug_assertions) {
            cur_epoch_store.check_all_executed_transactions_in_checkpoint();
        }

        self.get_reconfig_api()
            .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;

        // check for root state hash consistency with live object set
        if expensive_safety_check_config.enable_state_consistency_check() {
            info!(
                "Performing state consistency check for epoch {}",
                cur_epoch_store.epoch()
            );
            self.expensive_check_is_consistent_state(
                accumulator,
                cur_epoch_store,
                cfg!(debug_assertions), // panic in debug mode only
            );
        }

        if expensive_safety_check_config.enable_secondary_index_checks() {
            if let Some(indexes) = self.indexes.clone() {
                verify_indexes(self.get_accumulator_store().as_ref(), indexes)
                    .expect("secondary indexes are inconsistent");
            }
        }

        Ok(())
    }
3550
3551    fn expensive_check_is_consistent_state(
3552        &self,
3553        accumulator: Arc<StateAccumulator>,
3554        cur_epoch_store: &AuthorityPerEpochStore,
3555        panic: bool,
3556    ) {
3557        let live_object_set_hash = accumulator.digest_live_object_set();
3558
3559        let root_state_hash: ECMHLiveObjectSetDigest = self
3560            .get_accumulator_store()
3561            .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3562            .expect("Retrieving root state hash cannot fail")
3563            .expect("Root state hash for epoch must exist")
3564            .1
3565            .digest()
3566            .into();
3567
3568        let is_inconsistent = root_state_hash != live_object_set_hash;
3569        if is_inconsistent {
3570            if panic {
3571                panic!(
3572                    "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3573                );
3574            } else {
3575                error!(
3576                    "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3577                    root_state_hash, live_object_set_hash
3578                );
3579            }
3580        } else {
3581            info!("State consistency check passed");
3582        }
3583
3584        if !panic {
3585            accumulator.set_inconsistent_state(is_inconsistent);
3586        }
3587    }
3588
    /// Test-only: returns the epoch id of the current epoch store.
    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.epoch_store_for_testing().epoch()
    }
3592
    /// Takes an on-disk snapshot of all authority databases (checkpoints,
    /// perpetual store, epochs, and optionally the index stores) under
    /// `checkpoint_path`.
    ///
    /// The snapshot is assembled in a sibling `.tmp` directory and published
    /// with a final `fs::rename`, so a crash mid-way never leaves a partial
    /// snapshot at `checkpoint_path`. If the target path already exists, the
    /// call is a no-op.
    #[instrument(level = "error", skip_all)]
    pub fn checkpoint_all_dbs(
        &self,
        checkpoint_path: &Path,
        cur_epoch_store: &AuthorityPerEpochStore,
        checkpoint_indexes: bool,
    ) -> IotaResult {
        let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
        let current_epoch = cur_epoch_store.epoch();

        // The rename below is the last step, so an existing directory means a
        // previous run completed successfully.
        if checkpoint_path.exists() {
            info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
            return Ok(());
        }

        let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
        let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");

        // A leftover tmp directory is a partial snapshot from an interrupted
        // run; discard it and start from scratch.
        if checkpoint_path_tmp.exists() {
            fs::remove_dir_all(&checkpoint_path_tmp)
                .map_err(|e| IotaError::FileIO(e.to_string()))?;
        }

        fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
        fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;

        // NOTE: Do not change the order of invoking these checkpoint calls
        // We want to snapshot checkpoint db first to not race with state sync
        self.checkpoint_store
            .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;

        self.get_reconfig_api()
            .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;

        self.committee_store
            .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;

        // Index snapshots are optional: both index stores are configured per
        // node and may be absent.
        if checkpoint_indexes {
            if let Some(indexes) = self.indexes.as_ref() {
                indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
            }
            if let Some(grpc_indexes_store) = self.grpc_indexes_store.as_ref() {
                grpc_indexes_store.checkpoint_db(&checkpoint_path_tmp.join(GRPC_INDEXES_DIR))?;
            }
        }

        // Atomically publish the completed snapshot.
        fs::rename(checkpoint_path_tmp, checkpoint_path)
            .map_err(|e| IotaError::FileIO(e.to_string()))?;
        Ok(())
    }
3643
    /// Load the current epoch store. This can change during reconfiguration. To
    /// ensure that we never end up accessing different epoch stores in a
    /// single task, we need to make sure that this is called once per task.
    /// Each call needs to be carefully audited to ensure it is
    /// the case. This also means we should minimize the number of call-sites.
    /// Only call it when there is no way to obtain it from somewhere else.
    ///
    /// The returned `Guard` pins the `ArcSwap` snapshot: the epoch store it
    /// refers to stays valid even if a reconfiguration swaps in a new one
    /// concurrently.
    pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.epoch_store.load()
    }
3653
    // Load the epoch store, should be used in tests only.
    // Same one-call-per-task caveat as `load_epoch_store_one_call_per_task`
    // applies; tests are exempt from the audit requirement.
    pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
        self.load_epoch_store_one_call_per_task()
    }
3658
    /// Returns a deep copy of the current epoch's committee; test-only.
    ///
    /// `Committee::clone(...)` is called explicitly (via deref of the value
    /// returned by `committee()`) to clone the `Committee` itself rather than
    /// any wrapping smart pointer.
    pub fn clone_committee_for_testing(&self) -> Committee {
        Committee::clone(self.epoch_store_for_testing().committee())
    }
3662
3663    #[instrument(level = "trace", skip_all)]
3664    pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3665        self.get_object_store()
3666            .try_get_object(object_id)
3667            .map_err(Into::into)
3668    }
3669
3670    /// Non-fallible version of `try_get_object`.
3671    pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3672        self.try_get_object(object_id)
3673            .await
3674            .expect("storage access failed")
3675    }
3676
3677    pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3678        Ok(self
3679            .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3680            .await?
3681            .expect("framework object should always exist")
3682            .compute_object_reference())
3683    }
3684
    // This function is only used for testing.
    // Reads the IOTA system state object through the object cache; the
    // "_unsafe" suffix on the underlying call is inherited from the cache API.
    pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
        self.get_object_cache_reader()
            .try_get_iota_system_state_object_unsafe()
    }
3690
3691    #[instrument(level = "trace", skip_all)]
3692    pub fn get_checkpoint_by_sequence_number(
3693        &self,
3694        sequence_number: CheckpointSequenceNumber,
3695    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3696        Ok(self
3697            .checkpoint_store
3698            .get_checkpoint_by_sequence_number(sequence_number)?)
3699    }
3700
    /// Wait for the given transactions to be included in a checkpoint.
    ///
    /// Returns a mapping from transaction digest to
    /// `(checkpoint_sequence_number, checkpoint_timestamp_ms)`.
    /// On timeout, returns partial results for any transactions that were
    /// already checkpointed.
    pub async fn wait_for_checkpoint_inclusion(
        &self,
        digests: &[TransactionDigest],
        timeout: Duration,
    ) -> IotaResult<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>> {
        let epoch_store = self.load_epoch_store_one_call_per_task();

        // Local cache so multiple transactions in the same checkpoint only
        // trigger a single checkpoint summary lookup.
        let mut checkpoint_timestamp_cache = HashMap::<CheckpointSequenceNumber, u64>::new();

        let results = epoch_store
            .wait_for_transactions_in_checkpoint_with_timeout(digests, timeout, |seq| {
                // On a failed or missing summary lookup the timestamp defaults
                // to 0 rather than failing the whole wait.
                *checkpoint_timestamp_cache.entry(seq).or_insert_with(|| {
                    self.get_checkpoint_by_sequence_number(seq)
                        .ok()
                        .flatten()
                        .map(|c| c.timestamp_ms)
                        .unwrap_or(0)
                })
            })
            .await?;

        // `results` is positionally aligned with `digests`; drop entries for
        // transactions that were not checkpointed within the timeout.
        Ok(digests
            .iter()
            .copied()
            .zip(results)
            .filter_map(|(digest, opt)| opt.map(|seq_and_ts| (digest, seq_and_ts)))
            .collect())
    }
3737
3738    #[instrument(level = "trace", skip_all)]
3739    pub fn get_transaction_checkpoint_for_tests(
3740        &self,
3741        digest: &TransactionDigest,
3742        epoch_store: &AuthorityPerEpochStore,
3743    ) -> IotaResult<Option<VerifiedCheckpoint>> {
3744        let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3745        let Some(checkpoint) = checkpoint else {
3746            return Ok(None);
3747        };
3748        let checkpoint = self
3749            .checkpoint_store
3750            .get_checkpoint_by_sequence_number(checkpoint)?;
3751        Ok(checkpoint)
3752    }
3753
3754    #[instrument(level = "trace", skip_all)]
3755    pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3756        Ok(
3757            match self
3758                .get_object_cache_reader()
3759                .try_get_latest_object_or_tombstone(*object_id)?
3760            {
3761                Some((_, ObjectOrTombstone::Object(object))) => {
3762                    let layout = self.get_object_layout(&object)?;
3763                    ObjectRead::Exists(object.compute_object_reference(), object, layout)
3764                }
3765                Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3766                None => ObjectRead::NotExists(*object_id),
3767            },
3768        )
3769    }
3770
    /// Chain Identifier is the digest of the genesis checkpoint.
    /// Cached on this struct at construction; this accessor never hits storage.
    pub fn get_chain_identifier(&self) -> ChainIdentifier {
        self.chain_identifier
    }
3775
3776    #[instrument(level = "trace", skip_all)]
3777    pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3778    where
3779        T: DeserializeOwned,
3780    {
3781        let o = self.get_object_read(object_id)?.into_object()?;
3782        if let Some(move_object) = o.data.try_as_move() {
3783            Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3784                IotaError::ObjectDeserialization {
3785                    error: format!("{e}"),
3786                }
3787            })?)
3788        } else {
3789            Err(IotaError::ObjectDeserialization {
3790                error: format!("Provided object : [{object_id}] is not a Move object."),
3791            })
3792        }
3793    }
3794
3795    /// This function aims to serve rpc reads on past objects and
3796    /// we don't expect it to be called for other purposes.
3797    /// Depending on the object pruning policies that will be enforced in the
3798    /// future there is no software-level guarantee/SLA to retrieve an object
3799    /// with an old version even if it exists/existed.
3800    #[instrument(level = "trace", skip_all)]
3801    pub fn get_past_object_read(
3802        &self,
3803        object_id: &ObjectID,
3804        version: SequenceNumber,
3805    ) -> IotaResult<PastObjectRead> {
3806        // Firstly we see if the object ever existed by getting its latest data
3807        let Some(obj_ref) = self
3808            .get_object_cache_reader()
3809            .try_get_latest_object_ref_or_tombstone(*object_id)?
3810        else {
3811            return Ok(PastObjectRead::ObjectNotExists(*object_id));
3812        };
3813
3814        if version > obj_ref.1 {
3815            return Ok(PastObjectRead::VersionTooHigh {
3816                object_id: *object_id,
3817                asked_version: version,
3818                latest_version: obj_ref.1,
3819            });
3820        }
3821
3822        if version < obj_ref.1 {
3823            // Read past objects
3824            return Ok(match self.read_object_at_version(object_id, version)? {
3825                Some((object, layout)) => {
3826                    let obj_ref = object.compute_object_reference();
3827                    PastObjectRead::VersionFound(obj_ref, object, layout)
3828                }
3829
3830                None => PastObjectRead::VersionNotFound(*object_id, version),
3831            });
3832        }
3833
3834        if !obj_ref.2.is_alive() {
3835            return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3836        }
3837
3838        match self.read_object_at_version(object_id, obj_ref.1)? {
3839            Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3840            None => {
3841                error!(
3842                    "Object with in parent_entry is missing from object store, datastore is \
3843                     inconsistent",
3844                );
3845                Err(UserInputError::ObjectNotFound {
3846                    object_id: *object_id,
3847                    version: Some(obj_ref.1),
3848                }
3849                .into())
3850            }
3851        }
3852    }
3853
3854    #[instrument(level = "trace", skip_all)]
3855    fn read_object_at_version(
3856        &self,
3857        object_id: &ObjectID,
3858        version: SequenceNumber,
3859    ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3860        let Some(object) = self
3861            .get_object_cache_reader()
3862            .try_get_object_by_key(object_id, version)?
3863        else {
3864            return Ok(None);
3865        };
3866
3867        let layout = self.get_object_layout(&object)?;
3868        Ok(Some((object, layout)))
3869    }
3870
    /// Resolves the annotated Move struct layout for `object`'s type.
    ///
    /// Returns `Ok(None)` for non-Move objects (packages), which carry no
    /// struct layout; propagates resolver errors via the inner `?` +
    /// `transpose`.
    fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
        let layout = object
            .data
            .try_as_move()
            .map(|object| {
                into_struct_layout(
                    self.load_epoch_store_one_call_per_task()
                        .executor()
                        // TODO(cache) - must read through cache
                        .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
                        .get_annotated_layout(&object.type_().clone().into())?,
                )
            })
            .transpose()?;
        Ok(layout)
    }
3887
3888    fn get_owner_at_version(
3889        &self,
3890        object_id: &ObjectID,
3891        version: SequenceNumber,
3892    ) -> IotaResult<Owner> {
3893        self.get_object_store()
3894            .try_get_object_by_key(object_id, version)?
3895            .ok_or_else(|| {
3896                IotaError::from(UserInputError::ObjectNotFound {
3897                    object_id: *object_id,
3898                    version: Some(version),
3899                })
3900            })
3901            .map(|o| o.owner)
3902    }
3903
3904    #[instrument(level = "trace", skip_all)]
3905    pub fn get_owner_objects(
3906        &self,
3907        owner: IotaAddress,
3908        // If `Some`, the query will start from the next item after the specified cursor
3909        cursor: Option<ObjectID>,
3910        limit: usize,
3911        filter: Option<IotaObjectDataFilter>,
3912    ) -> IotaResult<Vec<ObjectInfo>> {
3913        if let Some(indexes) = &self.indexes {
3914            indexes.get_owner_objects(owner, cursor, limit, filter)
3915        } else {
3916            Err(IotaError::IndexStoreNotAvailable)
3917        }
3918    }
3919
3920    #[instrument(level = "trace", skip_all)]
3921    pub fn get_owned_coins_iterator_with_cursor(
3922        &self,
3923        owner: IotaAddress,
3924        // If `Some`, the query will start from the next item after the specified cursor
3925        cursor: (String, ObjectID),
3926        limit: usize,
3927        one_coin_type_only: bool,
3928    ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3929        if let Some(indexes) = &self.indexes {
3930            indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3931        } else {
3932            Err(IotaError::IndexStoreNotAvailable)
3933        }
3934    }
3935
3936    #[instrument(level = "trace", skip_all)]
3937    pub fn get_owner_objects_iterator(
3938        &self,
3939        owner: IotaAddress,
3940        // If `Some`, the query will start from the next item after the specified cursor
3941        cursor: Option<ObjectID>,
3942        filter: Option<IotaObjectDataFilter>,
3943    ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3944        let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3945        if let Some(indexes) = &self.indexes {
3946            indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3947        } else {
3948            Err(IotaError::IndexStoreNotAvailable)
3949        }
3950    }
3951
3952    #[instrument(level = "trace", skip_all)]
3953    pub async fn get_move_objects<T>(
3954        &self,
3955        owner: IotaAddress,
3956        type_: MoveObjectType,
3957    ) -> IotaResult<Vec<T>>
3958    where
3959        T: DeserializeOwned,
3960    {
3961        let object_ids = self
3962            .get_owner_objects_iterator(owner, None, None)?
3963            .filter(|o| match &o.type_ {
3964                ObjectType::Struct(s) => &type_ == s,
3965                ObjectType::Package => false,
3966            })
3967            .map(|info| ObjectKey(info.object_id, info.version))
3968            .collect::<Vec<_>>();
3969        let mut move_objects = vec![];
3970
3971        let objects = self
3972            .get_object_store()
3973            .try_multi_get_objects_by_key(&object_ids)?;
3974
3975        for (o, id) in objects.into_iter().zip(object_ids) {
3976            let object = o.ok_or_else(|| {
3977                IotaError::from(UserInputError::ObjectNotFound {
3978                    object_id: id.0,
3979                    version: Some(id.1),
3980                })
3981            })?;
3982            let move_object = object.data.try_as_move().ok_or_else(|| {
3983                IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3984            })?;
3985            move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3986                IotaError::ObjectDeserialization {
3987                    error: format!("{e}"),
3988                }
3989            })?);
3990        }
3991        Ok(move_objects)
3992    }
3993
3994    #[instrument(level = "trace", skip_all)]
3995    pub fn get_dynamic_fields(
3996        &self,
3997        owner: ObjectID,
3998        // If `Some`, the query will start from the next item after the specified cursor
3999        cursor: Option<ObjectID>,
4000        limit: usize,
4001    ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4002        Ok(self
4003            .get_dynamic_fields_iterator(owner, cursor)?
4004            .take(limit)
4005            .collect::<Result<Vec<_>, _>>()?)
4006    }
4007
4008    fn get_dynamic_fields_iterator(
4009        &self,
4010        owner: ObjectID,
4011        // If `Some`, the query will start from the next item after the specified cursor
4012        cursor: Option<ObjectID>,
4013    ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4014    {
4015        if let Some(indexes) = &self.indexes {
4016            indexes.get_dynamic_fields_iterator(owner, cursor)
4017        } else {
4018            Err(IotaError::IndexStoreNotAvailable)
4019        }
4020    }
4021
4022    #[instrument(level = "trace", skip_all)]
4023    pub fn get_dynamic_field_object_id(
4024        &self,
4025        owner: ObjectID,
4026        name_type: TypeTag,
4027        name_bcs_bytes: &[u8],
4028    ) -> IotaResult<Option<ObjectID>> {
4029        if let Some(indexes) = &self.indexes {
4030            indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4031        } else {
4032            Err(IotaError::IndexStoreNotAvailable)
4033        }
4034    }
4035
4036    #[instrument(level = "trace", skip_all)]
4037    pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
4038        Ok(self.get_indexes()?.next_sequence_number())
4039    }
4040
4041    #[instrument(level = "trace", skip_all)]
4042    pub async fn get_executed_transaction_and_effects(
4043        &self,
4044        digest: TransactionDigest,
4045        kv_store: Arc<TransactionKeyValueStore>,
4046    ) -> IotaResult<(Transaction, TransactionEffects)> {
4047        let transaction = kv_store.get_tx(digest).await?;
4048        let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4049        Ok((transaction, effects))
4050    }
4051
4052    #[instrument(level = "trace", skip_all)]
4053    pub fn multi_get_checkpoint_by_sequence_number(
4054        &self,
4055        sequence_numbers: &[CheckpointSequenceNumber],
4056    ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
4057        Ok(self
4058            .checkpoint_store
4059            .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4060    }
4061
4062    #[instrument(level = "trace", skip_all)]
4063    pub fn get_transaction_events(
4064        &self,
4065        digest: &TransactionDigest,
4066    ) -> IotaResult<TransactionEvents> {
4067        self.get_transaction_cache_reader()
4068            .try_get_events(digest)?
4069            .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
4070    }
4071
4072    pub fn get_transaction_input_objects(
4073        &self,
4074        effects: &TransactionEffects,
4075    ) -> anyhow::Result<Vec<Object>> {
4076        iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4077            .map_err(Into::into)
4078    }
4079
4080    pub fn get_transaction_output_objects(
4081        &self,
4082        effects: &TransactionEffects,
4083    ) -> anyhow::Result<Vec<Object>> {
4084        iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4085            .map_err(Into::into)
4086    }
4087
4088    fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
4089        match &self.indexes {
4090            Some(i) => Ok(i.clone()),
4091            None => Err(IotaError::UnsupportedFeature {
4092                error: "extended object indexing is not enabled on this server".into(),
4093            }),
4094        }
4095    }
4096
4097    pub async fn get_transactions_for_tests(
4098        self: &Arc<Self>,
4099        filter: Option<TransactionFilter>,
4100        cursor: Option<TransactionDigest>,
4101        limit: Option<usize>,
4102        reverse: bool,
4103    ) -> IotaResult<Vec<TransactionDigest>> {
4104        let metrics = KeyValueStoreMetrics::new_for_tests();
4105        let kv_store = Arc::new(TransactionKeyValueStore::new(
4106            "rocksdb",
4107            metrics,
4108            self.clone(),
4109        ));
4110        self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4111            .await
4112    }
4113
    /// Lists transaction digests matching `filter`, paginated by `cursor` and
    /// `limit`, in forward or reverse order.
    ///
    /// The `Checkpoint` filter is answered directly from checkpoint contents
    /// in the key-value store; every other filter is delegated to the index
    /// store.
    #[instrument(level = "trace", skip_all)]
    pub async fn get_transactions(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        filter: Option<TransactionFilter>,
        // If `Some`, the query will start from the next item after the specified cursor
        cursor: Option<TransactionDigest>,
        limit: Option<usize>,
        reverse: bool,
    ) -> IotaResult<Vec<TransactionDigest>> {
        if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
            let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
            let iter = checkpoint_contents.iter().map(|c| c.transaction);
            // Exclusive cursor: `skip_while` drops everything before the
            // cursor, the extra `skip(1)` drops the cursor itself. If the
            // cursor digest is not in this checkpoint, the result is empty.
            if reverse {
                let iter = iter
                    .rev()
                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
                    .skip(usize::from(cursor.is_some()));
                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
            } else {
                let iter = iter
                    .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
                    .skip(usize::from(cursor.is_some()));
                return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
            }
        }
        self.get_indexes()?
            .get_transactions(filter, cursor, limit, reverse)
    }
4143
    /// Accessor for the checkpoint store shared by this authority.
    pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
        &self.checkpoint_store
    }
4147
4148    pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4149        self.get_checkpoint_store()
4150            .get_highest_executed_checkpoint_seq_number()?
4151            .ok_or(IotaError::UserInput {
4152                error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4153            })
4154    }
4155
4156    #[cfg(msim)]
4157    pub fn get_highest_pruned_checkpoint_for_testing(
4158        &self,
4159    ) -> IotaResult<CheckpointSequenceNumber> {
4160        self.database_for_testing()
4161            .perpetual_tables
4162            .get_highest_pruned_checkpoint()
4163            .map(|c| c.unwrap_or(0))
4164            .map_err(Into::into)
4165    }
4166
4167    #[instrument(level = "trace", skip_all)]
4168    pub fn get_checkpoint_summary_by_sequence_number(
4169        &self,
4170        sequence_number: CheckpointSequenceNumber,
4171    ) -> IotaResult<CheckpointSummary> {
4172        let verified_checkpoint = self
4173            .get_checkpoint_store()
4174            .get_checkpoint_by_sequence_number(sequence_number)?;
4175        match verified_checkpoint {
4176            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4177            None => Err(IotaError::UserInput {
4178                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4179            }),
4180        }
4181    }
4182
4183    #[instrument(level = "trace", skip_all)]
4184    pub fn get_checkpoint_summary_by_digest(
4185        &self,
4186        digest: CheckpointDigest,
4187    ) -> IotaResult<CheckpointSummary> {
4188        let verified_checkpoint = self
4189            .get_checkpoint_store()
4190            .get_checkpoint_by_digest(&digest)?;
4191        match verified_checkpoint {
4192            Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4193            None => Err(IotaError::UserInput {
4194                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4195            }),
4196        }
4197    }
4198
4199    #[instrument(level = "trace", skip_all)]
4200    pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
4201        if is_system_package(package_id) {
4202            return self.find_genesis_txn_digest();
4203        }
4204        Ok(self
4205            .get_object_read(&package_id)?
4206            .into_object()?
4207            .previous_transaction)
4208    }
4209
4210    #[instrument(level = "trace", skip_all)]
4211    pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4212        let summary = self
4213            .get_verified_checkpoint_by_sequence_number(0)?
4214            .into_message();
4215        let content = self.get_checkpoint_contents(summary.content_digest)?;
4216        let genesis_transaction = content.enumerate_transactions(&summary).next();
4217        Ok(genesis_transaction
4218            .ok_or(IotaError::UserInput {
4219                error: UserInputError::GenesisTransactionNotFound,
4220            })?
4221            .1
4222            .transaction)
4223    }
4224
4225    #[instrument(level = "trace", skip_all)]
4226    pub fn get_verified_checkpoint_by_sequence_number(
4227        &self,
4228        sequence_number: CheckpointSequenceNumber,
4229    ) -> IotaResult<VerifiedCheckpoint> {
4230        let verified_checkpoint = self
4231            .get_checkpoint_store()
4232            .get_checkpoint_by_sequence_number(sequence_number)?;
4233        match verified_checkpoint {
4234            Some(verified_checkpoint) => Ok(verified_checkpoint),
4235            None => Err(IotaError::UserInput {
4236                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4237            }),
4238        }
4239    }
4240
4241    #[instrument(level = "trace", skip_all)]
4242    pub fn get_verified_checkpoint_summary_by_digest(
4243        &self,
4244        digest: CheckpointDigest,
4245    ) -> IotaResult<VerifiedCheckpoint> {
4246        let verified_checkpoint = self
4247            .get_checkpoint_store()
4248            .get_checkpoint_by_digest(&digest)?;
4249        match verified_checkpoint {
4250            Some(verified_checkpoint) => Ok(verified_checkpoint),
4251            None => Err(IotaError::UserInput {
4252                error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4253            }),
4254        }
4255    }
4256
4257    #[instrument(level = "trace", skip_all)]
4258    pub fn get_checkpoint_contents(
4259        &self,
4260        digest: CheckpointContentsDigest,
4261    ) -> IotaResult<CheckpointContents> {
4262        self.get_checkpoint_store()
4263            .get_checkpoint_contents(&digest)?
4264            .ok_or(IotaError::UserInput {
4265                error: UserInputError::CheckpointContentsNotFound(digest),
4266            })
4267    }
4268
4269    #[instrument(level = "trace", skip_all)]
4270    pub fn get_checkpoint_contents_by_sequence_number(
4271        &self,
4272        sequence_number: CheckpointSequenceNumber,
4273    ) -> IotaResult<CheckpointContents> {
4274        let verified_checkpoint = self
4275            .get_checkpoint_store()
4276            .get_checkpoint_by_sequence_number(sequence_number)?;
4277        match verified_checkpoint {
4278            Some(verified_checkpoint) => {
4279                let content_digest = verified_checkpoint.into_inner().content_digest;
4280                self.get_checkpoint_contents(content_digest)
4281            }
4282            None => Err(IotaError::UserInput {
4283                error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4284            }),
4285        }
4286    }
4287
    /// Queries events matching `query`, paginated by an exclusive `cursor`
    /// and `limit`, in ascending or descending (tx, event) order.
    ///
    /// Event keys come from the index store; event payloads are then fetched
    /// from the key-value store and resolved into `IotaEvent`s with their
    /// annotated Move layouts. Requires the index store.
    #[instrument(level = "trace", skip_all)]
    pub async fn query_events(
        &self,
        kv_store: &Arc<TransactionKeyValueStore>,
        query: EventFilter,
        // If `Some`, the query will start from the next item after the specified cursor
        cursor: Option<EventID>,
        limit: usize,
        descending: bool,
    ) -> IotaResult<Vec<IotaEvent>> {
        let index_store = self.get_indexes()?;

        // Get the tx_num from tx_digest
        // Without a cursor, start from the extreme end matching the direction.
        let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
            let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
                IotaError::TransactionNotFound {
                    digest: cursor.tx_digest,
                },
            )?;
            (tx_seq, cursor.event_seq as usize)
        } else if descending {
            (u64::MAX, usize::MAX)
        } else {
            (0, 0)
        };

        // Over-fetch by one so an inclusive index scan can implement the
        // exclusive-cursor semantics below.
        let limit = limit + 1;
        let mut event_keys = match query {
            EventFilter::All(filters) => {
                if filters.is_empty() {
                    index_store.all_events(tx_num, event_num, limit, descending)?
                } else {
                    return Err(IotaError::UserInput {
                        error: UserInputError::Unsupported(
                            "This query type does not currently support filter combinations"
                                .to_string(),
                        ),
                    });
                }
            }
            EventFilter::Transaction(digest) => {
                index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveModule { package, module } => {
                let module_id = ModuleId::new(package.into(), module);
                index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
            }
            EventFilter::MoveEventType(struct_name) => index_store
                .events_by_move_event_struct_name(
                    &struct_name,
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            EventFilter::Sender(sender) => {
                index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
            }
            EventFilter::TimeRange {
                start_time,
                end_time,
            } => index_store
                .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
            EventFilter::MoveEventModule { package, module } => index_store
                .events_by_move_event_module(
                    &ModuleId::new(package.into(), module),
                    tx_num,
                    event_num,
                    limit,
                    descending,
                )?,
            // not using "_ =>" because we want to make sure we remember to add new variants here
            EventFilter::Package(_)
            | EventFilter::MoveEventField { .. }
            | EventFilter::Any(_)
            | EventFilter::And(_, _)
            | EventFilter::Or(_, _) => {
                return Err(IotaError::UserInput {
                    error: UserInputError::Unsupported(
                        "This query type is not supported by the full node.".to_string(),
                    ),
                });
            }
        };

        // skip one event if exclusive cursor is provided,
        // otherwise truncate to the original limit.
        if cursor.is_some() {
            if !event_keys.is_empty() {
                event_keys.remove(0);
            }
        } else {
            event_keys.truncate(limit - 1);
        }

        // get the unique set of digests from the event_keys
        let transaction_digests = event_keys
            .iter()
            .map(|(_, digest, _, _)| *digest)
            .collect::<HashSet<_>>()
            .into_iter()
            .collect::<Vec<_>>();

        let events = kv_store
            .multi_get_events_by_tx_digests(&transaction_digests)
            .await?;

        // Positionally aligned with `transaction_digests`, so every key's
        // digest is guaranteed to be present in the map (hence the expect
        // below).
        let events_map: HashMap<_, _> =
            transaction_digests.iter().zip(events.into_iter()).collect();

        // Join each event key (event digest, tx digest, event seq, timestamp)
        // with its payload; a missing payload for a known key is an error.
        let stored_events = event_keys
            .into_iter()
            .map(|k| {
                (
                    k,
                    events_map
                        .get(&k.1)
                        .expect("fetched digest is missing")
                        .clone()
                        .and_then(|e| e.data.get(k.2).cloned()),
                )
            })
            .map(
                |((_event_digest, tx_digest, event_seq, timestamp), event)| {
                    event
                        .map(|e| (e, tx_digest, event_seq, timestamp))
                        .ok_or(IotaError::TransactionEventsNotFound { digest: tx_digest })
                },
            )
            .collect::<Result<Vec<_>, _>>()?;

        // Resolve annotated layouts so event contents render with field names.
        let epoch_store = self.load_epoch_store_one_call_per_task();
        let backing_store = self.get_backing_package_store().as_ref();
        let mut layout_resolver = epoch_store
            .executor()
            .type_layout_resolver(Box::new(backing_store));
        let mut events = vec![];
        for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
            events.push(IotaEvent::try_from(
                e.clone(),
                tx_digest,
                event_seq as u64,
                Some(timestamp),
                layout_resolver.get_annotated_layout(&e.type_)?,
            )?)
        }
        Ok(events)
    }
4436
4437    pub async fn insert_genesis_object(&self, object: Object) {
4438        self.get_reconfig_api()
4439            .try_insert_genesis_object(object)
4440            .expect("Cannot insert genesis object")
4441    }
4442
4443    pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4444        futures::future::join_all(
4445            objects
4446                .iter()
4447                .map(|o| self.insert_genesis_object(o.clone())),
4448        )
4449        .await;
4450    }
4451
    /// Make a status response for a transaction.
    ///
    /// Returns `None` if this authority knows nothing about the transaction.
    /// Otherwise returns the sender-signed transaction data together with:
    /// - `TransactionStatus::Executed` when executed effects exist (re-signed
    ///   for the current epoch if necessary), or
    /// - `TransactionStatus::Signed` when the transaction was signed in the
    ///   current epoch but has not been executed yet.
    #[instrument(level = "trace", skip_all)]
    pub fn get_transaction_status(
        &self,
        transaction_digest: &TransactionDigest,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
        // TODO: In the case of read path, we should not have to re-sign the effects.
        if let Some(effects) =
            self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
        {
            if let Some(transaction) = self
                .get_transaction_cache_reader()
                .try_get_transaction_block(transaction_digest)?
            {
                let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
                // Only fetch events when the effects actually reference an
                // events digest; otherwise the transaction emitted none.
                let events = if effects.events_digest().is_some() {
                    self.get_transaction_events(effects.transaction_digest())?
                } else {
                    TransactionEvents::default()
                };
                return Ok(Some((
                    (*transaction).clone().into_message(),
                    TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
                )));
            } else {
                // The read of effects and read of transaction are not atomic. It's possible
                // that we reverted the transaction (during epoch change) in
                // between the above two reads, and we end up having effects but
                // not transaction. In this case, we just fall through.
                debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
            }
        }
        // Not executed (or effects/transaction raced): report the signed-only
        // state if this validator signed the transaction this epoch.
        if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
            self.metrics.tx_already_processed.inc();
            let (transaction, sig) = signed.into_inner().into_data_and_sig();
            Ok(Some((transaction, TransactionStatus::Signed(sig))))
        } else {
            Ok(None)
        }
    }
4493
4494    /// Get the signed effects of the given transaction. If the effects was
4495    /// signed in a previous epoch, re-sign it so that the caller is able to
4496    /// form a cert of the effects in the current epoch.
4497    #[instrument(level = "trace", skip_all)]
4498    pub fn get_signed_effects_and_maybe_resign(
4499        &self,
4500        transaction_digest: &TransactionDigest,
4501        epoch_store: &Arc<AuthorityPerEpochStore>,
4502    ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4503        let effects = self
4504            .get_transaction_cache_reader()
4505            .try_get_executed_effects(transaction_digest)?;
4506        match effects {
4507            Some(effects) => {
4508                // If the transaction was executed in previous epochs, the validator will
4509                // re-sign the effects with new current epoch so that a client is always able to
4510                // obtain an effects certificate at the current epoch.
4511                //
4512                // Why is this necessary? Consider the following case:
4513                // - assume there are 4 validators
4514                // - Quorum driver gets 2 signed effects before reconfig halt
4515                // - The tx makes it into final checkpoint.
4516                // - 2 validators go away and are replaced in the new epoch.
4517                // - The new epoch begins.
4518                // - The quorum driver cannot complete the partial effects cert from the
4519                //   previous epoch, because it may not be able to reach either of the 2 former
4520                //   validators.
4521                // - But, if the 2 validators that stayed are willing to re-sign the effects in
4522                //   the new epoch, the QD can make a new effects cert and return it to the
4523                //   client.
4524                //
4525                // This is a considered a short-term workaround. Eventually, Quorum Driver
4526                // should be able to return either an effects certificate, -or-
4527                // a proof of inclusion in a checkpoint. In the case above, the
4528                // Quorum Driver would return a proof of inclusion in the final
4529                // checkpoint, and this code would no longer be necessary.
4530                if effects.executed_epoch() != epoch_store.epoch() {
4531                    debug!(
4532                        tx_digest=?transaction_digest,
4533                        effects_epoch=?effects.executed_epoch(),
4534                        epoch=?epoch_store.epoch(),
4535                        "Re-signing the effects with the current epoch"
4536                    );
4537                }
4538                Ok(Some(self.sign_effects(effects, epoch_store)?))
4539            }
4540            None => Ok(None),
4541        }
4542    }
4543
4544    #[instrument(level = "trace", skip_all)]
4545    pub(crate) fn sign_effects(
4546        &self,
4547        effects: TransactionEffects,
4548        epoch_store: &Arc<AuthorityPerEpochStore>,
4549    ) -> IotaResult<VerifiedSignedTransactionEffects> {
4550        let tx_digest = *effects.transaction_digest();
4551        let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4552            Some(sig) => {
4553                debug_assert!(sig.epoch == epoch_store.epoch());
4554                SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4555            }
4556            _ => {
4557                let sig = AuthoritySignInfo::new(
4558                    epoch_store.epoch(),
4559                    &effects,
4560                    Intent::iota_app(IntentScope::TransactionEffects),
4561                    self.name,
4562                    &*self.secret,
4563                );
4564
4565                let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4566
4567                epoch_store.insert_effects_digest_and_signature(
4568                    &tx_digest,
4569                    effects.digest(),
4570                    &sig,
4571                )?;
4572
4573                effects
4574            }
4575        };
4576
4577        Ok(VerifiedSignedTransactionEffects::new_unchecked(
4578            signed_effects,
4579        ))
4580    }
4581
4582    // Returns coin objects for indexing for fullnode if indexing is enabled.
4583    #[instrument(level = "trace", skip_all)]
4584    fn fullnode_only_get_tx_coins_for_indexing(
4585        &self,
4586        effects: &TransactionEffects,
4587        inner_temporary_store: &InnerTemporaryStore,
4588        epoch_store: &Arc<AuthorityPerEpochStore>,
4589    ) -> Option<TxCoins> {
4590        if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4591            return None;
4592        }
4593        let written_coin_objects = inner_temporary_store
4594            .written
4595            .iter()
4596            .filter_map(|(k, v)| {
4597                if v.is_coin() {
4598                    Some((*k, v.clone()))
4599                } else {
4600                    None
4601                }
4602            })
4603            .collect();
4604        let mut input_coin_objects = inner_temporary_store
4605            .input_objects
4606            .iter()
4607            .filter_map(|(k, v)| {
4608                if v.is_coin() {
4609                    Some((*k, v.clone()))
4610                } else {
4611                    None
4612                }
4613            })
4614            .collect::<ObjectMap>();
4615
4616        // Check for receiving objects that were actually used and modified during
4617        // execution. Their updated version will already showup in
4618        // "written_coins" but their input isn't included in the set of input
4619        // objects in a inner_temporary_store.
4620        for (object_id, version) in effects.modified_at_versions() {
4621            if inner_temporary_store
4622                .loaded_runtime_objects
4623                .contains_key(&object_id)
4624            {
4625                if let Some(object) = self
4626                    .get_object_store()
4627                    .get_object_by_key(&object_id, version)
4628                {
4629                    if object.is_coin() {
4630                        input_coin_objects.insert(object_id, object);
4631                    }
4632                }
4633            }
4634        }
4635
4636        Some((input_coin_objects, written_coin_objects))
4637    }
4638
4639    /// Get the TransactionEnvelope that currently locks the given object, if
4640    /// any. Since object locks are only valid for one epoch, we also need
4641    /// the epoch_id in the query. Returns UserInputError::ObjectNotFound if
4642    /// no lock records for the given object can be found.
4643    /// Returns UserInputError::ObjectVersionUnavailableForConsumption if the
4644    /// object record is at a different version.
4645    /// Returns Some(VerifiedEnvelope) if the given ObjectRef is locked by a
4646    /// certain transaction. Returns None if the a lock record is
4647    /// initialized for the given ObjectRef but not yet locked by any
4648    /// transaction,     or cannot find the transaction in transaction
4649    /// table, because of data race etc.
4650    #[instrument(level = "trace", skip_all)]
4651    pub async fn get_transaction_lock(
4652        &self,
4653        object_ref: &ObjectRef,
4654        epoch_store: &AuthorityPerEpochStore,
4655    ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4656        let lock_info = self
4657            .get_object_cache_reader()
4658            .try_get_lock(*object_ref, epoch_store)?;
4659        let lock_info = match lock_info {
4660            ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4661                return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4662                    provided_obj_ref: *object_ref,
4663                    current_version: locked_ref.1,
4664                }
4665                .into());
4666            }
4667            ObjectLockStatus::Initialized => {
4668                return Ok(None);
4669            }
4670            ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4671        };
4672
4673        epoch_store.get_signed_transaction(&lock_info)
4674    }
4675
4676    pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4677        self.get_object_cache_reader().try_get_objects(objects)
4678    }
4679
4680    /// Non-fallible version of `try_get_objects`.
4681    pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4682        self.try_get_objects(objects)
4683            .await
4684            .expect("storage access failed")
4685    }
4686
4687    pub async fn try_get_object_or_tombstone(
4688        &self,
4689        object_id: ObjectID,
4690    ) -> IotaResult<Option<ObjectRef>> {
4691        self.get_object_cache_reader()
4692            .try_get_latest_object_ref_or_tombstone(object_id)
4693    }
4694
4695    /// Non-fallible version of `try_get_object_or_tombstone`.
4696    pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4697        self.try_get_object_or_tombstone(object_id)
4698            .await
4699            .expect("storage access failed")
4700    }
4701
4702    /// Ordinarily, protocol upgrades occur when 2f + 1 + (f *
4703    /// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps) vote for the
4704    /// upgrade.
4705    ///
4706    /// This method can be used to dynamic adjust the amount of buffer. If set
4707    /// to 0, the upgrade will go through with only 2f+1 votes.
4708    ///
4709    /// IMPORTANT: If this is used, it must be used on >=2f+1 validators (all
4710    /// should have the same value), or you risk halting the chain.
4711    pub fn set_override_protocol_upgrade_buffer_stake(
4712        &self,
4713        expected_epoch: EpochId,
4714        buffer_stake_bps: u64,
4715    ) -> IotaResult {
4716        let epoch_store = self.load_epoch_store_one_call_per_task();
4717        let actual_epoch = epoch_store.epoch();
4718        if actual_epoch != expected_epoch {
4719            return Err(IotaError::WrongEpoch {
4720                expected_epoch,
4721                actual_epoch,
4722            });
4723        }
4724
4725        epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4726    }
4727
4728    pub fn clear_override_protocol_upgrade_buffer_stake(
4729        &self,
4730        expected_epoch: EpochId,
4731    ) -> IotaResult {
4732        let epoch_store = self.load_epoch_store_one_call_per_task();
4733        let actual_epoch = epoch_store.epoch();
4734        if actual_epoch != expected_epoch {
4735            return Err(IotaError::WrongEpoch {
4736                expected_epoch,
4737                actual_epoch,
4738            });
4739        }
4740
4741        epoch_store.clear_override_protocol_upgrade_buffer_stake()
4742    }
4743
    /// Get the set of system packages that are compiled in to this build, if
    /// those packages are compatible with the current versions of those
    /// packages on-chain.
    ///
    /// Returns one `ObjectRef` per compatible system package. Returns an empty
    /// vector as soon as any package is found incompatible, which (per the
    /// capability-voting scheme) amounts to voting against any upgrade.
    pub async fn get_available_system_packages(
        &self,
        binary_config: &BinaryConfig,
    ) -> Vec<ObjectRef> {
        let mut results = vec![];

        let system_packages = BuiltInFramework::iter_system_packages();

        // Add extra framework packages during simtest
        #[cfg(msim)]
        let extra_packages = framework_injection::get_extra_packages(self.name);
        #[cfg(msim)]
        // NOTE(review): `.map(|p| p)` is an identity map — presumably leftover
        // or used to unify iterator types for `chain`; confirm before removing.
        let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());

        for system_package in system_packages {
            let modules = system_package.modules().to_vec();
            // In simtests, we could override the current built-in framework packages.
            #[cfg(msim)]
            let modules = framework_injection::get_override_modules(&system_package.id, self.name)
                .unwrap_or(modules);

            // `compare_system_package` yields the expected object ref when the
            // locally built package is compatible with the on-chain version;
            // `None` means this binary cannot support the package, so we bail
            // out with an empty result (abstaining from all upgrades).
            let Some(obj_ref) = iota_framework::compare_system_package(
                &self.get_object_store(),
                &system_package.id,
                &modules,
                system_package.dependencies.to_vec(),
                binary_config,
            )
            .await
            else {
                return vec![];
            };
            results.push(obj_ref);
        }

        results
    }
4784
    /// Return the new versions, module bytes, and dependencies for the packages
    /// that have been committed to for a framework upgrade, in
    /// `system_packages`.  Loads the module contents from the binary, and
    /// performs the following checks:
    ///
    /// - Whether its contents matches what is on-chain already, in which case
    ///   no upgrade is required, and its contents are omitted from the output.
    /// - Whether the contents in the binary can form a package whose digest
    ///   matches the input, meaning the framework will be upgraded, and this
    ///   authority can satisfy that upgrade, in which case the contents are
    ///   included in the output.
    ///
    /// If the current version of the framework can't be loaded, the binary does
    /// not contain the bytes for that framework ID, or the resulting
    /// package fails the digest check, `None` is returned indicating that
    /// this authority cannot run the upgrade that the network voted on.
    async fn get_system_package_bytes(
        &self,
        system_packages: Vec<ObjectRef>,
        binary_config: &BinaryConfig,
    ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
        // Load the current on-chain state of every requested package in one
        // batch; results are position-aligned with `system_packages`.
        let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
        let objects = self.get_objects(&ids).await;

        let mut res = Vec::with_capacity(system_packages.len());
        for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
            let prev_transaction = match object {
                Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
                    // Skip this one because it doesn't need to be upgraded.
                    info!("Framework {} does not need updating", system_package_ref.0);
                    continue;
                }

                // Upgrading an existing package: carry over the transaction
                // that last touched it.
                Some(cur_object) => cur_object.previous_transaction,
                // Package not on-chain yet: treat it as created at genesis.
                None => TransactionDigest::genesis_marker(),
            };

            // In simtests the package bytes may be overridden per-validator.
            #[cfg(msim)]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
                .unwrap_or_else(|| {
                    BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
                });

            #[cfg(not(msim))]
            let SystemPackage {
                id: _,
                bytes,
                dependencies,
            } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();

            // Deserialize under the *current* epoch's binary config — system
            // packages are created during the current epoch, so they must
            // abide by its Move binary format rules.
            let modules: Vec<_> = bytes
                .iter()
                .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
                .collect();

            let new_object = Object::new_system_package(
                &modules,
                system_package_ref.1,
                dependencies.clone(),
                prev_transaction,
            );

            // Rebuild the package locally and make sure its object reference
            // (including digest) matches what the network committed to. A
            // mismatch means this binary cannot satisfy the voted upgrade.
            let new_ref = new_object.compute_object_reference();
            if new_ref != system_package_ref {
                error!(
                    "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
                );
                return None;
            }

            res.push((system_package_ref.1, bytes, dependencies));
        }

        Some(res)
    }
4864
    /// Returns the new protocol version and system packages that the network
    /// has voted to upgrade to. If the proposed protocol version is not
    /// supported, None is returned.
    ///
    /// An upgrade succeeds only when a single (protocol config digest, system
    /// package set) vote accumulates stake at or above the committee's
    /// effective threshold (quorum plus `buffer_stake_bps` of extra buffer).
    fn is_protocol_version_supported_v1(
        proposed_protocol_version: ProtocolVersion,
        committee: &Committee,
        capabilities: Vec<AuthorityCapabilitiesV1>,
        mut buffer_stake_bps: u64,
    ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
        // 10000 bps == 100% — larger values are meaningless.
        if buffer_stake_bps > 10000 {
            warn!("clamping buffer_stake_bps to 10000");
            buffer_stake_bps = 10000;
        }

        // For each validator, gather the protocol version and system packages that it
        // would like to upgrade to in the next epoch.
        let mut desired_upgrades: Vec<_> = capabilities
            .into_iter()
            .filter_map(|mut cap| {
                // A validator that lists no packages is voting against any change at all.
                if cap.available_system_packages.is_empty() {
                    return None;
                }

                // Sort so identical package sets compare equal regardless of
                // the order the validator advertised them in.
                cap.available_system_packages.sort();

                info!(
                    "validator {:?} supports {:?} with system packages: {:?}",
                    cap.authority.concise(),
                    cap.supported_protocol_versions,
                    cap.available_system_packages,
                );

                // A validator that only supports the current protocol version is also voting
                // against any change, because framework upgrades always require a protocol
                // version bump.
                cap.supported_protocol_versions
                    .get_version_digest(proposed_protocol_version)
                    .map(|digest| (digest, cap.available_system_packages, cap.authority))
            })
            .collect();

        // There can only be one set of votes that have a majority, find one if it
        // exists.
        desired_upgrades.sort();
        desired_upgrades
            .into_iter()
            // `chunk_by` groups *adjacent* equal keys; the sort above makes
            // identical (digest, packages) votes adjacent, so each distinct
            // vote forms exactly one group.
            .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
            .into_iter()
            .find_map(|((digest, packages), group)| {
                // should have been filtered out earlier.
                assert!(!packages.is_empty());

                // Accumulate the stake of every authority backing this exact
                // (digest, packages) vote.
                let mut stake_aggregator: StakeAggregator<(), true> =
                    StakeAggregator::new(Arc::new(committee.clone()));

                for (_, _, authority) in group {
                    stake_aggregator.insert_generic(authority, ());
                }

                let total_votes = stake_aggregator.total_votes();
                let quorum_threshold = committee.quorum_threshold();
                let effective_threshold = committee.effective_threshold(buffer_stake_bps);

                info!(
                    protocol_config_digest = ?digest,
                    ?total_votes,
                    ?quorum_threshold,
                    ?buffer_stake_bps,
                    ?effective_threshold,
                    ?proposed_protocol_version,
                    ?packages,
                    "support for upgrade"
                );

                let has_support = total_votes >= effective_threshold;
                has_support.then_some((proposed_protocol_version, digest, packages))
            })
    }
4944
4945    /// Selects the highest supported protocol version and system packages that
4946    /// the network has voted to upgrade to. If no upgrade is supported,
4947    /// returns the current protocol version and system packages.
4948    fn choose_protocol_version_and_system_packages_v1(
4949        current_protocol_version: ProtocolVersion,
4950        current_protocol_digest: Digest,
4951        committee: &Committee,
4952        capabilities: Vec<AuthorityCapabilitiesV1>,
4953        buffer_stake_bps: u64,
4954    ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4955        let mut next_protocol_version = current_protocol_version;
4956        let mut system_packages = vec![];
4957        let mut protocol_version_digest = current_protocol_digest;
4958
4959        // Finds the highest supported protocol version and system packages by
4960        // incrementing the proposed protocol version by one until no further
4961        // upgrades are supported.
4962        while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4963            next_protocol_version + 1,
4964            committee,
4965            capabilities.clone(),
4966            buffer_stake_bps,
4967        ) {
4968            next_protocol_version = version;
4969            protocol_version_digest = digest;
4970            system_packages = packages;
4971        }
4972
4973        (
4974            next_protocol_version,
4975            protocol_version_digest,
4976            system_packages,
4977        )
4978    }
4979
4980    /// Returns the indices of validators that support the given protocol
4981    /// version and digest. This includes both committee and non-committee
4982    /// validators based on their capabilities. Uses active validators
4983    /// instead of committee indices.
4984    fn get_validators_supporting_protocol_version(
4985        target_protocol_version: ProtocolVersion,
4986        target_digest: Digest,
4987        active_validators: &[AuthorityPublicKey],
4988        capabilities: &[AuthorityCapabilitiesV1],
4989    ) -> Vec<u64> {
4990        let mut eligible_validators = Vec::new();
4991
4992        for capability in capabilities {
4993            // Check if this validator supports the target protocol version and digest
4994            if let Some(digest) = capability
4995                .supported_protocol_versions
4996                .get_version_digest(target_protocol_version)
4997            {
4998                if digest == target_digest {
4999                    // Find the validator's index in the active validators list
5000                    if let Some(index) = active_validators
5001                        .iter()
5002                        .position(|name| AuthorityName::from(name) == capability.authority)
5003                    {
5004                        eligible_validators.push(index as u64);
5005                    }
5006                }
5007            }
5008        }
5009
5010        // Sort indices for deterministic behavior
5011        eligible_validators.sort();
5012        eligible_validators
5013    }
5014
5015    /// Calculates the sum of weights for eligible validators that are part of
5016    /// the committee. Takes the indices from
5017    /// get_validators_supporting_protocol_version and maps them back
5018    /// to committee members to get their weights.
5019    fn calculate_eligible_validators_weight(
5020        eligible_validator_indices: &[u64],
5021        active_validators: &[AuthorityPublicKey],
5022        committee: &Committee,
5023    ) -> u64 {
5024        let mut total_weight = 0u64;
5025
5026        for &index in eligible_validator_indices {
5027            let authority_pubkey = &active_validators[index as usize];
5028            // Check if this validator is in the committee and get their weight
5029            if let Some((_, weight)) = committee
5030                .members()
5031                .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
5032            {
5033                total_weight += weight;
5034            }
5035        }
5036
5037        total_weight
5038    }
5039
5040    #[instrument(level = "debug", skip_all)]
5041    fn create_authenticator_state_tx(
5042        &self,
5043        epoch_store: &Arc<AuthorityPerEpochStore>,
5044    ) -> Option<EndOfEpochTransactionKind> {
5045        if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5046            info!("authenticator state transactions not enabled");
5047            return None;
5048        }
5049
5050        let authenticator_state_exists = epoch_store.authenticator_state_exists();
5051        let tx = if authenticator_state_exists {
5052            let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5053            let min_epoch =
5054                next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5055            let authenticator_obj_initial_shared_version = epoch_store
5056                .epoch_start_config()
5057                .authenticator_obj_initial_shared_version()
5058                .expect("initial version must exist");
5059
5060            let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5061                min_epoch,
5062                authenticator_obj_initial_shared_version,
5063            );
5064
5065            info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5066
5067            tx
5068        } else {
5069            let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5070            info!("Creating AuthenticatorStateCreate tx");
5071            tx
5072        };
5073        Some(tx)
5074    }
5075
5076    /// Creates and execute the advance epoch transaction to effects without
5077    /// committing it to the database. The effects of the change epoch tx
5078    /// are only written to the database after a certified checkpoint has been
5079    /// formed and executed by CheckpointExecutor.
5080    ///
5081    /// When a framework upgraded has been decided on, but the validator does
5082    /// not have the new versions of the packages locally, the validator
5083    /// cannot form the ChangeEpochTx. In this case it returns Err,
5084    /// indicating that the checkpoint builder should give up trying to make the
5085    /// final checkpoint. As long as the network is able to create a certified
5086    /// checkpoint (which should be ensured by the capabilities vote), it
5087    /// will arrive via state sync and be executed by CheckpointExecutor.
    #[instrument(level = "error", skip_all)]
    pub async fn create_and_execute_advance_epoch_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        gas_cost_summary: &GasCostSummary,
        checkpoint: CheckpointSequenceNumber,
        epoch_start_timestamp_ms: CheckpointTimestamp,
        scores: Vec<u64>,
    ) -> anyhow::Result<(
        IotaSystemState,
        Option<SystemEpochInfoEvent>,
        TransactionEffects,
    )> {
        // End-of-epoch transaction kinds, bundled into a single transaction below.
        let mut txns = Vec::new();

        // Include an authenticator state update first, if one is needed this epoch.
        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
            txns.push(tx);
        }

        let next_epoch = epoch_store.epoch() + 1;

        // Decide the next protocol version and the system packages it requires,
        // based on the capability votes of the current committee.
        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
        let authority_capabilities = epoch_store
            .get_capabilities_v1()
            .expect("read capabilities from db cannot fail");
        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
            Self::choose_protocol_version_and_system_packages_v1(
                epoch_store.protocol_version(),
                SupportedProtocolVersionsWithHashes::protocol_config_digest(
                    epoch_store.protocol_config(),
                ),
                epoch_store.committee(),
                authority_capabilities.clone(),
                buffer_stake_bps,
            );

        // since system packages are created during the current epoch, they should abide
        // by the rules of the current epoch, including the current epoch's max
        // Move binary format version
        let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);
        let Some(next_epoch_system_package_bytes) = self
            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
            .await
        else {
            error!(
                "upgraded system packages {:?} are not locally available, cannot create \
                ChangeEpochTx. validator binary must be upgraded to the correct version!",
                next_epoch_system_packages
            );
            // the checkpoint builder will keep retrying forever when it hits this error.
            // Eventually, one of two things will happen:
            // - The operator will upgrade this binary to one that has the new packages
            //   locally, and this function will succeed.
            // - The final checkpoint will be certified by other validators, we will receive
            //   it via state sync, and execute it. This will upgrade the framework
            //   packages, reconfigure, and most likely shut down in the new epoch (this
            //   validator likely doesn't support the new protocol version, or else it
            //   should have had the packages.)
            bail!("missing system packages: cannot form ChangeEpochTx");
        };

        // Use ChangeEpochV3 or ChangeEpochV4 when the feature flags are enabled and
        // ChangeEpochV2 requirements are met
        if config.select_committee_from_eligible_validators() {
            // Get the list of eligible validators that support the target protocol version
            let active_validators = epoch_store.epoch_start_state().get_active_validators();

            // By default, every active validator (by index) is eligible.
            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();

            // Use validators supporting the target protocol version as eligible validators
            // in the next version if select_committee_supporting_next_epoch_version feature
            // flag is set to true.
            if config.select_committee_supporting_next_epoch_version() {
                eligible_active_validators = Self::get_validators_supporting_protocol_version(
                    next_epoch_protocol_version,
                    next_epoch_protocol_digest,
                    &active_validators,
                    &authority_capabilities,
                );

                // Calculate the total weight of eligible validators in the committee
                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
                    &eligible_active_validators,
                    &active_validators,
                    epoch_store.committee(),
                );

                // Safety check: ensure eligible validators have enough stake
                // Use the same effective threshold calculation that was used to decide the
                // protocol version
                let committee = epoch_store.committee();
                let effective_threshold = committee.effective_threshold(buffer_stake_bps);

                if eligible_validators_weight < effective_threshold {
                    error!(
                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
                    );
                    // Pass all active validator indices as eligible validators
                    // to perform selection among all of them.
                    eligible_active_validators = (0..active_validators.len() as u64).collect();
                }
            }

            // Use ChangeEpochV4 when the pass_validator_scores_to_advance_epoch feature
            // flag is enabled.
            if config.pass_validator_scores_to_advance_epoch() {
                txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
                    next_epoch,
                    next_epoch_protocol_version,
                    gas_cost_summary.storage_cost,
                    gas_cost_summary.computation_cost,
                    gas_cost_summary.computation_cost_burned,
                    gas_cost_summary.storage_rebate,
                    gas_cost_summary.non_refundable_storage_fee,
                    epoch_start_timestamp_ms,
                    next_epoch_system_package_bytes,
                    eligible_active_validators,
                    scores,
                    config.adjust_rewards_by_score(),
                ));
            } else {
                txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
                    next_epoch,
                    next_epoch_protocol_version,
                    gas_cost_summary.storage_cost,
                    gas_cost_summary.computation_cost,
                    gas_cost_summary.computation_cost_burned,
                    gas_cost_summary.storage_rebate,
                    gas_cost_summary.non_refundable_storage_fee,
                    epoch_start_timestamp_ms,
                    next_epoch_system_package_bytes,
                    eligible_active_validators,
                ));
            }
        } else if config.protocol_defined_base_fee()
            && config.max_committee_members_count_as_option().is_some()
        {
            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        } else {
            // Legacy ChangeEpoch (no computation_cost_burned field).
            txns.push(EndOfEpochTransactionKind::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        }

        // Bundle all end-of-epoch kinds into one transaction that is treated as
        // having been sequenced at `checkpoint` of the current epoch.
        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);

        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
            tx.clone(),
            epoch_store.epoch(),
            checkpoint,
        );

        let tx_digest = executable_tx.digest();

        info!(
            ?next_epoch,
            ?next_epoch_protocol_version,
            ?next_epoch_system_packages,
            computation_cost=?gas_cost_summary.computation_cost,
            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
            storage_cost=?gas_cost_summary.storage_cost,
            storage_rebate=?gas_cost_summary.storage_rebate,
            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
            ?tx_digest,
            "Creating advance epoch transaction"
        );

        fail_point_async!("change_epoch_tx_delay");
        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

        // The tx could have been executed by state sync already - if so simply return
        // an error. The checkpoint builder will shortly be terminated by
        // reconfiguration anyway.
        if self
            .get_transaction_cache_reader()
            .try_is_tx_already_executed(tx_digest)?
        {
            warn!("change epoch tx has already been executed via state sync");
            bail!("change epoch tx has already been executed via state sync",);
        }

        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;

        // We must manually assign the shared object versions to the transaction before
        // executing it. This is because we do not sequence end-of-epoch
        // transactions through consensus.
        epoch_store.assign_shared_object_versions_idempotent(
            self.get_object_cache_reader().as_ref(),
            std::slice::from_ref(&executable_tx),
        )?;

        let (input_objects, _) =
            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;

        // Execute the change-epoch transaction to obtain its effects and the
        // objects it wrote; `prepare_certificate` does not commit them.
        let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
            &execution_guard,
            &executable_tx,
            input_objects,
            vec![],
            epoch_store,
        )?;
        let system_obj = get_iota_system_state(&temporary_store.written)
            .expect("change epoch tx must write to system object");
        // Find the SystemEpochInfoEvent emitted by the advance_epoch transaction.
        let system_epoch_info_event = temporary_store
            .events
            .data
            .into_iter()
            .find(|event| event.is_system_epoch_info_event())
            .map(SystemEpochInfoEvent::from);
        // The system epoch info event can be `None` in case if the `advance_epoch`
        // Move function call failed and was executed in the safe mode.
        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());

        // We must write tx and effects to the state sync tables so that state sync is
        // able to deliver to the transaction to CheckpointExecutor after it is
        // included in a certified checkpoint.
        self.get_state_sync_store()
            .try_insert_transaction_and_effects(&tx, &effects)
            .map_err(|err| {
                let err: anyhow::Error = err.into();
                err
            })?;

        info!(
            "Effects summary of the change epoch transaction: {:?}",
            effects.summary_for_debug()
        );
        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
        // The change epoch transaction cannot fail to execute.
        assert!(effects.status().is_ok());
        Ok((system_obj, system_epoch_info_event, effects))
    }
5340
    /// This function is called at the very end of the epoch.
    /// This step is required before updating new epoch in the db and calling
    /// reopen_epoch_db.
    ///
    /// Reverts locally executed transactions that were not included in any
    /// checkpoint of the closing epoch.
    #[instrument(level = "error", skip_all)]
    async fn revert_uncommitted_epoch_transactions(
        &self,
        epoch_store: &AuthorityPerEpochStore,
    ) -> IotaResult {
        {
            let state = epoch_store.get_reconfig_state_write_lock_guard();
            if state.should_accept_user_certs() {
                // Need to change this so that consensus adapter do not accept certificates from
                // user. This can happen if our local validator did not initiate
                // epoch change locally, but 2f+1 nodes already concluded the
                // epoch.
                //
                // This lock is essentially a barrier for
                // `epoch_store.pending_consensus_certificates` table we are reading on the line
                // after this block
                epoch_store.close_user_certs(state);
            }
            // lock is dropped here
        }
        let pending_certificates = epoch_store.pending_consensus_certificates();
        info!(
            "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
            pending_certificates.len(),
            pending_certificates,
        );
        for digest in pending_certificates {
            // Transactions that made it into a checkpoint are final and must not be
            // reverted.
            if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
                info!(
                    "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
                    digest
                );
                continue;
            }
            info!("Reverting {:?} at the end of epoch", digest);
            epoch_store.revert_executed_transaction(&digest)?;
            self.get_reconfig_api().try_revert_state_update(&digest)?;
        }
        info!("All uncommitted local transactions reverted");
        Ok(())
    }
5385
5386    #[instrument(level = "error", skip_all)]
5387    async fn reopen_epoch_db(
5388        &self,
5389        cur_epoch_store: &AuthorityPerEpochStore,
5390        new_committee: Committee,
5391        epoch_start_configuration: EpochStartConfiguration,
5392        expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5393        epoch_last_checkpoint: CheckpointSequenceNumber,
5394    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5395        let new_epoch = new_committee.epoch;
5396        info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5397        assert_eq!(
5398            epoch_start_configuration.epoch_start_state().epoch(),
5399            new_committee.epoch
5400        );
5401        fail_point!("before-open-new-epoch-store");
5402        let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5403            self.name,
5404            new_committee,
5405            epoch_start_configuration,
5406            self.get_backing_package_store().clone(),
5407            self.get_object_store().clone(),
5408            expensive_safety_check_config,
5409            epoch_last_checkpoint,
5410        )?;
5411        self.epoch_store.store(new_epoch_store.clone());
5412        Ok(new_epoch_store)
5413    }
5414
    /// Checks if `authenticator` unlocks a valid Move account and returns the
    /// account-related `AuthenticatorFunctionRef` object.
    fn check_move_account(
        &self,
        auth_account_object_id: ObjectID,
        auth_account_object_seq_number: Option<SequenceNumber>,
        auth_account_object_digest: Option<ObjectDigest>,
        account_object: ObjectReadResult,
        signer: &IotaAddress,
    ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
        // The account object must have actually been loaded; deleted shared
        // objects and objects held by a canceled transaction are user errors.
        let account_object = match account_object.object {
            ObjectReadResultKind::Object(object) => Ok(object),
            ObjectReadResultKind::DeletedSharedObject(version, digest) => {
                Err(UserInputError::AccountObjectDeleted {
                    account_id: account_object.id(),
                    account_version: version,
                    transaction_digest: digest,
                })
            }
            // It is impossible to check the account object because it is used in a canceled
            // transaction and is not loaded.
            ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
                Err(UserInputError::AccountObjectInCanceledTransaction {
                    account_id: account_object.id(),
                    account_version: version,
                })
            }
        }?;

        let account_object_addr = IotaAddress::from(auth_account_object_id);

        // The signer must be the address derived from the account object's ID.
        fp_ensure!(
            signer == &account_object_addr,
            UserInputError::IncorrectUserSignature {
                error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
            }
            .into()
        );

        // Only shared or immutable account objects are supported.
        fp_ensure!(
            account_object.is_shared() || account_object.is_immutable(),
            UserInputError::AccountObjectNotSupported {
                object_id: auth_account_object_id
            }
            .into()
        );

        // If the authenticator pinned a version, it must match the loaded
        // object's version; otherwise use the loaded version.
        let auth_account_object_seq_number =
            if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
                let account_object_version = account_object.version();

                fp_ensure!(
                    account_object_version == auth_account_object_seq_number,
                    UserInputError::AccountObjectVersionMismatch {
                        object_id: auth_account_object_id,
                        expected_version: auth_account_object_seq_number,
                        actual_version: account_object_version,
                    }
                    .into()
                );

                auth_account_object_seq_number
            } else {
                account_object.version()
            };

        // If the authenticator pinned a digest, it must match the loaded
        // object's digest.
        if let Some(auth_account_object_digest) = auth_account_object_digest {
            let expected_digest = account_object.digest();
            fp_ensure!(
                expected_digest == auth_account_object_digest,
                UserInputError::InvalidAccountObjectDigest {
                    object_id: auth_account_object_id,
                    expected_digest,
                    actual_digest: auth_account_object_digest,
                }
                .into()
            );
        }

        // The authenticator function reference lives in a dynamic field
        // attached to the account object; derive that field's ID.
        let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
            auth_account_object_id,
            &AuthenticatorFunctionRefV1Key::tag().into(),
            &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
        )
        .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
            account_object_id: auth_account_object_id,
        })?;

        // Fetch the newest version of the field that is <= the account
        // object's version.
        let authenticator_function_ref_field = self
            .get_object_cache_reader()
            .try_find_object_lt_or_eq_version(
                authenticator_function_ref_field_id,
                auth_account_object_seq_number,
            )?;

        if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
            let field_move_object = authenticator_function_ref_field_obj
                .data
                .try_as_move()
                .expect("dynamic field should never be a package object");

            // Deserialize the dynamic field into its typed Rust representation.
            let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
                field_move_object.to_rust().ok_or(
                    UserInputError::InvalidAuthenticatorFunctionRefField {
                        account_object_id: auth_account_object_id,
                    },
                )?;

            Ok(AuthenticatorFunctionRefForExecution::new_v1(
                field.value,
                authenticator_function_ref_field_obj.compute_object_reference(),
                authenticator_function_ref_field_obj.owner,
                authenticator_function_ref_field_obj.storage_rebate,
                authenticator_function_ref_field_obj.previous_transaction,
            ))
        } else {
            Err(UserInputError::MoveAuthenticatorNotFound {
                authenticator_function_ref_id: authenticator_function_ref_field_id,
                account_object_id: auth_account_object_id,
                account_object_version: auth_account_object_seq_number,
            }
            .into())
        }
    }
5539
5540    #[allow(clippy::type_complexity)]
5541    fn read_objects_for_signing(
5542        &self,
5543        transaction: &VerifiedTransaction,
5544        epoch: u64,
5545    ) -> IotaResult<(
5546        InputObjects,
5547        ReceivingObjects,
5548        Vec<(InputObjects, ObjectReadResult)>,
5549    )> {
5550        let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5551            Some(transaction.digest()),
5552            &transaction.collect_all_input_object_kind_for_reading()?,
5553            &transaction.data().transaction_data().receiving_objects(),
5554            epoch,
5555        )?;
5556
5557        transaction
5558            .split_input_objects_into_groups_for_reading(input_objects)
5559            .map(|(tx_input_objects, per_authenticator_inputs)| {
5560                (
5561                    tx_input_objects,
5562                    tx_receiving_objects,
5563                    per_authenticator_inputs,
5564                )
5565            })
5566    }
5567
5568    #[allow(clippy::type_complexity)]
5569    fn check_transaction_inputs_for_signing(
5570        &self,
5571        protocol_config: &ProtocolConfig,
5572        reference_gas_price: u64,
5573        tx_data: &TransactionData,
5574        tx_input_objects: InputObjects,
5575        tx_receiving_objects: &ReceivingObjects,
5576        move_authenticators: &Vec<&MoveAuthenticator>,
5577        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
5578    ) -> IotaResult<(
5579        IotaGasStatus,
5580        CheckedInputObjects,
5581        Vec<(CheckedInputObjects, AuthenticatorFunctionRef)>,
5582    )> {
5583        let authenticator_gas_budget = if move_authenticators.is_empty() {
5584            0
5585        } else {
5586            // `max_auth_gas` is used here as a Move authenticator gas budget until it is
5587            // not a part of the transaction data.
5588            protocol_config.max_auth_gas()
5589        };
5590
5591        debug_assert_eq!(
5592            move_authenticators.len(),
5593            per_authenticator_inputs.len(),
5594            "Move authenticators amount must match the number of authenticator inputs"
5595        );
5596
5597        let per_authenticator_checked_inputs = move_authenticators
5598            .iter()
5599            .zip(per_authenticator_inputs)
5600            .map(
5601                |(move_authenticator, (authenticator_input_objects, account_object))| {
5602                    // Check basic `object_to_authenticate` preconditions and get its components.
5603                    let (
5604                        auth_account_object_id,
5605                        auth_account_object_seq_number,
5606                        auth_account_object_digest,
5607                    ) = move_authenticator.object_to_authenticate_components()?;
5608
5609                    let signer = move_authenticator.address()?;
5610
5611                    // Make sure the signer is a Move account.
5612                    let AuthenticatorFunctionRefForExecution {
5613                        authenticator_function_ref,
5614                        ..
5615                    } = self.check_move_account(
5616                        auth_account_object_id,
5617                        auth_account_object_seq_number,
5618                        auth_account_object_digest,
5619                        account_object,
5620                        &signer,
5621                    )?;
5622
5623                    // Check the MoveAuthenticator input objects.
5624                    let authenticator_checked_input_objects =
5625                        iota_transaction_checks::check_move_authenticator_input_for_signing(
5626                            authenticator_input_objects,
5627                        )?;
5628
5629                    Ok((
5630                        authenticator_checked_input_objects,
5631                        authenticator_function_ref,
5632                    ))
5633                },
5634            )
5635            .collect::<IotaResult<Vec<_>>>()?;
5636
5637        // Check the transaction inputs.
5638        let (gas_status, tx_checked_input_objects) =
5639            iota_transaction_checks::check_transaction_input(
5640                protocol_config,
5641                reference_gas_price,
5642                tx_data,
5643                tx_input_objects,
5644                tx_receiving_objects,
5645                &self.metrics.bytecode_verifier_metrics,
5646                &self.config.verifier_signing_config,
5647                authenticator_gas_budget,
5648            )?;
5649
5650        Ok((
5651            gas_status,
5652            tx_checked_input_objects,
5653            per_authenticator_checked_inputs,
5654        ))
5655    }
5656
    // Test-only: iterates the accumulator store's cached live object set.
    #[cfg(test)]
    pub(crate) fn iter_live_object_set_for_testing(
        &self,
    ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
        self.get_accumulator_store()
            .iter_cached_live_object_set_for_testing()
    }
5664
5665    #[cfg(test)]
5666    pub(crate) fn shutdown_execution_for_test(&self) {
5667        self.tx_execution_shutdown
5668            .lock()
5669            .take()
5670            .unwrap()
5671            .send(())
5672            .unwrap();
5673    }
5674
5675    /// NOTE: this function is only to be used for fuzzing and testing. Never
5676    /// use in prod
5677    pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5678        self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5679        self.get_object_cache_reader()
5680            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5681        self.get_reconfig_api()
5682            .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5683    }
5684}
5685
/// Receives completed randomness rounds and turns each one into a
/// `RandomnessStateUpdate` system transaction (see `handle_new_randomness`).
pub struct RandomnessRoundReceiver {
    // Used to persist, enqueue, and await execution of the randomness state
    // update transactions.
    authority_state: Arc<AuthorityState>,
    // Delivers `(epoch, round, bls signature bytes)` for each completed round.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
5690
impl RandomnessRoundReceiver {
    /// Spawns the receiver's event loop as a monitored background task.
    pub fn spawn(
        authority_state: Arc<AuthorityState>,
        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
    ) -> JoinHandle<()> {
        let rrr = RandomnessRoundReceiver {
            authority_state,
            randomness_rx,
        };
        spawn_monitored_task!(rrr.run())
    }

    /// Event loop: handles each `(epoch, round, bytes)` message until the
    /// sending side closes the channel.
    async fn run(mut self) {
        info!("RandomnessRoundReceiver event loop started");

        loop {
            tokio::select! {
                maybe_recv = self.randomness_rx.recv() => {
                    if let Some((epoch, round, bytes)) = maybe_recv {
                        self.handle_new_randomness(epoch, round, bytes);
                    } else {
                        // Channel closed: no more randomness will arrive.
                        break;
                    }
                },
            }
        }

        info!("RandomnessRoundReceiver event loop ended");
    }

    /// Builds a `RandomnessStateUpdate` transaction for the given round,
    /// persists it, enqueues it for execution, and spawns a task that waits
    /// for (and sanity-checks) its execution effects.
    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
        // Randomness from a different epoch is stale; drop it.
        if epoch_store.epoch() != epoch {
            warn!(
                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
                epoch_store.epoch()
            );
            return;
        }
        let transaction = VerifiedTransaction::new_randomness_state_update(
            epoch,
            round,
            bytes,
            epoch_store
                .epoch_start_config()
                .randomness_obj_initial_shared_version(),
        );
        debug!(
            "created randomness state update transaction with digest: {:?}",
            transaction.digest()
        );
        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
        let digest = *transaction.digest();

        // Randomness state updates contain the full bls signature for the random round,
        // which cannot necessarily be reconstructed again later. Therefore we must
        // immediately persist this transaction. If we crash before its outputs
        // are committed, this ensures we will be able to re-execute it.
        self.authority_state
            .get_cache_commit()
            .persist_transaction(&transaction);

        // Send transaction to TransactionManager for execution.
        self.authority_state
            .transaction_manager()
            .enqueue(vec![transaction], &epoch_store);

        let authority_state = self.authority_state.clone();
        spawn_monitored_task!(async move {
            // Wait for transaction execution in a separate task, to avoid deadlock in case
            // of out-of-order randomness generation. (Each
            // RandomnessStateUpdate depends on the output of the
            // RandomnessStateUpdate from the previous round.)
            //
            // We set a very long timeout so that in case this gets stuck for some reason,
            // the validator will eventually crash rather than continuing in a
            // zombie mode.
            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
            let result = tokio::time::timeout(
                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
                authority_state
                    .get_transaction_cache_reader()
                    .try_notify_read_executed_effects(&[digest]),
            )
            .await;
            let result = match result {
                Ok(result) => result,
                Err(_) => {
                    if cfg!(debug_assertions) {
                        // Crash on randomness update execution timeout in debug builds.
                        panic!(
                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                        );
                    }
                    warn!(
                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                    );
                    // Continue waiting as long as necessary in non-debug builds.
                    authority_state
                        .get_transaction_cache_reader()
                        .try_notify_read_executed_effects(&[digest])
                        .await
                }
            };

            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
            let effects = effects.pop().expect("should return effects");
            if *effects.status() != ExecutionStatus::Success {
                fatal!(
                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
                );
            }
            debug!(
                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
            );
        });
    }
}
5810
5811#[async_trait]
5812impl TransactionKeyValueStoreTrait for AuthorityState {
5813    async fn multi_get(
5814        &self,
5815        transaction_keys: &[TransactionDigest],
5816        effects_keys: &[TransactionDigest],
5817    ) -> IotaResult<KVStoreTransactionData> {
5818        let txns = if !transaction_keys.is_empty() {
5819            self.get_transaction_cache_reader()
5820                .try_multi_get_transaction_blocks(transaction_keys)?
5821                .into_iter()
5822                .map(|t| t.map(|t| (*t).clone().into_inner()))
5823                .collect()
5824        } else {
5825            vec![]
5826        };
5827
5828        let fx = if !effects_keys.is_empty() {
5829            self.get_transaction_cache_reader()
5830                .try_multi_get_executed_effects(effects_keys)?
5831        } else {
5832            vec![]
5833        };
5834
5835        Ok((txns, fx))
5836    }
5837
5838    async fn multi_get_checkpoints(
5839        &self,
5840        checkpoint_summaries: &[CheckpointSequenceNumber],
5841        checkpoint_contents: &[CheckpointSequenceNumber],
5842        checkpoint_summaries_by_digest: &[CheckpointDigest],
5843    ) -> IotaResult<(
5844        Vec<Option<CertifiedCheckpointSummary>>,
5845        Vec<Option<CheckpointContents>>,
5846        Vec<Option<CertifiedCheckpointSummary>>,
5847    )> {
5848        // TODO: use multi-get methods if it ever becomes important (unlikely)
5849        let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5850        let store = self.get_checkpoint_store();
5851        for seq in checkpoint_summaries {
5852            let checkpoint = store
5853                .get_checkpoint_by_sequence_number(*seq)?
5854                .map(|c| c.into_inner());
5855
5856            summaries.push(checkpoint);
5857        }
5858
5859        let mut contents = Vec::with_capacity(checkpoint_contents.len());
5860        for seq in checkpoint_contents {
5861            let checkpoint = store
5862                .get_checkpoint_by_sequence_number(*seq)?
5863                .and_then(|summary| {
5864                    store
5865                        .get_checkpoint_contents(&summary.content_digest)
5866                        .expect("db read cannot fail")
5867                });
5868            contents.push(checkpoint);
5869        }
5870
5871        let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5872        for digest in checkpoint_summaries_by_digest {
5873            let checkpoint = store
5874                .get_checkpoint_by_digest(digest)?
5875                .map(|c| c.into_inner());
5876            summaries_by_digest.push(checkpoint);
5877        }
5878
5879        Ok((summaries, contents, summaries_by_digest))
5880    }
5881
5882    async fn get_transaction_perpetual_checkpoint(
5883        &self,
5884        digest: TransactionDigest,
5885    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5886        self.get_checkpoint_cache()
5887            .try_get_transaction_perpetual_checkpoint(&digest)
5888            .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5889    }
5890
5891    async fn get_object(
5892        &self,
5893        object_id: ObjectID,
5894        version: VersionNumber,
5895    ) -> IotaResult<Option<Object>> {
5896        self.get_object_cache_reader()
5897            .try_get_object_by_key(&object_id, version)
5898    }
5899
5900    #[instrument(skip_all)]
5901    async fn multi_get_objects(
5902        &self,
5903        object_keys: &[ObjectKey],
5904    ) -> IotaResult<Vec<Option<Object>>> {
5905        Ok(self
5906            .get_object_cache_reader()
5907            .multi_get_objects_by_key(object_keys))
5908    }
5909
5910    async fn multi_get_transactions_perpetual_checkpoints(
5911        &self,
5912        digests: &[TransactionDigest],
5913    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5914        let res = self
5915            .get_checkpoint_cache()
5916            .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5917
5918        Ok(res
5919            .into_iter()
5920            .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5921            .collect())
5922    }
5923
5924    #[instrument(skip(self))]
5925    async fn multi_get_events_by_tx_digests(
5926        &self,
5927        digests: &[TransactionDigest],
5928    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5929        if digests.is_empty() {
5930            return Ok(vec![]);
5931        }
5932
5933        Ok(self
5934            .get_transaction_cache_reader()
5935            .multi_get_events(digests))
5936    }
5937}
5938
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_types::{
        base_types::{AuthorityName, ObjectID},
        is_system_package,
    };
    use move_binary_format::CompiledModule;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    // Thread local cache because all simtests run in a single unique thread.
    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// An override either applies the same framework to every validator, or
    /// resolves per validator via a callback that may return `None`.
    enum PackageOverrideConfig {
        Global(Framework),
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes every module with its own bytecode version.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Installs the same override modules for `package_id` on all validators.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|overrides| {
            overrides
                .borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules))
        });
    }

    /// Installs a per-validator override callback for `package_id`.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|overrides| {
            overrides
                .borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func))
        });
    }

    /// Serialized module bytes for the override of `package_id` as seen by
    /// validator `name`, if one is configured.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|overrides| match overrides.borrow().get(package_id)? {
            PackageOverrideConfig::Global(framework) => {
                Some(compiled_modules_to_bytes(framework))
            }
            PackageOverrideConfig::PerValidator(callback) => {
                callback(name).map(|framework| compiled_modules_to_bytes(&framework))
            }
        })
    }

    /// Compiled modules for the override of `package_id` as seen by validator
    /// `name`, if one is configured.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|overrides| match overrides.borrow().get(package_id)? {
            PackageOverrideConfig::Global(framework) => Some(framework.clone()),
            PackageOverrideConfig::PerValidator(callback) => callback(name),
        })
    }

    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if !is_system_package(*package_id) {
            // Assume that entirely new injected packages depend on all existing system
            // packages.
            BuiltInFramework::all_package_ids()
        } else {
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Overridden packages that are not part of the built-in framework set.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();
        let injected: Vec<ObjectID> = OVERRIDE.with(|overrides| {
            overrides
                .borrow()
                .keys()
                .filter(|id| !built_in.contains(*id))
                .copied()
                .collect()
        });

        injected
            .into_iter()
            .map(|id| SystemPackage {
                id,
                bytes: get_override_bytes(&id, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            })
            .collect()
    }
}
6060
/// JSON-serializable dump entry for a single object: its reference
/// (id/version/digest) alongside the full object contents.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// Object id, taken from the object's computed reference.
    pub id: ObjectID,
    /// Object version, taken from the object's computed reference.
    pub version: VersionNumber,
    /// Object digest, taken from the object's computed reference.
    pub digest: ObjectDigest,
    /// The object itself.
    pub object: Object,
}
6068
6069impl ObjDumpFormat {
6070    fn new(object: Object) -> Self {
6071        let oref = object.compute_object_reference();
6072        Self {
6073            id: oref.0,
6074            version: oref.1,
6075            digest: oref.2,
6076            object,
6077        }
6078    }
6079}
6080
/// JSON-serializable snapshot of the node state relevant to one transaction's
/// execution, written out for offline debugging (see `NodeStateDump::new` for
/// how each field is gathered).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the dumped transaction.
    pub tx_digest: TransactionDigest,
    /// The signed transaction data of the dumped certificate.
    pub sender_signed_data: SenderSignedData,
    /// Epoch in which the transaction executed.
    pub executed_epoch: u64,
    /// Reference gas price of that epoch.
    pub reference_gas_price: u64,
    /// Protocol version in effect during execution.
    pub protocol_version: u64,
    /// Start timestamp (ms) of the execution epoch.
    pub epoch_start_timestamp_ms: u64,
    /// Effects the node actually computed.
    pub computed_effects: TransactionEffects,
    /// Effects digest the node expected (differs from computed on a fork).
    pub expected_effects_digest: TransactionEffectsDigest,
    /// All system packages at their versions at dump time.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Shared objects read or mutated by the transaction.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Child objects loaded at runtime (read but not necessarily mutated).
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects at the versions recorded as modified by the effects.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages fetched from the DB at runtime (not in input objects).
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// The transaction's input objects from the temporary store.
    pub input_objects: Vec<ObjDumpFormat>,
}
6098
6099impl NodeStateDump {
6100    pub fn new(
6101        tx_digest: &TransactionDigest,
6102        effects: &TransactionEffects,
6103        expected_effects_digest: TransactionEffectsDigest,
6104        object_store: &dyn ObjectStore,
6105        epoch_store: &Arc<AuthorityPerEpochStore>,
6106        inner_temporary_store: &InnerTemporaryStore,
6107        certificate: &VerifiedExecutableTransaction,
6108    ) -> IotaResult<Self> {
6109        // Epoch info
6110        let executed_epoch = epoch_store.epoch();
6111        let reference_gas_price = epoch_store.reference_gas_price();
6112        let epoch_start_config = epoch_store.epoch_start_config();
6113        let protocol_version = epoch_store.protocol_version().as_u64();
6114        let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6115
6116        // Record all system packages at this version
6117        let mut relevant_system_packages = Vec::new();
6118        for sys_package_id in BuiltInFramework::all_package_ids() {
6119            if let Some(w) = object_store.try_get_object(&sys_package_id)? {
6120                relevant_system_packages.push(ObjDumpFormat::new(w))
6121            }
6122        }
6123
6124        // Record all the shared objects
6125        let mut shared_objects = Vec::new();
6126        for kind in effects.input_shared_objects() {
6127            match kind {
6128                InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
6129                    if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
6130                        shared_objects.push(ObjDumpFormat::new(w))
6131                    }
6132                }
6133                InputSharedObject::ReadDeleted(..)
6134                | InputSharedObject::MutateDeleted(..)
6135                | InputSharedObject::Cancelled(..) => (), /* TODO: consider record congested
6136                                                           * objects. */
6137            }
6138        }
6139
6140        // Record all loaded child objects
6141        // Child objects which are read but not mutated are not tracked anywhere else
6142        let mut loaded_child_objects = Vec::new();
6143        for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6144            if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
6145                loaded_child_objects.push(ObjDumpFormat::new(w))
6146            }
6147        }
6148
6149        // Record all modified objects
6150        let mut modified_at_versions = Vec::new();
6151        for (id, ver) in effects.modified_at_versions() {
6152            if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
6153                modified_at_versions.push(ObjDumpFormat::new(w))
6154            }
6155        }
6156
6157        // Packages read at runtime, which were not previously loaded into the temoorary
6158        // store Some packages may be fetched at runtime and wont show up in
6159        // input objects
6160        let mut runtime_reads = Vec::new();
6161        for obj in inner_temporary_store
6162            .runtime_packages_loaded_from_db
6163            .values()
6164        {
6165            runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6166        }
6167
6168        // All other input objects should already be in `inner_temporary_store.objects`
6169
6170        Ok(Self {
6171            tx_digest: *tx_digest,
6172            executed_epoch,
6173            reference_gas_price,
6174            epoch_start_timestamp_ms,
6175            protocol_version,
6176            relevant_system_packages,
6177            shared_objects,
6178            loaded_child_objects,
6179            modified_at_versions,
6180            runtime_reads,
6181            sender_signed_data: certificate.clone().into_message(),
6182            input_objects: inner_temporary_store
6183                .input_objects
6184                .values()
6185                .map(|o| ObjDumpFormat::new(o.clone()))
6186                .collect(),
6187            computed_effects: effects.clone(),
6188            expected_effects_digest,
6189        })
6190    }
6191
6192    pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6193        let mut objects = Vec::new();
6194        objects.extend(self.relevant_system_packages.clone());
6195        objects.extend(self.shared_objects.clone());
6196        objects.extend(self.loaded_child_objects.clone());
6197        objects.extend(self.modified_at_versions.clone());
6198        objects.extend(self.runtime_reads.clone());
6199        objects.extend(self.input_objects.clone());
6200        objects
6201    }
6202
6203    pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6204        let file_name = format!(
6205            "{}_{}_NODE_DUMP.json",
6206            self.tx_digest,
6207            AuthorityState::unixtime_now_ms()
6208        );
6209        let mut path = path.to_path_buf();
6210        path.push(&file_name);
6211        let mut file = File::create(path.clone())?;
6212        file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6213        Ok(path)
6214    }
6215
6216    pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6217        let file = File::open(path)?;
6218        serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6219    }
6220}