1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_storage::{
48 key_value_store::{
49 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
50 },
51 key_value_store_metrics::KeyValueStoreMetrics,
52};
53#[cfg(msim)]
54use iota_types::committee::CommitteeTrait;
55use iota_types::{
56 IOTA_SYSTEM_ADDRESS, TypeTag,
57 authenticator_state::get_authenticator_state,
58 base_types::*,
59 committee::{Committee, EpochId, ProtocolVersion},
60 crypto::{
61 AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
62 default_hash,
63 },
64 deny_list_v1::check_coin_deny_list_v1_during_signing,
65 digests::{ChainIdentifier, Digest, TransactionEventsDigest},
66 dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
67 effects::{
68 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
69 TransactionEvents, VerifiedSignedTransactionEffects,
70 },
71 error::{ExecutionError, IotaError, IotaResult, UserInputError},
72 event::{Event, EventID, SystemEpochInfoEvent},
73 executable_transaction::VerifiedExecutableTransaction,
74 execution_config_utils::to_binary_config,
75 execution_status::ExecutionStatus,
76 gas::{GasCostSummary, IotaGasStatus},
77 gas_coin::NANOS_PER_IOTA,
78 inner_temporary_store::{
79 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
80 WrittenObjects,
81 },
82 iota_system_state::{
83 IotaSystemState, IotaSystemStateTrait,
84 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
85 },
86 is_system_package,
87 layout_resolver::{LayoutResolver, into_struct_layout},
88 message_envelope::Message,
89 messages_checkpoint::{
90 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
91 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
92 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
93 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
94 },
95 messages_consensus::AuthorityCapabilitiesV1,
96 messages_grpc::{
97 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
98 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
99 TransactionStatus,
100 },
101 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
102 object::{
103 MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
104 bounded_visitor::BoundedVisitor,
105 },
106 storage::{
107 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
108 },
109 supported_protocol_versions::{
110 ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
111 },
112 transaction::*,
113 transaction_executor::{SimulateTransactionResult, VmChecks},
114};
115use itertools::Itertools;
116use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
117use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
118use parking_lot::Mutex;
119use prometheus::{
120 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
121 register_histogram_vec_with_registry, register_histogram_with_registry,
122 register_int_counter_vec_with_registry, register_int_counter_with_registry,
123 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
124};
125use serde::{Deserialize, Serialize, de::DeserializeOwned};
126use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
127use tap::TapFallible;
128use tokio::{
129 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
130 task::JoinHandle,
131};
132use tracing::{debug, error, info, instrument, trace, warn};
133use typed_store::TypedStoreError;
134
135use self::{
136 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
137};
138#[cfg(msim)]
139pub use crate::checkpoints::checkpoint_executor::utils::{
140 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
141};
142use crate::{
143 authority::{
144 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
145 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
146 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
147 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
148 authority_store_tables::AuthorityPrunerTables,
149 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
150 },
151 authority_client::NetworkAuthorityClient,
152 checkpoints::CheckpointStore,
153 congestion_tracker::CongestionTracker,
154 consensus_adapter::ConsensusAdapter,
155 epoch::committee_store::CommitteeStore,
156 execution_cache::{
157 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
158 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
159 TransactionCacheRead,
160 },
161 execution_driver::execution_process,
162 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
163 metrics::{LatencyObserver, RateTracker},
164 module_cache_metrics::ResolverMetrics,
165 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
166 rest_index::RestIndexStore,
167 stake_aggregator::StakeAggregator,
168 state_accumulator::{AccumulatorStore, StateAccumulator},
169 subscription_handler::SubscriptionHandler,
170 transaction_input_loader::TransactionInputLoader,
171 transaction_manager::TransactionManager,
172 transaction_outputs::TransactionOutputs,
173 validator_tx_finalizer::ValidatorTxFinalizer,
174 verify_indexes::verify_indexes,
175};
176
// Unit-test modules. Each is compiled only under `cfg(test)` and its source
// file is located under `unit_tests/` through the `#[path]` attribute.
#[cfg(test)]
#[path = "unit_tests/authority_tests.rs"]
pub mod authority_tests;

#[cfg(test)]
#[path = "unit_tests/transaction_tests.rs"]
pub mod transaction_tests;

#[cfg(test)]
#[path = "unit_tests/batch_transaction_tests.rs"]
mod batch_transaction_tests;

#[cfg(test)]
#[path = "unit_tests/move_integration_tests.rs"]
pub mod move_integration_tests;

#[cfg(test)]
#[path = "unit_tests/gas_tests.rs"]
mod gas_tests;

#[cfg(test)]
#[path = "unit_tests/batch_verification_tests.rs"]
mod batch_verification_tests;

#[cfg(test)]
#[path = "unit_tests/coin_deny_list_tests.rs"]
mod coin_deny_list_tests;

#[cfg(test)]
#[path = "unit_tests/auth_unit_test_utils.rs"]
pub mod auth_unit_test_utils;

// Unlike the modules above, this one is NOT gated on `cfg(test)`.
pub mod authority_test_utils;

// Per-epoch store and its pruner.
pub mod authority_per_epoch_store;
pub mod authority_per_epoch_store_pruner;

// Remaining public submodules of the authority module.
pub mod authority_store_pruner;
pub mod authority_store_tables;
pub mod authority_store_types;
pub mod epoch_start_configuration;
pub mod shared_object_congestion_tracker;
pub mod shared_object_version_manager;
pub mod suggested_gas_price_calculator;
pub mod test_authority_builder;
pub mod transaction_deferral;

// `authority_store` is crate-private; selected items from it are re-exported
// by the `pub use authority_store::{...}` line near the top of this file.
pub(crate) mod authority_store;
pub mod backpressure;
226
/// Prometheus metrics, plus a few in-process trackers, kept by the authority.
///
/// Every Prometheus field is registered in [`AuthorityMetrics::new`]; the
/// per-field notes below paraphrase the help text supplied at registration.
pub struct AuthorityMetrics {
    /// Total number of transaction orders.
    tx_orders: IntCounter,
    /// Total number of transaction certificates handled.
    total_certs: IntCounter,
    /// Number of calls to `handle_certificate`.
    total_cert_attempts: IntCounter,
    /// Total number of transaction effects produced.
    total_effects: IntCounter,
    /// Number of transactions involving shared objects.
    pub shared_obj_tx: IntCounter,
    /// Number of sponsored transactions.
    sponsored_tx: IntCounter,
    /// Number of transaction orders that had already been processed.
    tx_already_processed: IntCounter,
    /// Distribution of the number of input objects per transaction.
    num_input_objs: Histogram,
    /// Number of shared input objects per transaction.
    num_shared_objects: Histogram,
    /// Distribution of transaction batch sizes.
    batch_size: Histogram,

    /// Latency of handling transactions (timed in `handle_transaction`).
    authority_state_handle_transaction_latency: Histogram,

    /// Certificate execution latency, including waiting for inputs, for the
    /// `TX_TYPE_SINGLE_WRITER_TX` label.
    execute_certificate_latency_single_writer: Histogram,
    /// Certificate execution latency, including waiting for inputs, for the
    /// `TX_TYPE_SHARED_OBJ_TX` label.
    execute_certificate_latency_shared_object: Histogram,

    /// Latency of actual certificate executions.
    internal_execution_latency: Histogram,
    /// Latency of loading input objects for execution.
    execution_load_input_objects_latency: Histogram,
    /// Latency of executing certificates, before committing the results.
    prepare_certificate_latency: Histogram,
    /// Latency of committing certificate execution results.
    commit_certificate_latency: Histogram,
    /// Latency of checkpointing dbs.
    db_checkpoint_latency: Histogram,

    // TransactionManager metrics.
    /// Certificates enqueued to TransactionManager, labelled by `result`.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    /// Current number of missing objects in TransactionManager.
    pub(crate) transaction_manager_num_missing_objects: IntGauge,
    /// Certificates pending in TransactionManager with at least one missing
    /// input object.
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    /// Executing certificates, including queued and actually running ones.
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    /// Number of ready transactions in TransactionManager.
    pub(crate) transaction_manager_num_ready: IntGauge,
    /// Current size of the object-availability cache.
    pub(crate) transaction_manager_object_cache_size: IntGauge,
    /// Object-availability cache hits.
    pub(crate) transaction_manager_object_cache_hits: IntCounter,
    /// Object-availability cache misses.
    pub(crate) transaction_manager_object_cache_misses: IntCounter,
    /// Object-availability cache evictions.
    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
    /// Current size of the package-availability cache.
    pub(crate) transaction_manager_package_cache_size: IntGauge,
    /// Package-availability cache hits.
    pub(crate) transaction_manager_package_cache_hits: IntCounter,
    /// Package-availability cache misses.
    pub(crate) transaction_manager_package_cache_misses: IntCounter,
    /// Package-availability cache evictions.
    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
    /// Time transactions spend waiting in the queue.
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    // Execution driver metrics.
    /// Cumulative number of transactions executed by the execution driver.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    /// Number of transactions pending in the execution driver dispatch queue.
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    /// Delay between a transaction becoming ready and starting to execute.
    pub(crate) execution_queueing_delay_s: Histogram,
    /// Ratio of computation gas to VM execution latency.
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    /// Ratio of computation gas to certificate execution latency, including
    /// committing the certificate.
    pub(crate) execution_gas_latency_ratio: Histogram,

    /// Total number of consensus transactions skipped.
    pub(crate) skipped_consensus_txns: IntCounter,
    /// Consensus transactions skipped because of a local cache hit.
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,

    /// Whether the authority is overloaded and in load-shedding mode.
    pub(crate) authority_overload_status: IntGauge,
    /// Percentage of transactions shed while in load-shedding mode.
    pub(crate) authority_load_shedding_percentage: IntGauge,

    /// Number of times each `source` indicated transaction overload.
    pub(crate) transaction_overload_sources: IntCounterVec,

    // Post-processing metrics.
    /// Total number of events emitted in post processing.
    post_processing_total_events_emitted: IntCounter,
    /// Total number of transactions indexed in post processing.
    post_processing_total_tx_indexed: IntCounter,
    /// Total number of transactions that finished event processing.
    post_processing_total_tx_had_event_processed: IntCounter,
    /// Total number of failures in post processing.
    post_processing_total_failures: IntCounter,

    // Consensus handler / consensus commit metrics.
    /// Transactions processed by the consensus handler, labelled by `class`.
    pub consensus_handler_processed: IntCounterVec,
    /// Sizes of each `class` of transaction processed by the consensus handler.
    pub consensus_handler_transaction_sizes: HistogramVec,
    /// Number of low scoring authorities based on consensus reputation scores.
    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    /// Scores from consensus, labelled by `authority`.
    pub consensus_handler_scores: IntGaugeVec,
    /// Transactions deferred by the consensus handler.
    pub consensus_handler_deferred_transactions: IntCounter,
    /// Transactions deferred by the consensus handler due to congestion.
    pub consensus_handler_congested_transactions: IntCounter,
    /// Transactions cancelled by the consensus handler.
    pub consensus_handler_cancelled_transactions: IntCounter,
    /// Max object costs for congestion control in the current consensus
    /// commit, labelled by `commit_type`.
    pub consensus_handler_max_object_costs: IntGaugeVec,
    /// Committed subdags, labelled by authoring `authority`.
    pub consensus_committed_subdags: IntCounterVec,
    /// Committed consensus messages, labelled by authoring `authority`.
    pub consensus_committed_messages: IntGaugeVec,
    /// Committed user transactions, labelled by submitting `authority`.
    pub consensus_committed_user_transactions: IntGaugeVec,
    /// Throughput calculated from consensus output, based on unique
    /// transactions.
    pub consensus_calculated_throughput: IntGauge,
    /// The currently active calculated throughput profile.
    pub consensus_calculated_throughput_profile: IntGauge,

    /// Shared limits metrics, built from the same registry.
    pub limits_metrics: Arc<LimitsMetrics>,

    /// Shared bytecode verifier metrics, built from the same registry.
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zkLogin signatures.
    pub zklogin_sig_count: IntCounter,
    /// Counter registered under the name `multisig_sig_count`.
    pub multisig_sig_count: IntCounter,

    /// In-process latency observer (not registered with the Prometheus
    /// registry in `new`).
    /// NOTE(review): presumably tracks execution queueing latency, per the
    /// field name — confirm against `LatencyObserver`.
    pub execution_queueing_latency: LatencyObserver,

    /// Rate tracker constructed with a 10-second `Duration`.
    /// NOTE(review): presumably the rate at which transactions become ready
    /// for execution — confirm at the update site.
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    /// Rate tracker constructed with a 10-second `Duration`.
    /// NOTE(review): presumably the transaction execution rate — confirm at
    /// the update site.
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
327
/// Histogram buckets for positive integer quantities, from 1 up to 10^7 in a
/// 1-2-5-7 progression per decade. Used below for object counts, batch sizes
/// and consensus transaction sizes.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
    7000000., 10000000.,
];

/// Histogram buckets for latencies, from 0.0005 up to 90 (seconds, per the
/// constant's name).
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];

/// Finer-grained latency buckets starting at 0.00001 (seconds, per the
/// constant's name) and reaching 100. Used below for the input-object loading
/// latency histogram.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];

/// Histogram buckets for the gas / latency ratio histograms
/// (`prepare_cert_gas_latency_ratio`, `execution_gas_latency_ratio`).
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];
350
351pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
355 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
356 let execute_certificate_latency = register_histogram_vec_with_registry!(
357 "authority_state_execute_certificate_latency",
358 "Latency of executing certificates, including waiting for inputs",
359 &["tx_type"],
360 LATENCY_SEC_BUCKETS.to_vec(),
361 registry,
362 )
363 .unwrap();
364
365 let execute_certificate_latency_single_writer =
366 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
367 let execute_certificate_latency_shared_object =
368 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
369
370 Self {
371 tx_orders: register_int_counter_with_registry!(
372 "total_transaction_orders",
373 "Total number of transaction orders",
374 registry,
375 )
376 .unwrap(),
377 total_certs: register_int_counter_with_registry!(
378 "total_transaction_certificates",
379 "Total number of transaction certificates handled",
380 registry,
381 )
382 .unwrap(),
383 total_cert_attempts: register_int_counter_with_registry!(
384 "total_handle_certificate_attempts",
385 "Number of calls to handle_certificate",
386 registry,
387 )
388 .unwrap(),
389 total_effects: register_int_counter_with_registry!(
391 "total_transaction_effects",
392 "Total number of transaction effects produced",
393 registry,
394 )
395 .unwrap(),
396
397 shared_obj_tx: register_int_counter_with_registry!(
398 "num_shared_obj_tx",
399 "Number of transactions involving shared objects",
400 registry,
401 )
402 .unwrap(),
403
404 sponsored_tx: register_int_counter_with_registry!(
405 "num_sponsored_tx",
406 "Number of sponsored transactions",
407 registry,
408 )
409 .unwrap(),
410
411 tx_already_processed: register_int_counter_with_registry!(
412 "num_tx_already_processed",
413 "Number of transaction orders already processed previously",
414 registry,
415 )
416 .unwrap(),
417 num_input_objs: register_histogram_with_registry!(
418 "num_input_objects",
419 "Distribution of number of input TX objects per TX",
420 POSITIVE_INT_BUCKETS.to_vec(),
421 registry,
422 )
423 .unwrap(),
424 num_shared_objects: register_histogram_with_registry!(
425 "num_shared_objects",
426 "Number of shared input objects per TX",
427 POSITIVE_INT_BUCKETS.to_vec(),
428 registry,
429 )
430 .unwrap(),
431 batch_size: register_histogram_with_registry!(
432 "batch_size",
433 "Distribution of size of transaction batch",
434 POSITIVE_INT_BUCKETS.to_vec(),
435 registry,
436 )
437 .unwrap(),
438 authority_state_handle_transaction_latency: register_histogram_with_registry!(
439 "authority_state_handle_transaction_latency",
440 "Latency of handling transactions",
441 LATENCY_SEC_BUCKETS.to_vec(),
442 registry,
443 )
444 .unwrap(),
445 execute_certificate_latency_single_writer,
446 execute_certificate_latency_shared_object,
447 internal_execution_latency: register_histogram_with_registry!(
448 "authority_state_internal_execution_latency",
449 "Latency of actual certificate executions",
450 LATENCY_SEC_BUCKETS.to_vec(),
451 registry,
452 )
453 .unwrap(),
454 execution_load_input_objects_latency: register_histogram_with_registry!(
455 "authority_state_execution_load_input_objects_latency",
456 "Latency of loading input objects for execution",
457 LOW_LATENCY_SEC_BUCKETS.to_vec(),
458 registry,
459 )
460 .unwrap(),
461 prepare_certificate_latency: register_histogram_with_registry!(
462 "authority_state_prepare_certificate_latency",
463 "Latency of executing certificates, before committing the results",
464 LATENCY_SEC_BUCKETS.to_vec(),
465 registry,
466 )
467 .unwrap(),
468 commit_certificate_latency: register_histogram_with_registry!(
469 "authority_state_commit_certificate_latency",
470 "Latency of committing certificate execution results",
471 LATENCY_SEC_BUCKETS.to_vec(),
472 registry,
473 )
474 .unwrap(),
475 db_checkpoint_latency: register_histogram_with_registry!(
476 "db_checkpoint_latency",
477 "Latency of checkpointing dbs",
478 LATENCY_SEC_BUCKETS.to_vec(),
479 registry,
480 ).unwrap(),
481 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
482 "transaction_manager_num_enqueued_certificates",
483 "Current number of certificates enqueued to TransactionManager",
484 &["result"],
485 registry,
486 )
487 .unwrap(),
488 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
489 "transaction_manager_num_missing_objects",
490 "Current number of missing objects in TransactionManager",
491 registry,
492 )
493 .unwrap(),
494 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
495 "transaction_manager_num_pending_certificates",
496 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
497 registry,
498 )
499 .unwrap(),
500 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
501 "transaction_manager_num_executing_certificates",
502 "Number of executing certificates, including queued and actually running certificates",
503 registry,
504 )
505 .unwrap(),
506 transaction_manager_num_ready: register_int_gauge_with_registry!(
507 "transaction_manager_num_ready",
508 "Number of ready transactions in TransactionManager",
509 registry,
510 )
511 .unwrap(),
512 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
513 "transaction_manager_object_cache_size",
514 "Current size of object-availability cache in TransactionManager",
515 registry,
516 )
517 .unwrap(),
518 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
519 "transaction_manager_object_cache_hits",
520 "Number of object-availability cache hits in TransactionManager",
521 registry,
522 )
523 .unwrap(),
524 authority_overload_status: register_int_gauge_with_registry!(
525 "authority_overload_status",
526 "Whether authority is current experiencing overload and enters load shedding mode.",
527 registry)
528 .unwrap(),
529 authority_load_shedding_percentage: register_int_gauge_with_registry!(
530 "authority_load_shedding_percentage",
531 "The percentage of transactions is shed when the authority is in load shedding mode.",
532 registry)
533 .unwrap(),
534 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
535 "transaction_manager_object_cache_misses",
536 "Number of object-availability cache misses in TransactionManager",
537 registry,
538 )
539 .unwrap(),
540 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
541 "transaction_manager_object_cache_evictions",
542 "Number of object-availability cache evictions in TransactionManager",
543 registry,
544 )
545 .unwrap(),
546 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
547 "transaction_manager_package_cache_size",
548 "Current size of package-availability cache in TransactionManager",
549 registry,
550 )
551 .unwrap(),
552 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
553 "transaction_manager_package_cache_hits",
554 "Number of package-availability cache hits in TransactionManager",
555 registry,
556 )
557 .unwrap(),
558 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
559 "transaction_manager_package_cache_misses",
560 "Number of package-availability cache misses in TransactionManager",
561 registry,
562 )
563 .unwrap(),
564 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
565 "transaction_manager_package_cache_evictions",
566 "Number of package-availability cache evictions in TransactionManager",
567 registry,
568 )
569 .unwrap(),
570 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
571 "transaction_manager_transaction_queue_age_s",
572 "Time spent in waiting for transaction in the queue",
573 LATENCY_SEC_BUCKETS.to_vec(),
574 registry,
575 )
576 .unwrap(),
577 transaction_overload_sources: register_int_counter_vec_with_registry!(
578 "transaction_overload_sources",
579 "Number of times each source indicates transaction overload.",
580 &["source"],
581 registry)
582 .unwrap(),
583 execution_driver_executed_transactions: register_int_counter_with_registry!(
584 "execution_driver_executed_transactions",
585 "Cumulative number of transaction executed by execution driver",
586 registry,
587 )
588 .unwrap(),
589 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
590 "execution_driver_dispatch_queue",
591 "Number of transaction pending in execution driver dispatch queue",
592 registry,
593 )
594 .unwrap(),
595 execution_queueing_delay_s: register_histogram_with_registry!(
596 "execution_queueing_delay_s",
597 "Queueing delay between a transaction is ready for execution until it starts executing.",
598 LATENCY_SEC_BUCKETS.to_vec(),
599 registry
600 )
601 .unwrap(),
602 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
603 "prepare_cert_gas_latency_ratio",
604 "The ratio of computation gas divided by VM execution latency.",
605 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
606 registry
607 )
608 .unwrap(),
609 execution_gas_latency_ratio: register_histogram_with_registry!(
610 "execution_gas_latency_ratio",
611 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
612 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
613 registry
614 )
615 .unwrap(),
616 skipped_consensus_txns: register_int_counter_with_registry!(
617 "skipped_consensus_txns",
618 "Total number of consensus transactions skipped",
619 registry,
620 )
621 .unwrap(),
622 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
623 "skipped_consensus_txns_cache_hit",
624 "Total number of consensus transactions skipped because of local cache hit",
625 registry,
626 )
627 .unwrap(),
628 post_processing_total_events_emitted: register_int_counter_with_registry!(
629 "post_processing_total_events_emitted",
630 "Total number of events emitted in post processing",
631 registry,
632 )
633 .unwrap(),
634 post_processing_total_tx_indexed: register_int_counter_with_registry!(
635 "post_processing_total_tx_indexed",
636 "Total number of txes indexed in post processing",
637 registry,
638 )
639 .unwrap(),
640 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
641 "post_processing_total_tx_had_event_processed",
642 "Total number of txes finished event processing in post processing",
643 registry,
644 )
645 .unwrap(),
646 post_processing_total_failures: register_int_counter_with_registry!(
647 "post_processing_total_failures",
648 "Total number of failure in post processing",
649 registry,
650 )
651 .unwrap(),
652 consensus_handler_processed: register_int_counter_vec_with_registry!(
653 "consensus_handler_processed",
654 "Number of transactions processed by consensus handler",
655 &["class"],
656 registry
657 ).unwrap(),
658 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
659 "consensus_handler_transaction_sizes",
660 "Sizes of each type of transactions processed by consensus handler",
661 &["class"],
662 POSITIVE_INT_BUCKETS.to_vec(),
663 registry
664 ).unwrap(),
665 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
666 "consensus_handler_num_low_scoring_authorities",
667 "Number of low scoring authorities based on reputation scores from consensus",
668 registry
669 ).unwrap(),
670 consensus_handler_scores: register_int_gauge_vec_with_registry!(
671 "consensus_handler_scores",
672 "scores from consensus for each authority",
673 &["authority"],
674 registry,
675 ).unwrap(),
676 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
677 "consensus_handler_deferred_transactions",
678 "Number of transactions deferred by consensus handler",
679 registry,
680 ).unwrap(),
681 consensus_handler_congested_transactions: register_int_counter_with_registry!(
682 "consensus_handler_congested_transactions",
683 "Number of transactions deferred by consensus handler due to congestion",
684 registry,
685 ).unwrap(),
686 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
687 "consensus_handler_cancelled_transactions",
688 "Number of transactions cancelled by consensus handler",
689 registry,
690 ).unwrap(),
691 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
692 "consensus_handler_max_congestion_control_object_costs",
693 "Max object costs for congestion control in the current consensus commit",
694 &["commit_type"],
695 registry,
696 ).unwrap(),
697 consensus_committed_subdags: register_int_counter_vec_with_registry!(
698 "consensus_committed_subdags",
699 "Number of committed subdags, sliced by author",
700 &["authority"],
701 registry,
702 ).unwrap(),
703 consensus_committed_messages: register_int_gauge_vec_with_registry!(
704 "consensus_committed_messages",
705 "Total number of committed consensus messages, sliced by author",
706 &["authority"],
707 registry,
708 ).unwrap(),
709 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
710 "consensus_committed_user_transactions",
711 "Number of committed user transactions, sliced by submitter",
712 &["authority"],
713 registry,
714 ).unwrap(),
715 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
716 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
717 zklogin_sig_count: register_int_counter_with_registry!(
718 "zklogin_sig_count",
719 "Count of zkLogin signatures",
720 registry,
721 )
722 .unwrap(),
723 multisig_sig_count: register_int_counter_with_registry!(
724 "multisig_sig_count",
725 "Count of zkLogin signatures",
726 registry,
727 )
728 .unwrap(),
729 consensus_calculated_throughput: register_int_gauge_with_registry!(
730 "consensus_calculated_throughput",
731 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
732 registry,
733 ).unwrap(),
734 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
735 "consensus_calculated_throughput_profile",
736 "The current active calculated throughput profile",
737 registry
738 ).unwrap(),
739 execution_queueing_latency: LatencyObserver::new(),
740 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
741 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
742 }
743 }
744
745 pub fn reset_on_reconfigure(&self) {
749 self.consensus_committed_messages.reset();
750 self.consensus_handler_scores.reset();
751 self.consensus_committed_user_transactions.reset();
752 }
753}
754
/// A pinned, reference-counted trait object that can produce
/// [`AuthoritySignature`]s and is `Send + Sync`, so it can be shared across
/// threads. Used as the type of [`AuthorityState::secret`].
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
762
/// State held by a single authority (validator or fullnode).
///
/// The notes below describe only what the visible code in this file shows;
/// anything inferred from names is marked as needing confirmation.
pub struct AuthorityState {
    /// The name of this authority. Compared against committee members in
    /// `is_committee_validator` and attached to transactions it signs.
    pub name: AuthorityName,
    /// The signer passed to `VerifiedSignedTransaction::new` when this
    /// authority signs a transaction.
    pub secret: StableSyncAuthoritySigner,

    /// Loads input and receiving objects; `handle_transaction_impl` uses its
    /// `read_objects_for_signing`.
    input_loader: TransactionInputLoader,
    /// NOTE(review): presumably the bundle of execution-cache trait objects
    /// behind accessors such as `get_cache_writer` — confirm.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// The per-epoch store, held in an `ArcSwap`.
    /// NOTE(review): presumably swapped on reconfiguration — confirm.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// An async `RwLock` around an `EpochId`.
    /// NOTE(review): presumably held for reading while executing/signing and
    /// for writing during reconfiguration — confirm against the
    /// `execution_lock_for_*` helpers.
    execution_lock: RwLock<EpochId>,

    /// Optional JSON-RPC index store.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional REST index store.
    pub rest_index: Option<Arc<RestIndexStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint store; queried by `get_epoch_state_commitments`.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Committee store; exposed through `committee_store` and
    /// `clone_committee_store`.
    committee_store: Arc<CommitteeStore>,

    /// Transaction manager; `check_system_overload` asks it for execution
    /// overload.
    transaction_manager: Arc<TransactionManager>,

    /// One-shot sender kept behind a mutex. The attribute expects the field to
    /// be unused outside of test builds.
    /// NOTE(review): presumably used by tests to shut down the transaction
    /// execution task — confirm.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Shared metrics for this authority.
    pub metrics: Arc<AuthorityMetrics>,
    /// Held for ownership only (underscore-prefixed; not read in the visible
    /// code).
    _pruner: AuthorityStorePruner,
    /// Held for ownership only (underscore-prefixed; not read in the visible
    /// code).
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// DB checkpoint configuration.
    db_checkpoint_config: DBCheckpointConfig,

    /// Node configuration; the source of the overload, deny and verifier
    /// settings read by the methods below.
    pub config: NodeConfig,

    /// NOTE(review): presumably the current overload state maintained by the
    /// overload monitor — confirm.
    pub overload_info: AuthorityOverloadInfo,

    /// Optional finalizer; when set and this node is a committee validator,
    /// `handle_transaction` hands it every freshly signed transaction to
    /// track.
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// Identifier of the chain this authority belongs to.
    chain_identifier: ChainIdentifier,

    /// Shared congestion tracker.
    pub(crate) congestion_tracker: Arc<CongestionTracker>,
}
818
819impl AuthorityState {
828 pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
829 epoch_store.committee().authority_exists(&self.name)
830 }
831
832 pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
833 epoch_store
834 .active_validators()
835 .iter()
836 .any(|a| AuthorityName::from(a) == self.name)
837 }
838
839 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
840 !self.is_committee_validator(epoch_store)
841 }
842
843 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
844 &self.committee_store
845 }
846
847 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
848 self.committee_store.clone()
849 }
850
851 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
852 &self.config.authority_overload_config
853 }
854
855 pub fn get_epoch_state_commitments(
856 &self,
857 epoch: EpochId,
858 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
859 self.checkpoint_store.get_epoch_state_commitments(epoch)
860 }
861
    /// Validates `transaction` for signing, signs it with this authority's
    /// key, and tries to acquire transaction locks on its owned input objects.
    ///
    /// Steps, in order: deny-config check, loading of input and receiving
    /// objects, input checks, coin deny-list check, signing, lock acquisition.
    /// The first failing step returns its error through `?`.
    ///
    /// Private helper behind [`Self::handle_transaction`].
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
    async fn handle_transaction_impl(
        &self,
        transaction: VerifiedTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<VerifiedSignedTransaction> {
        // Bound to a named local so the value lives until the end of the
        // function.
        // NOTE(review): the value is bound without `.await` or `?`. If
        // `execution_lock_for_signing` is an `async fn` this holds an unpolled
        // future (no lock is taken); if it returns a `Result`, an `Err` is
        // silently ignored. Its signature is not visible here — confirm.
        let _execution_lock = self.execution_lock_for_signing();

        let tx_digest = transaction.digest();
        let tx_data = transaction.data().transaction_data();

        let input_object_kinds = tx_data.input_objects()?;
        let receiving_objects_refs = tx_data.receiving_objects();

        // Reject the transaction early if the node's deny configuration
        // forbids it.
        iota_transaction_checks::deny::check_transaction_for_signing(
            tx_data,
            transaction.tx_signatures(),
            &input_object_kinds,
            &receiving_objects_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            Some(tx_digest),
            &input_object_kinds,
            &receiving_objects_refs,
            epoch_store.epoch(),
        )?;

        // The gas status returned by the input check is not needed for
        // signing.
        let (_gas_status, checked_input_objects) =
            iota_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                epoch_store.reference_gas_price(),
                tx_data,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?;

        check_coin_deny_list_v1_during_signing(
            tx_data.sender(),
            &checked_input_objects,
            &receiving_objects,
            &self.get_object_store(),
        )?;

        // Computed before `transaction` is moved into the signed envelope
        // below.
        let owned_objects = checked_input_objects.inner().filter_owned_objects();

        let signed_transaction = VerifiedSignedTransaction::new(
            epoch_store.epoch(),
            transaction,
            self.name,
            &*self.secret,
        );

        // Acquire locks on the owned inputs for this signed transaction; a
        // failure here is returned to the caller instead of the signature.
        self.get_cache_writer().try_acquire_transaction_locks(
            epoch_store,
            &owned_objects,
            signed_transaction.clone(),
        )?;

        Ok(signed_transaction)
    }
938
939 #[instrument(name = "handle_transaction", level = "trace", skip_all)]
941 pub async fn handle_transaction(
942 &self,
943 epoch_store: &Arc<AuthorityPerEpochStore>,
944 transaction: VerifiedTransaction,
945 ) -> IotaResult<HandleTransactionResponse> {
946 let tx_digest = *transaction.digest();
947 debug!("handle_transaction");
948
949 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
951 return Ok(HandleTransactionResponse { status });
952 }
953
954 let _metrics_guard = self
955 .metrics
956 .authority_state_handle_transaction_latency
957 .start_timer();
958 self.metrics.tx_orders.inc();
959
960 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
961 match signed {
962 Ok(s) => {
963 if self.is_committee_validator(epoch_store) {
964 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
965 let tx = s.clone();
966 let validator_tx_finalizer = validator_tx_finalizer.clone();
967 let cache_reader = self.get_transaction_cache_reader().clone();
968 let epoch_store = epoch_store.clone();
969 spawn_monitored_task!(epoch_store.within_alive_epoch(
970 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
971 ));
972 }
973 }
974 Ok(HandleTransactionResponse {
975 status: TransactionStatus::Signed(s.into_inner().into_sig()),
976 })
977 }
978 Err(err) => Ok(HandleTransactionResponse {
982 status: self
983 .get_transaction_status(&tx_digest, epoch_store)?
984 .ok_or(err)?
985 .1,
986 }),
987 }
988 }
989
990 pub fn check_system_overload_at_signing(&self) -> bool {
991 self.config
992 .authority_overload_config
993 .check_system_overload_at_signing
994 }
995
996 pub fn check_system_overload_at_execution(&self) -> bool {
997 self.config
998 .authority_overload_config
999 .check_system_overload_at_execution
1000 }
1001
1002 pub(crate) fn check_system_overload(
1003 &self,
1004 consensus_adapter: &Arc<ConsensusAdapter>,
1005 tx_data: &SenderSignedData,
1006 do_authority_overload_check: bool,
1007 ) -> IotaResult {
1008 if do_authority_overload_check {
1009 self.check_authority_overload(tx_data).tap_err(|_| {
1010 self.update_overload_metrics("execution_queue");
1011 })?;
1012 }
1013 self.transaction_manager
1014 .check_execution_overload(self.overload_config(), tx_data)
1015 .tap_err(|_| {
1016 self.update_overload_metrics("execution_pending");
1017 })?;
1018 consensus_adapter.check_consensus_overload().tap_err(|_| {
1019 self.update_overload_metrics("consensus");
1020 })?;
1021
1022 let pending_tx_count = self
1023 .get_cache_commit()
1024 .approximate_pending_transaction_count();
1025 if pending_tx_count
1026 > self
1027 .config
1028 .execution_cache_config
1029 .writeback_cache
1030 .backpressure_threshold_for_rpc()
1031 {
1032 return Err(IotaError::ValidatorOverloadedRetryAfter {
1033 retry_after_secs: 10,
1034 });
1035 }
1036
1037 Ok(())
1038 }
1039
1040 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1041 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1042 return Ok(());
1043 }
1044
1045 let load_shedding_percentage = self
1046 .overload_info
1047 .load_shedding_percentage
1048 .load(Ordering::Relaxed);
1049 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1050 }
1051
1052 fn update_overload_metrics(&self, source: &str) {
1053 self.metrics
1054 .transaction_overload_sources
1055 .with_label_values(&[source])
1056 .inc();
1057 }
1058
1059 #[instrument(level = "trace", skip_all)]
1061 pub async fn execute_certificate(
1062 &self,
1063 certificate: &VerifiedCertificate,
1064 epoch_store: &Arc<AuthorityPerEpochStore>,
1065 ) -> IotaResult<TransactionEffects> {
1066 let _metrics_guard = if certificate.contains_shared_object() {
1067 self.metrics
1068 .execute_certificate_latency_shared_object
1069 .start_timer()
1070 } else {
1071 self.metrics
1072 .execute_certificate_latency_single_writer
1073 .start_timer()
1074 };
1075 trace!("execute_certificate");
1076
1077 self.metrics.total_cert_attempts.inc();
1078
1079 if !certificate.contains_shared_object() {
1080 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1085 }
1086
1087 epoch_store
1090 .within_alive_epoch(self.notify_read_effects(certificate))
1091 .await
1092 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1093 .and_then(|r| r)
1094 }
1095
1096 #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1111 pub fn try_execute_immediately(
1112 &self,
1113 certificate: &VerifiedExecutableTransaction,
1114 expected_effects_digest: Option<TransactionEffectsDigest>,
1115 epoch_store: &Arc<AuthorityPerEpochStore>,
1116 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1117 let _scope = monitored_scope("Execution::try_execute_immediately");
1118 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1119
1120 let tx_digest = certificate.digest();
1121
1122 let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1124
1125 if let Some(effects) = self
1128 .get_transaction_cache_reader()
1129 .try_get_executed_effects(tx_digest)?
1130 {
1131 if let Some(expected_effects_digest_inner) = expected_effects_digest {
1132 assert_eq!(
1133 effects.digest(),
1134 expected_effects_digest_inner,
1135 "Unexpected effects digest for transaction {tx_digest:?}"
1136 );
1137 }
1138 tx_guard.release();
1139 return Ok((effects, None));
1140 }
1141 let input_objects =
1142 self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1143
1144 let expected_effects_digest =
1150 expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1151
1152 self.process_certificate(
1153 tx_guard,
1154 certificate,
1155 input_objects,
1156 expected_effects_digest,
1157 epoch_store,
1158 )
1159 .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1160 .tap_ok(
1161 |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1162 )
1163 }
1164
1165 pub fn read_objects_for_execution(
1166 &self,
1167 tx_lock: &CertLockGuard,
1168 certificate: &VerifiedExecutableTransaction,
1169 epoch_store: &Arc<AuthorityPerEpochStore>,
1170 ) -> IotaResult<InputObjects> {
1171 let _scope = monitored_scope("Execution::load_input_objects");
1172 let _metrics_guard = self
1173 .metrics
1174 .execution_load_input_objects_latency
1175 .start_timer();
1176 let input_objects = &certificate.data().transaction_data().input_objects()?;
1177 self.input_loader.read_objects_for_execution(
1178 epoch_store,
1179 &certificate.key(),
1180 tx_lock,
1181 input_objects,
1182 epoch_store.epoch(),
1183 )
1184 }
1185
1186 pub fn try_execute_for_test(
1190 &self,
1191 certificate: &VerifiedCertificate,
1192 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1193 let epoch_store = self.epoch_store_for_testing();
1194 let (effects, execution_error_opt) = self.try_execute_immediately(
1195 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1196 None,
1197 &epoch_store,
1198 )?;
1199 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1200 Ok((signed_effects, execution_error_opt))
1201 }
1202
1203 pub fn execute_for_test(
1205 &self,
1206 certificate: &VerifiedCertificate,
1207 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1208 self.try_execute_for_test(certificate)
1209 .expect("try_execute_for_test should not fail")
1210 }
1211
1212 pub async fn notify_read_effects(
1213 &self,
1214 certificate: &VerifiedCertificate,
1215 ) -> IotaResult<TransactionEffects> {
1216 self.get_transaction_cache_reader()
1217 .try_notify_read_executed_effects(&[*certificate.digest()])
1218 .await
1219 .map(|mut r| r.pop().expect("must return correct number of effects"))
1220 }
1221
1222 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1223 self.get_object_cache_reader()
1224 .try_check_owned_objects_are_live(owned_object_refs)
1225 }
1226
1227 pub(crate) fn debug_dump_transaction_state(
1232 &self,
1233 tx_digest: &TransactionDigest,
1234 effects: &TransactionEffects,
1235 expected_effects_digest: TransactionEffectsDigest,
1236 inner_temporary_store: &InnerTemporaryStore,
1237 certificate: &VerifiedExecutableTransaction,
1238 debug_dump_config: &StateDebugDumpConfig,
1239 ) -> IotaResult<PathBuf> {
1240 let dump_dir = debug_dump_config
1241 .dump_file_directory
1242 .as_ref()
1243 .cloned()
1244 .unwrap_or(std::env::temp_dir());
1245 let epoch_store = self.load_epoch_store_one_call_per_task();
1246
1247 NodeStateDump::new(
1248 tx_digest,
1249 effects,
1250 expected_effects_digest,
1251 self.get_object_store().as_ref(),
1252 &epoch_store,
1253 inner_temporary_store,
1254 certificate,
1255 )?
1256 .write_to_file(&dump_dir)
1257 .map_err(|e| IotaError::FileIO(e.to_string()))
1258 }
1259
/// Executes `certificate` against the already-loaded `input_objects` and commits the result.
///
/// Steps, in order:
/// 1. Acquire the execution lock for the transaction. On failure `tx_guard` is released and
///    the error is returned.
/// 2. Compare the epoch held by the execution lock with `epoch_store.epoch()`. On mismatch
///    `tx_guard` is released and `IotaError::WrongEpoch` is returned.
/// 3. Run `prepare_certificate`. On failure the error is logged, `tx_guard` is released and
///    the error is returned.
/// 4. If `expected_effects_digest` is provided and differs from the computed effects digest,
///    attempt to dump debug state, log the mismatch, and panic.
/// 5. Call `commit_certificate`, which takes ownership of `tx_guard` and the execution lock.
/// 6. For `AuthenticatorStateUpdateV1` transactions, push the update to the epoch store.
/// 7. Record the ratio of computation cost to elapsed microseconds.
///
/// Returns the effects together with the execution error, if any.
///
/// # Panics
///
/// Panics when the computed effects digest differs from `expected_effects_digest`. When
/// `cfg!(debug_assertions)` is true it also panics if, after an authenticator state update,
/// the authenticator state cannot be read or its JWKs differ from those held by the
/// signature verifier.
1260 #[instrument(level = "trace", skip_all)]
1261 pub(crate) fn process_certificate(
1262 &self,
1263 tx_guard: CertTxGuard,
1264 certificate: &VerifiedExecutableTransaction,
1265 input_objects: InputObjects,
1266 expected_effects_digest: Option<TransactionEffectsDigest>,
1267 epoch_store: &Arc<AuthorityPerEpochStore>,
1268 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1269 let process_certificate_start_time = tokio::time::Instant::now();
1270 let digest = *certificate.digest();
1271
// Fail point: may kill the current node, with a probability derived deterministically
// from the digest.
1272 fail_point_if!("correlated-crash-process-certificate", || {
1273 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1274 iota_simulator::task::kill_current_node(None);
1275 }
1276 });
1277
1278 let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1279 let execution_guard = match execution_guard {
1285 Ok(execution_guard) => execution_guard,
1286 Err(err) => {
1287 tx_guard.release();
1288 return Err(err);
1289 }
1290 };
// The execution lock dereferences to an epoch; it must match the epoch store in use.
1291 if *execution_guard != epoch_store.epoch() {
1295 tx_guard.release();
1296 info!("The epoch of the execution_guard doesn't match the epoch store");
1297 return Err(IotaError::WrongEpoch {
1298 expected_epoch: epoch_store.epoch(),
1299 actual_epoch: *execution_guard,
1300 });
1301 }
1302
1303 let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1309 &execution_guard,
1310 certificate,
1311 input_objects,
1312 epoch_store,
1313 ) {
1314 Err(e) => {
1315 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1316 tx_guard.release();
1317 return Err(e);
1318 }
1319 Ok(res) => res,
1320 };
1321
// Effects digest mismatch: dump state for debugging (best effort), then panic.
1322 if let Some(expected_effects_digest) = expected_effects_digest {
1323 if effects.digest() != expected_effects_digest {
1324 match self.debug_dump_transaction_state(
1326 &digest,
1327 &effects,
1328 expected_effects_digest,
1329 &inner_temporary_store,
1330 certificate,
1331 &self.config.state_debug_dump_config,
1332 ) {
1333 Ok(out_path) => {
1334 info!(
1335 "Dumped node state for transaction {} to {}",
1336 digest,
1337 out_path.as_path().display().to_string()
1338 );
1339 }
1340 Err(e) => {
1341 error!("Error dumping state for transaction {}: {e}", digest);
1342 }
1343 }
1344 error!(
1345 tx_digest = ?digest,
1346 ?expected_effects_digest,
1347 actual_effects = ?effects,
1348 "fork detected!"
1349 );
1350 panic!(
1351 "Transaction {} is expected to have effects digest {}, but got {}!",
1352 digest,
1353 expected_effects_digest,
1354 effects.digest(),
1355 );
1356 }
1357 }
1358
1359 fail_point!("crash");
1360
// From here on `tx_guard` and `execution_guard` are owned by `commit_certificate`.
1361 self.commit_certificate(
1362 certificate,
1363 inner_temporary_store,
1364 &effects,
1365 tx_guard,
1366 execution_guard,
1367 epoch_store,
1368 )?;
1369
1370 if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1371 certificate.data().transaction_data().kind()
1372 {
1373 if let Some(err) = &execution_error_opt {
1374 debug_fatal!("Authenticator state update failed: {:?}", err);
1375 }
1376 epoch_store.update_authenticator_state(auth_state);
1377
// Debug-only consistency check: the JWKs in the stored authenticator state must equal the
// JWKs held by the signature verifier (compared after sorting both lists).
1378 if cfg!(debug_assertions) {
1381 let authenticator_state = get_authenticator_state(self.get_object_store())
1382 .expect("Read cannot fail")
1383 .expect("Authenticator state must exist");
1384
1385 let mut sys_jwks: Vec<_> = authenticator_state
1386 .active_jwks
1387 .into_iter()
1388 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1389 .collect();
1390 let mut active_jwks: Vec<_> = epoch_store
1391 .signature_verifier
1392 .get_jwks()
1393 .into_iter()
1394 .collect();
1395 sys_jwks.sort();
1396 active_jwks.sort();
1397
1398 assert_eq!(sys_jwks, active_jwks);
1399 }
1400 }
1401
// Guard against dividing by zero when no measurable time has elapsed.
1402 let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1403 if elapsed > 0.0 {
1404 self.metrics
1405 .execution_gas_latency_ratio
1406 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1407 };
1408 Ok((effects, execution_error_opt))
1409 }
1410
/// Commits the outcome of an executed certificate.
///
/// In order, this:
/// 1. computes the output keys from `inner_temporary_store` and `effects`;
/// 2. runs `post_process_one_tx`; a failure there is counted and logged but not propagated;
/// 3. records the transaction key / digest pair in the epoch store;
/// 4. builds `TransactionOutputs` and writes them through the cache writer;
/// 5. for an end-of-epoch transaction, forces a reload of the built-in framework packages;
/// 6. commits `tx_guard`, notifies the transaction manager, and updates metrics.
///
/// `_execution_guard` is not read here; it is taken by value and so stays alive until this
/// function returns.
///
/// # Errors
///
/// Returns an error if recording the transaction key or writing the transaction outputs
/// fails.
1411 #[instrument(level = "trace", skip_all)]
1412 fn commit_certificate(
1413 &self,
1414 certificate: &VerifiedExecutableTransaction,
1415 inner_temporary_store: InnerTemporaryStore,
1416 effects: &TransactionEffects,
1417 tx_guard: CertTxGuard,
1418 _execution_guard: ExecutionLockReadGuard<'_>,
1419 epoch_store: &Arc<AuthorityPerEpochStore>,
1420 ) -> IotaResult {
1421 let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1422 monitored_scope("Execution::commit_certificate");
1423 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1424
1425 let tx_key = certificate.key();
1426 let tx_digest = certificate.digest();
// Captured up front because `inner_temporary_store` is moved into the outputs below.
1427 let input_object_count = inner_temporary_store.input_objects.len();
1428 let shared_object_count = effects.input_shared_objects().len();
1429
1430 let output_keys = inner_temporary_store.get_output_keys(effects);
1431
// Post-processing is best effort: the result is deliberately discarded after logging.
1432 let _ = self
1434 .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1435 .tap_err(|e| {
1436 self.metrics.post_processing_total_failures.inc();
1437 error!(?tx_digest, "tx post processing failed: {e}");
1438 });
1439
1440 epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1444
1445 fail_point!("crash");
1447
1448 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1449 certificate.clone().into_unsigned(),
1450 effects.clone(),
1451 inner_temporary_store,
1452 );
1453 self.get_cache_writer()
1454 .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1455
1456 if certificate.transaction_data().is_end_of_epoch_tx() {
1457 self.get_object_cache_reader()
1460 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1461 }
1462
1463 tx_guard.commit_tx();
1465
1466 self.transaction_manager
1470 .notify_commit(tx_digest, output_keys, epoch_store);
1471
1472 self.update_metrics(certificate, input_object_count, shared_object_count);
1473
1474 Ok(())
1475 }
1476
1477 fn update_metrics(
1478 &self,
1479 certificate: &VerifiedExecutableTransaction,
1480 input_object_count: usize,
1481 shared_object_count: usize,
1482 ) {
1483 if certificate.has_zklogin_sig() {
1485 self.metrics.zklogin_sig_count.inc();
1486 } else if certificate.has_upgraded_multisig() {
1487 self.metrics.multisig_sig_count.inc();
1488 }
1489
1490 self.metrics.total_effects.inc();
1491 self.metrics.total_certs.inc();
1492
1493 if shared_object_count > 0 {
1494 self.metrics.shared_obj_tx.inc();
1495 }
1496
1497 if certificate.is_sponsored_tx() {
1498 self.metrics.sponsored_tx.inc();
1499 }
1500
1501 self.metrics
1502 .num_input_objs
1503 .observe(input_object_count as f64);
1504 self.metrics
1505 .num_shared_objects
1506 .observe(shared_object_count as f64);
1507 self.metrics.batch_size.observe(
1508 certificate
1509 .data()
1510 .intent_message()
1511 .value
1512 .kind()
1513 .num_commands() as f64,
1514 );
1515 }
1516
/// Validates and executes `certificate` without committing anything.
///
/// The transaction data is validity-checked, the certificate inputs are checked (yielding a
/// gas status and checked input objects), the owned input objects are confirmed live, and the
/// transaction is run through the epoch store's executor.
///
/// `_execution_guard` is not read here; requiring a reference to it means callers must hold
/// the execution lock while calling.
///
/// Returns the inner temporary store, the effects, and the execution error (if execution
/// failed).
///
/// # Errors
///
/// Returns an error if the validity check, the certificate input check, or the owned-object
/// liveness check fails.
1517 #[instrument(level = "trace", skip_all)]
1529 fn prepare_certificate(
1530 &self,
1531 _execution_guard: &ExecutionLockReadGuard<'_>,
1532 certificate: &VerifiedExecutableTransaction,
1533 input_objects: InputObjects,
1534 epoch_store: &Arc<AuthorityPerEpochStore>,
1535 ) -> IotaResult<(
1536 InnerTemporaryStore,
1537 TransactionEffects,
1538 Option<ExecutionError>,
1539 )> {
1540 let _scope = monitored_scope("Execution::prepare_certificate");
1541 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1542 let prepare_certificate_start_time = tokio::time::Instant::now();
1543
1544 let tx_data = certificate.data().transaction_data();
1547 tx_data.validity_check(epoch_store.protocol_config())?;
1548
1549 let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
1552 certificate,
1553 input_objects,
1554 epoch_store.protocol_config(),
1555 epoch_store.reference_gas_price(),
1556 )?;
1557
1558 let owned_object_refs = input_objects.inner().filter_owned_objects();
1559 self.check_owned_locks(&owned_object_refs)?;
1560 let tx_digest = *certificate.digest();
1561 let protocol_config = epoch_store.protocol_config();
1562 let transaction_data = &certificate.data().intent_message().value;
1563 let (kind, signer, gas) = transaction_data.execution_parts();
1564
// `effects` is only mutated by the msim fail point below, hence the conditional
// `expect(unused_mut)` for other build configurations.
1565 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1566 let (inner_temp_store, _, mut effects, execution_error_opt) =
1567 epoch_store.executor().execute_transaction_to_effects(
1568 self.get_backing_store().as_ref(),
1569 protocol_config,
1570 self.metrics.limits_metrics.clone(),
1571 self.config
1574 .expensive_safety_check_config
1575 .enable_deep_per_tx_iota_conservation_check(),
1576 self.config.certificate_deny_config.certificate_deny_set(),
1577 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1578 epoch_store
1579 .epoch_start_config()
1580 .epoch_data()
1581 .epoch_start_timestamp(),
1582 input_objects,
1583 gas,
1584 gas_status,
1585 kind,
1586 signer,
1587 tx_digest,
1588 &mut None,
1589 );
1590
1591 fail_point_if!("cp_execution_nondeterminism", || {
1592 #[cfg(msim)]
1593 self.create_fail_state(certificate, epoch_store, &mut effects);
1594 });
1595
// Guard against dividing by zero when no measurable time has elapsed.
1596 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1597 if elapsed > 0.0 {
1598 self.metrics
1599 .prepare_cert_gas_latency_ratio
1600 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1601 }
1602
// The executor reports its outcome as a `Result`; only the error side is passed on.
1603 Ok((inner_temp_store, effects, execution_error_opt.err()))
1604 }
1605
1606 pub fn prepare_certificate_for_benchmark(
1607 &self,
1608 certificate: &VerifiedExecutableTransaction,
1609 input_objects: InputObjects,
1610 epoch_store: &Arc<AuthorityPerEpochStore>,
1611 ) -> IotaResult<(
1612 InnerTemporaryStore,
1613 TransactionEffects,
1614 Option<ExecutionError>,
1615 )> {
1616 let lock = RwLock::new(epoch_store.epoch());
1617 let execution_guard = lock.try_read().unwrap();
1618
1619 self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
1620 }
1621
1622 #[instrument("dry_exec_tx", level = "trace", skip_all)]
1625 #[allow(clippy::type_complexity)]
1626 pub fn dry_exec_transaction(
1627 &self,
1628 transaction: TransactionData,
1629 transaction_digest: TransactionDigest,
1630 ) -> IotaResult<(
1631 DryRunTransactionBlockResponse,
1632 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1633 TransactionEffects,
1634 Option<ObjectID>,
1635 )> {
1636 let epoch_store = self.load_epoch_store_one_call_per_task();
1637 if !self.is_fullnode(&epoch_store) {
1638 return Err(IotaError::UnsupportedFeature {
1639 error: "dry-exec is only supported on fullnodes".to_string(),
1640 });
1641 }
1642
1643 if transaction.kind().is_system_tx() {
1644 return Err(IotaError::UnsupportedFeature {
1645 error: "dry-exec does not support system transactions".to_string(),
1646 });
1647 }
1648
1649 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1650 }
1651
1652 #[allow(clippy::type_complexity)]
1653 pub fn dry_exec_transaction_for_benchmark(
1654 &self,
1655 transaction: TransactionData,
1656 transaction_digest: TransactionDigest,
1657 ) -> IotaResult<(
1658 DryRunTransactionBlockResponse,
1659 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1660 TransactionEffects,
1661 Option<ObjectID>,
1662 )> {
1663 let epoch_store = self.load_epoch_store_one_call_per_task();
1664 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1665 }
1666
/// Shared implementation of dry-run execution.
///
/// The transaction is validity-checked (without the gas check), run through the signing deny
/// checks with an empty signature list, and its inputs are loaded. When the transaction has
/// no gas payment, a mock gas coin worth `SIMULATION_GAS_COIN_VALUE` is created under a
/// random object ID, owned by the gas owner, and installed as the payment. The transaction is
/// then executed with a freshly created executor.
///
/// Returns a tuple of:
/// * the `DryRunTransactionBlockResponse` (its `object_changes` and `balance_changes` are
///   left empty here),
/// * the created / unwrapped / mutated objects keyed by object ID, each with its `WriteKind`,
/// * the raw transaction effects,
/// * the ID of the mock gas object, or `None` when the transaction supplied its own gas.
///
/// # Errors
///
/// Returns an error if any check, object load, or conversion into the response types fails.
///
/// # Panics
///
/// Panics if the executor cannot be created, or if an object listed in the effects as
/// created, unwrapped or mutated is missing from the temporary store's written set.
1667 #[instrument(level = "trace", skip_all)]
1668 #[allow(clippy::type_complexity)]
1669 fn dry_exec_transaction_impl(
1670 &self,
1671 epoch_store: &AuthorityPerEpochStore,
1672 transaction: TransactionData,
1673 transaction_digest: TransactionDigest,
1674 ) -> IotaResult<(
1675 DryRunTransactionBlockResponse,
1676 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1677 TransactionEffects,
1678 Option<ObjectID>,
1679 )> {
1680 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1682
1683 let input_object_kinds = transaction.input_objects()?;
1684 let receiving_object_refs = transaction.receiving_objects();
1685
// No signatures are available for a dry run, so an empty list is passed.
1686 iota_transaction_checks::deny::check_transaction_for_signing(
1687 &transaction,
1688 &[],
1689 &input_object_kinds,
1690 &receiving_object_refs,
1691 &self.config.transaction_deny_config,
1692 self.get_backing_package_store().as_ref(),
1693 )?;
1694
1695 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1696 None,
1698 &input_object_kinds,
1699 &receiving_object_refs,
1700 epoch_store.epoch(),
1701 )?;
1702
// Rebound as mutable because the gas payment may be replaced with a mock coin below.
1703 let mut transaction = transaction;
1705 let mut gas_object_refs = transaction.gas().to_vec();
1706 let reference_gas_price = epoch_store.reference_gas_price();
// Empty payment: fabricate a gas coin and check inputs with it; otherwise check as-is.
1707 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1708 let sender = transaction.gas_owner();
1709 let gas_object_id = ObjectID::random();
1710 let gas_object = Object::new_move(
1711 MoveObject::new_gas_coin(
1712 OBJECT_START_VERSION,
1713 gas_object_id,
1714 SIMULATION_GAS_COIN_VALUE,
1715 ),
1716 Owner::AddressOwner(sender),
1717 TransactionDigest::genesis_marker(),
1718 );
1719 let gas_object_ref = gas_object.compute_object_reference();
1720 gas_object_refs = vec![gas_object_ref];
1721 transaction.gas_data_mut().payment = gas_object_refs.clone();
1723 (
1724 iota_transaction_checks::check_transaction_input_with_given_gas(
1725 epoch_store.protocol_config(),
1726 reference_gas_price,
1727 &transaction,
1728 input_objects,
1729 receiving_objects,
1730 gas_object,
1731 &self.metrics.bytecode_verifier_metrics,
1732 &self.config.verifier_signing_config,
1733 )?,
1734 Some(gas_object_id),
1735 )
1736 } else {
1737 (
1738 iota_transaction_checks::check_transaction_input(
1739 epoch_store.protocol_config(),
1740 reference_gas_price,
1741 &transaction,
1742 input_objects,
1743 &receiving_objects,
1744 &self.metrics.bytecode_verifier_metrics,
1745 &self.config.verifier_signing_config,
1746 )?,
1747 None,
1748 )
1749 };
1750
1751 let protocol_config = epoch_store.protocol_config();
1752 let (kind, signer, _) = transaction.execution_parts();
1753
// A dedicated executor is created for the dry run with the `silent` flag set.
1754 let silent = true;
1755 let executor = iota_execution::executor(protocol_config, silent, None)
1756 .expect("Creating an executor should not fail here");
1757
1758 let expensive_checks = false;
1759 let (inner_temp_store, _, effects, _execution_error) = executor
1760 .execute_transaction_to_effects(
1761 self.get_backing_store().as_ref(),
1762 protocol_config,
1763 self.metrics.limits_metrics.clone(),
1764 expensive_checks,
1765 self.config.certificate_deny_config.certificate_deny_set(),
1766 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1767 epoch_store
1768 .epoch_start_config()
1769 .epoch_data()
1770 .epoch_start_timestamp(),
1771 checked_input_objects,
1772 gas_object_refs,
1773 gas_status,
1774 kind,
1775 signer,
1776 transaction_digest,
1777 &mut None,
1778 );
1779 let tx_digest = *effects.transaction_digest();
1780
1781 let module_cache =
1782 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1783
1784 let mut layout_resolver =
1785 epoch_store
1786 .executor()
1787 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1788 &inner_temp_store,
1789 self.get_backing_package_store(),
1790 )));
// Object and balance changes are not computed in this function; both are returned empty.
1791 let object_changes = Vec::new();
1793
1794 let balance_changes = Vec::new();
1796
// Pair every created / unwrapped / mutated object reference with the object written for it.
1797 let written_with_kind = effects
1798 .created()
1799 .into_iter()
1800 .map(|(oref, _)| (oref, WriteKind::Create))
1801 .chain(
1802 effects
1803 .unwrapped()
1804 .into_iter()
1805 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1806 )
1807 .chain(
1808 effects
1809 .mutated()
1810 .into_iter()
1811 .map(|(oref, _)| (oref, WriteKind::Mutate)),
1812 )
1813 .map(|(oref, kind)| {
// Panics if the temporary store has no written object for this ID.
1814 let obj = inner_temp_store.written.get(&oref.0).unwrap();
1815 (oref.0, (oref, obj.clone(), kind))
1817 })
1818 .collect();
1819
1820 Ok((
1821 DryRunTransactionBlockResponse {
1822 suggested_gas_price: self
1824 .congestion_tracker
1825 .get_prediction_suggested_gas_price(&transaction),
1826 input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
1827 .map_err(|e| IotaError::TransactionSerialization {
1828 error: format!(
1829 "Failed to convert transaction to IotaTransactionBlockData: {e}",
1830 ),
1831 })?, effects: effects.clone().try_into()?,
1833 events: IotaTransactionBlockEvents::try_from(
1834 inner_temp_store.events.clone(),
1835 tx_digest,
1836 None,
1837 layout_resolver.as_mut(),
1838 )?,
1839 object_changes,
1840 balance_changes,
1841 },
1842 written_with_kind,
1843 effects,
1844 mock_gas,
1845 ))
1846 }
1847
/// Simulates `transaction` and returns its inputs, outputs, events, effects and execution
/// result without committing anything.
///
/// When the transaction has no gas payment, a mock gas coin worth
/// `SIMULATION_GAS_COIN_VALUE` is created under `ObjectID::MAX`, owned by the gas owner,
/// installed as the payment and added to the input objects; its ID is reported in
/// `mock_gas_id`.
///
/// `checks` selects the input validation path: with checks enabled the regular transaction
/// input check is used, otherwise the dev-inspect input check is used and a gas status is
/// built directly from the transaction's budget and price.
///
/// # Errors
///
/// Returns `IotaError::UnsupportedFeature` for system transactions or when this node is not a
/// fullnode (checked in that order), and otherwise any error from the validity check, deny
/// check, object loading, input checks or gas status construction.
///
/// # Panics
///
/// Panics if the executor cannot be created.
1848 pub fn simulate_transaction(
1849 &self,
1850 mut transaction: TransactionData,
1851 checks: VmChecks,
1852 ) -> IotaResult<SimulateTransactionResult> {
1853 if transaction.kind().is_system_tx() {
1854 return Err(IotaError::UnsupportedFeature {
1855 error: "simulate does not support system transactions".to_string(),
1856 });
1857 }
1858
1859 let epoch_store = self.load_epoch_store_one_call_per_task();
1860 if !self.is_fullnode(&epoch_store) {
1861 return Err(IotaError::UnsupportedFeature {
1862 error: "simulate is only supported on fullnodes".to_string(),
1863 });
1864 }
1865
1866 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1870
1871 let input_object_kinds = transaction.input_objects()?;
1872 let receiving_object_refs = transaction.receiving_objects();
1873
// No signatures are available for a simulation, so an empty list is passed.
1874 iota_transaction_checks::deny::check_transaction_for_signing(
1877 &transaction,
1878 &[],
1879 &input_object_kinds,
1880 &receiving_object_refs,
1881 &self.config.transaction_deny_config,
1882 self.get_backing_package_store().as_ref(),
1883 )?;
1884
1885 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1887 None,
1889 &input_object_kinds,
1890 &receiving_object_refs,
1891 epoch_store.epoch(),
1892 )?;
1893
// Empty payment: fabricate a gas coin, use it as payment, and add it to the inputs.
1894 let mock_gas_id = if transaction.gas().is_empty() {
1896 let mock_gas_object = Object::new_move(
1897 MoveObject::new_gas_coin(
1898 OBJECT_START_VERSION,
1899 ObjectID::MAX,
1900 SIMULATION_GAS_COIN_VALUE,
1901 ),
1902 Owner::AddressOwner(transaction.gas_data().owner),
1903 TransactionDigest::genesis_marker(),
1904 );
1905 let mock_gas_object_ref = mock_gas_object.compute_object_reference();
1906 transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
1907 input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
1908 Some(mock_gas_object.id())
1909 } else {
1910 None
1911 };
1912
1913 let protocol_config = epoch_store.protocol_config();
1914
// Checks enabled: regular input check. Checks disabled: dev-inspect input check plus a
// gas status built from the transaction's own budget and price.
1915 let (gas_status, checked_input_objects) = if checks.enabled() {
1918 iota_transaction_checks::check_transaction_input(
1919 protocol_config,
1920 epoch_store.reference_gas_price(),
1921 &transaction,
1922 input_objects,
1923 &receiving_objects,
1924 &self.metrics.bytecode_verifier_metrics,
1925 &self.config.verifier_signing_config,
1926 )?
1927 } else {
1928 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
1929 protocol_config,
1930 transaction.kind(),
1931 input_objects,
1932 receiving_objects,
1933 )?;
1934 let gas_status = IotaGasStatus::new(
1935 transaction.gas_budget(),
1936 transaction.gas_price(),
1937 epoch_store.reference_gas_price(),
1938 protocol_config,
1939 )?;
1940
1941 (gas_status, checked_input_objects)
1942 };
1943
1944 let executor = iota_execution::executor(
1946 protocol_config,
1947 true, None,
1949 )
1950 .expect("Creating an executor should not fail here");
1951
1952 let (kind, signer, gas_coins) = transaction.execution_parts();
1954 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
1955 self.get_backing_store().as_ref(),
1956 protocol_config,
1957 self.metrics.limits_metrics.clone(),
1958 false, self.config.certificate_deny_config.certificate_deny_set(),
1960 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1961 epoch_store
1962 .epoch_start_config()
1963 .epoch_data()
1964 .epoch_start_timestamp(),
1965 checked_input_objects,
1966 gas_coins,
1967 gas_status,
1968 kind,
1969 signer,
1970 transaction.digest(),
1971 checks.disabled(),
1972 );
1973
1974 Ok(SimulateTransactionResult {
1977 input_objects: inner_temp_store.input_objects,
1978 output_objects: inner_temp_store.written,
// Events are only reported when the effects carry an events digest.
1979 events: effects.events_digest().map(|_| inner_temp_store.events),
1980 effects,
1981 execution_result,
1982 mock_gas_id,
1983 })
1984 }
1985
1986 #[instrument("dev_inspect_tx", level = "trace", skip_all)]
1991 pub async fn dev_inspect_transaction_block(
1992 &self,
1993 sender: IotaAddress,
1994 transaction_kind: TransactionKind,
1995 gas_price: Option<u64>,
1996 gas_budget: Option<u64>,
1997 gas_sponsor: Option<IotaAddress>,
1998 gas_objects: Option<Vec<ObjectRef>>,
1999 show_raw_txn_data_and_effects: Option<bool>,
2000 skip_checks: Option<bool>,
2001 ) -> IotaResult<DevInspectResults> {
2002 let epoch_store = self.load_epoch_store_one_call_per_task();
2003
2004 if !self.is_fullnode(&epoch_store) {
2005 return Err(IotaError::UnsupportedFeature {
2006 error: "dev-inspect is only supported on fullnodes".to_string(),
2007 });
2008 }
2009
2010 if transaction_kind.is_system_tx() {
2011 return Err(IotaError::UnsupportedFeature {
2012 error: "system transactions are not supported".to_string(),
2013 });
2014 }
2015
2016 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2017 let skip_checks = skip_checks.unwrap_or(true);
2018 let reference_gas_price = epoch_store.reference_gas_price();
2019 let protocol_config = epoch_store.protocol_config();
2020 let max_tx_gas = protocol_config.max_tx_gas();
2021
2022 let price = gas_price.unwrap_or(reference_gas_price);
2023 let budget = gas_budget.unwrap_or(max_tx_gas);
2024 let owner = gas_sponsor.unwrap_or(sender);
2025 let payment = gas_objects.unwrap_or_default();
2028 let transaction = TransactionData::V1(TransactionDataV1 {
2029 kind: transaction_kind.clone(),
2030 sender,
2031 gas_data: GasData {
2032 payment,
2033 owner,
2034 price,
2035 budget,
2036 },
2037 expiration: TransactionExpiration::None,
2038 });
2039
2040 let raw_txn_data = if show_raw_txn_data_and_effects {
2041 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2042 error: "Failed to serialize transaction during dev inspect".to_string(),
2043 })?
2044 } else {
2045 vec![]
2046 };
2047
2048 transaction.validity_check_no_gas_check(protocol_config)?;
2049
2050 let input_object_kinds = transaction.input_objects()?;
2051 let receiving_object_refs = transaction.receiving_objects();
2052
2053 iota_transaction_checks::deny::check_transaction_for_signing(
2054 &transaction,
2055 &[],
2056 &input_object_kinds,
2057 &receiving_object_refs,
2058 &self.config.transaction_deny_config,
2059 self.get_backing_package_store().as_ref(),
2060 )?;
2061
2062 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2063 None,
2065 &input_object_kinds,
2066 &receiving_object_refs,
2067 epoch_store.epoch(),
2068 )?;
2069
2070 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2072 SIMULATION_GAS_COIN_VALUE,
2073 transaction.gas_owner(),
2074 );
2075
2076 let gas_objects = if transaction.gas().is_empty() {
2077 let gas_object_ref = dummy_gas_object.compute_object_reference();
2078 vec![gas_object_ref]
2079 } else {
2080 transaction.gas().to_vec()
2081 };
2082
2083 let (gas_status, checked_input_objects) = if skip_checks {
2084 if transaction.gas().is_empty() {
2089 input_objects.push(ObjectReadResult::new(
2090 InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
2091 dummy_gas_object.into(),
2092 ));
2093 }
2094 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2095 protocol_config,
2096 &transaction_kind,
2097 input_objects,
2098 receiving_objects,
2099 )?;
2100 let gas_status = IotaGasStatus::new(
2101 max_tx_gas,
2102 transaction.gas_price(),
2103 reference_gas_price,
2104 protocol_config,
2105 )?;
2106
2107 (gas_status, checked_input_objects)
2108 } else {
2109 if transaction.gas().is_empty() {
2113 iota_transaction_checks::check_transaction_input_with_given_gas(
2114 epoch_store.protocol_config(),
2115 reference_gas_price,
2116 &transaction,
2117 input_objects,
2118 receiving_objects,
2119 dummy_gas_object,
2120 &self.metrics.bytecode_verifier_metrics,
2121 &self.config.verifier_signing_config,
2122 )?
2123 } else {
2124 iota_transaction_checks::check_transaction_input(
2125 epoch_store.protocol_config(),
2126 reference_gas_price,
2127 &transaction,
2128 input_objects,
2129 &receiving_objects,
2130 &self.metrics.bytecode_verifier_metrics,
2131 &self.config.verifier_signing_config,
2132 )?
2133 }
2134 };
2135
2136 let executor = iota_execution::executor(protocol_config, true, None)
2137 .expect("Creating an executor should not fail here");
2138 let intent_msg = IntentMessage::new(
2139 Intent {
2140 version: IntentVersion::V0,
2141 scope: IntentScope::TransactionData,
2142 app_id: AppId::Iota,
2143 },
2144 transaction,
2145 );
2146 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2147 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2148 self.get_backing_store().as_ref(),
2149 protocol_config,
2150 self.metrics.limits_metrics.clone(),
2151 false,
2153 self.config.certificate_deny_config.certificate_deny_set(),
2154 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2155 epoch_store
2156 .epoch_start_config()
2157 .epoch_data()
2158 .epoch_start_timestamp(),
2159 checked_input_objects,
2160 gas_objects,
2161 gas_status,
2162 transaction_kind,
2163 sender,
2164 transaction_digest,
2165 skip_checks,
2166 );
2167
2168 let raw_effects = if show_raw_txn_data_and_effects {
2169 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2170 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2171 })?
2172 } else {
2173 vec![]
2174 };
2175
2176 let mut layout_resolver =
2177 epoch_store
2178 .executor()
2179 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2180 &inner_temp_store,
2181 self.get_backing_package_store(),
2182 )));
2183
2184 DevInspectResults::new(
2185 effects,
2186 inner_temp_store.events.clone(),
2187 execution_result,
2188 raw_txn_data,
2189 raw_effects,
2190 layout_resolver.as_mut(),
2191 )
2192 }
2193
2194 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2196 let epoch_store = self.epoch_store_for_testing();
2197 Ok(epoch_store.reference_gas_price())
2198 }
2199
2200 #[instrument(level = "trace", skip_all)]
2201 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2202 self.get_transaction_cache_reader()
2203 .try_is_tx_already_executed(digest)
2204 }
2205
2206 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2208 self.try_is_tx_already_executed(digest)
2209 .expect("storage access failed")
2210 }
2211
2212 #[instrument(level = "debug", skip_all, err)]
2214 fn index_tx(
2215 &self,
2216 indexes: &IndexStore,
2217 digest: &TransactionDigest,
2218 cert: &VerifiedExecutableTransaction,
2220 effects: &TransactionEffects,
2221 events: &TransactionEvents,
2222 timestamp_ms: u64,
2223 tx_coins: Option<TxCoins>,
2224 written: &WrittenObjects,
2225 inner_temporary_store: &InnerTemporaryStore,
2226 ) -> IotaResult<u64> {
2227 let changes = self
2228 .process_object_index(effects, written, inner_temporary_store)
2229 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2230
2231 indexes.index_tx(
2232 cert.data().intent_message().value.sender(),
2233 cert.data()
2234 .intent_message()
2235 .value
2236 .input_objects()?
2237 .iter()
2238 .map(|o| o.object_id()),
2239 effects
2240 .all_changed_objects()
2241 .into_iter()
2242 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2243 cert.data()
2244 .intent_message()
2245 .value
2246 .move_calls()
2247 .into_iter()
2248 .map(|(package, module, function)| {
2249 (*package, module.to_owned(), function.to_owned())
2250 }),
2251 events,
2252 changes,
2253 digest,
2254 timestamp_ms,
2255 tx_coins,
2256 )
2257 }
2258
/// Simulator-only (`cfg(msim)`) hook that perturbs `effects` for non-system
/// transactions on a subset of authorities.
///
/// A thread-local pair of (accumulated stake, set of authority names) is maintained.
/// While the accumulated stake is below the committee's validity threshold, this
/// authority's stake is added and its name recorded. Whenever this authority's name is
/// in the recorded set, the computation cost in the effects' gas summary is bumped by
/// one. System transactions and authorities with zero stake are left untouched.
#[cfg(msim)]
fn create_fail_state(
    &self,
    certificate: &VerifiedExecutableTransaction,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    effects: &mut TransactionEffects,
) {
    use std::cell::RefCell;
    thread_local! {
        static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
    }

    if certificate.data().intent_message().value.is_system_tx() {
        return;
    }

    let committee = epoch_store.committee();
    let cur_stake = (**committee).weight(&self.name);
    if cur_stake == 0 {
        return;
    }

    FAIL_STATE.with_borrow_mut(|(failing_stake, failing_authorities)| {
        // Keep enrolling authorities until the enrolled stake reaches the threshold.
        if *failing_stake < committee.validity_threshold() {
            *failing_stake += cur_stake;
            failing_authorities.insert(self.name);
        }

        if failing_authorities.contains(&self.name) {
            info!("cp_exec failing tx");
            effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
        }
    });
}
2289
2290 fn process_object_index(
2291 &self,
2292 effects: &TransactionEffects,
2293 written: &WrittenObjects,
2294 inner_temporary_store: &InnerTemporaryStore,
2295 ) -> IotaResult<ObjectIndexChanges> {
2296 let epoch_store = self.load_epoch_store_one_call_per_task();
2297 let mut layout_resolver =
2298 epoch_store
2299 .executor()
2300 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2301 inner_temporary_store,
2302 self.get_backing_package_store(),
2303 )));
2304
2305 let modified_at_version = effects
2306 .modified_at_versions()
2307 .into_iter()
2308 .collect::<HashMap<_, _>>();
2309
2310 let tx_digest = effects.transaction_digest();
2311 let mut deleted_owners = vec![];
2312 let mut deleted_dynamic_fields = vec![];
2313 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2314 let old_version = modified_at_version.get(&id).unwrap();
2315 match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2318 |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2319 ) {
2320 Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2321 Owner::ObjectOwner(object_id) => {
2322 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2323 }
2324 _ => {}
2325 }
2326 }
2327
2328 let mut new_owners = vec![];
2329 let mut new_dynamic_fields = vec![];
2330
2331 for (oref, owner, kind) in effects.all_changed_objects() {
2332 let id = &oref.0;
2333 if let WriteKind::Mutate = kind {
2336 let Some(old_version) = modified_at_version.get(id) else {
2337 panic!(
2338 "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2339 );
2340 };
2341 let Some(old_object) = self
2344 .get_object_store()
2345 .try_get_object_by_key(id, *old_version)?
2346 else {
2347 panic!(
2348 "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2349 );
2350 };
2351 if old_object.owner != owner {
2352 match old_object.owner {
2353 Owner::AddressOwner(addr) => {
2354 deleted_owners.push((addr, *id));
2355 }
2356 Owner::ObjectOwner(object_id) => {
2357 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2358 }
2359 _ => {}
2360 }
2361 }
2362 }
2363
2364 match owner {
2365 Owner::AddressOwner(addr) => {
2366 let new_object = written.get(id).unwrap_or_else(
2369 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2370 );
2371 assert_eq!(
2372 new_object.version(),
2373 oref.1,
2374 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2375 tx_digest,
2376 id,
2377 new_object.version(),
2378 oref.1
2379 );
2380
2381 let type_ = new_object
2382 .type_()
2383 .map(|type_| ObjectType::Struct(type_.clone()))
2384 .unwrap_or(ObjectType::Package);
2385
2386 new_owners.push((
2387 (addr, *id),
2388 ObjectInfo {
2389 object_id: *id,
2390 version: oref.1,
2391 digest: oref.2,
2392 type_,
2393 owner,
2394 previous_transaction: *effects.transaction_digest(),
2395 },
2396 ));
2397 }
2398 Owner::ObjectOwner(owner) => {
2399 let new_object = written.get(id).unwrap_or_else(
2400 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2401 );
2402 assert_eq!(
2403 new_object.version(),
2404 oref.1,
2405 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2406 tx_digest,
2407 id,
2408 new_object.version(),
2409 oref.1
2410 );
2411
2412 let Some(df_info) = self
2413 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut(), false)
2414 .unwrap_or_else(|e| {
2415 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2416 None
2417 }
2418 )
2419 else {
2420 continue;
2422 };
2423 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2424 }
2425 _ => {}
2426 }
2427 }
2428
2429 Ok(ObjectIndexChanges {
2430 deleted_owners,
2431 deleted_dynamic_fields,
2432 new_owners,
2433 new_dynamic_fields,
2434 })
2435 }
2436
2437 fn try_create_dynamic_field_info(
2438 &self,
2439 o: &Object,
2440 written: &WrittenObjects,
2441 resolver: &mut dyn LayoutResolver,
2442 get_latest_object_version: bool,
2443 ) -> IotaResult<Option<DynamicFieldInfo>> {
2444 let Some(move_object) = o.data.try_as_move().cloned() else {
2446 return Ok(None);
2447 };
2448
2449 if !move_object.type_().is_dynamic_field() {
2451 return Ok(None);
2452 }
2453
2454 let layout = resolver
2455 .get_annotated_layout(&move_object.type_().clone().into())?
2456 .into_layout();
2457
2458 let field =
2459 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2460 IotaError::ObjectDeserialization {
2461 error: e.to_string(),
2462 }
2463 })?;
2464
2465 let type_ = field.kind;
2466 let name_type: TypeTag = field.name_layout.into();
2467 let bcs_name = field.name_bytes.to_owned();
2468
2469 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2470 .map_err(|e| {
2471 warn!("{e}");
2472 IotaError::ObjectDeserialization {
2473 error: e.to_string(),
2474 }
2475 })?;
2476
2477 let name = DynamicFieldName {
2478 type_: name_type,
2479 value: IotaMoveValue::from(name_value).to_json_value(),
2480 };
2481
2482 let value_metadata = field.value_metadata().map_err(|e| {
2483 warn!("{e}");
2484 IotaError::ObjectDeserialization {
2485 error: e.to_string(),
2486 }
2487 })?;
2488
2489 Ok(Some(match value_metadata {
2490 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2491 name,
2492 bcs_name,
2493 type_,
2494 object_type: object_type.to_canonical_string(true),
2495 object_id: o.id(),
2496 version: o.version(),
2497 digest: o.digest(),
2498 },
2499
2500 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2501 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2506 (
2507 object.version(),
2508 object.digest(),
2509 object.data.type_().unwrap().clone(),
2510 )
2511 } else {
2512 let object = if get_latest_object_version {
2514 self.get_object_store()
2521 .try_get_object(&object_id)?
2522 .ok_or_else(|| UserInputError::ObjectNotFound {
2523 object_id,
2524 version: Some(o.version()),
2525 })?
2526 } else {
2527 self.get_object_store()
2529 .try_get_object_by_key(&object_id, o.version())?
2530 .ok_or_else(|| UserInputError::ObjectNotFound {
2531 object_id,
2532 version: Some(o.version()),
2533 })?
2534 };
2535
2536 (
2537 object.version(),
2538 object.digest(),
2539 object.data.type_().unwrap().clone(),
2540 )
2541 };
2542
2543 DynamicFieldInfo {
2544 name,
2545 bcs_name,
2546 type_,
2547 object_type: object_type.to_string(),
2548 object_id,
2549 version,
2550 digest,
2551 }
2552 }
2553 }))
2554 }
2555
2556 #[instrument(level = "trace", skip_all, err)]
2557 fn post_process_one_tx(
2558 &self,
2559 certificate: &VerifiedExecutableTransaction,
2560 effects: &TransactionEffects,
2561 inner_temporary_store: &InnerTemporaryStore,
2562 epoch_store: &Arc<AuthorityPerEpochStore>,
2563 ) -> IotaResult {
2564 if self.indexes.is_none() {
2565 return Ok(());
2566 }
2567
2568 let tx_digest = certificate.digest();
2569 let timestamp_ms = Self::unixtime_now_ms();
2570 let events = &inner_temporary_store.events;
2571 let written = &inner_temporary_store.written;
2572 let tx_coins =
2573 self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2574
2575 if let Some(indexes) = &self.indexes {
2577 let _ = self
2578 .index_tx(
2579 indexes.as_ref(),
2580 tx_digest,
2581 certificate,
2582 effects,
2583 events,
2584 timestamp_ms,
2585 tx_coins,
2586 written,
2587 inner_temporary_store,
2588 )
2589 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2590 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2591 .expect("Indexing tx should not fail");
2592
2593 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2594 let events = self.make_transaction_block_events(
2595 events.clone(),
2596 *tx_digest,
2597 timestamp_ms,
2598 epoch_store,
2599 inner_temporary_store,
2600 )?;
2601 self.subscription_handler
2603 .process_tx(certificate.data().transaction_data(), &effects, &events)
2604 .tap_ok(|_| {
2605 self.metrics
2606 .post_processing_total_tx_had_event_processed
2607 .inc()
2608 })
2609 .tap_err(|e| {
2610 warn!(
2611 ?tx_digest,
2612 "Post processing - Couldn't process events for tx: {}", e
2613 )
2614 })?;
2615
2616 self.metrics
2617 .post_processing_total_events_emitted
2618 .inc_by(events.data.len() as u64);
2619 };
2620 Ok(())
2621 }
2622
2623 fn make_transaction_block_events(
2624 &self,
2625 transaction_events: TransactionEvents,
2626 digest: TransactionDigest,
2627 timestamp_ms: u64,
2628 epoch_store: &Arc<AuthorityPerEpochStore>,
2629 inner_temporary_store: &InnerTemporaryStore,
2630 ) -> IotaResult<IotaTransactionBlockEvents> {
2631 let mut layout_resolver =
2632 epoch_store
2633 .executor()
2634 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2635 inner_temporary_store,
2636 self.get_backing_package_store(),
2637 )));
2638 IotaTransactionBlockEvents::try_from(
2639 transaction_events,
2640 digest,
2641 Some(timestamp_ms),
2642 layout_resolver.as_mut(),
2643 )
2644 }
2645
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// # Panics
/// Panics with "Time went backwards" if the system clock reports a time before the
/// Unix epoch, and with "Travelling in time machine" if the millisecond count does
/// not fit in a `u64`.
pub fn unixtime_now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let millis: u128 = since_epoch.as_millis();
    u64::try_from(millis).expect("Travelling in time machine")
}
2653
2654 #[instrument(level = "trace", skip_all)]
2655 pub async fn handle_transaction_info_request(
2656 &self,
2657 request: TransactionInfoRequest,
2658 ) -> IotaResult<TransactionInfoResponse> {
2659 let epoch_store = self.load_epoch_store_one_call_per_task();
2660 let (transaction, status) = self
2661 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2662 .ok_or(IotaError::TransactionNotFound {
2663 digest: request.transaction_digest,
2664 })?;
2665 Ok(TransactionInfoResponse {
2666 transaction,
2667 status,
2668 })
2669 }
2670
2671 #[instrument(level = "trace", skip_all)]
2672 pub async fn handle_object_info_request(
2673 &self,
2674 request: ObjectInfoRequest,
2675 ) -> IotaResult<ObjectInfoResponse> {
2676 let epoch_store = self.load_epoch_store_one_call_per_task();
2677
2678 let requested_object_seq = match request.request_kind {
2679 ObjectInfoRequestKind::LatestObjectInfo => {
2680 let (_, seq, _) = self
2681 .try_get_object_or_tombstone(request.object_id)
2682 .await?
2683 .ok_or_else(|| {
2684 IotaError::from(UserInputError::ObjectNotFound {
2685 object_id: request.object_id,
2686 version: None,
2687 })
2688 })?;
2689 seq
2690 }
2691 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2692 };
2693
2694 let object = self
2695 .get_object_store()
2696 .try_get_object_by_key(&request.object_id, requested_object_seq)?
2697 .ok_or_else(|| {
2698 IotaError::from(UserInputError::ObjectNotFound {
2699 object_id: request.object_id,
2700 version: Some(requested_object_seq),
2701 })
2702 })?;
2703
2704 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2705 (request.generate_layout, object.data.try_as_move())
2706 {
2707 Some(into_struct_layout(
2708 epoch_store
2709 .executor()
2710 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2711 .get_annotated_layout(&move_obj.type_().clone().into())?,
2712 )?)
2713 } else {
2714 None
2715 };
2716
2717 let lock = if !object.is_address_owned() {
2718 None
2720 } else {
2721 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2722 .await?
2723 .map(|s| s.into_inner())
2724 };
2725
2726 Ok(ObjectInfoResponse {
2727 object,
2728 layout,
2729 lock_for_debugging: lock,
2730 })
2731 }
2732
2733 #[instrument(level = "trace", skip_all)]
2734 pub fn handle_checkpoint_request(
2735 &self,
2736 request: &CheckpointRequest,
2737 ) -> IotaResult<CheckpointResponse> {
2738 let summary = if request.certified {
2739 let summary = match request.sequence_number {
2740 Some(seq) => self
2741 .checkpoint_store
2742 .get_checkpoint_by_sequence_number(seq)?,
2743 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2744 }
2745 .map(|v| v.into_inner());
2746 summary.map(CheckpointSummaryResponse::Certified)
2747 } else {
2748 let summary = match request.sequence_number {
2749 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2750 None => self
2751 .checkpoint_store
2752 .get_latest_locally_computed_checkpoint()?,
2753 };
2754 summary.map(CheckpointSummaryResponse::Pending)
2755 };
2756 let contents = match &summary {
2757 Some(s) => self
2758 .checkpoint_store
2759 .get_checkpoint_contents(&s.content_digest())?,
2760 None => None,
2761 };
2762 Ok(CheckpointResponse {
2763 checkpoint: summary,
2764 contents,
2765 })
2766 }
2767
2768 fn check_protocol_version(
2769 supported_protocol_versions: SupportedProtocolVersions,
2770 current_version: ProtocolVersion,
2771 ) {
2772 info!("current protocol version is now {:?}", current_version);
2773 info!("supported versions are: {:?}", supported_protocol_versions);
2774 if !supported_protocol_versions.is_version_supported(current_version) {
2775 let msg = format!(
2776 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2777 );
2778
2779 error!("{}", msg);
2780 eprintln!("{msg}");
2781
2782 #[cfg(not(msim))]
2783 std::process::exit(1);
2784
2785 #[cfg(msim)]
2786 iota_simulator::task::shutdown_current_node();
2787 }
2788 }
2789
2790 #[expect(clippy::disallowed_methods)] pub async fn new(
2792 name: AuthorityName,
2793 secret: StableSyncAuthoritySigner,
2794 supported_protocol_versions: SupportedProtocolVersions,
2795 store: Arc<AuthorityStore>,
2796 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
2797 epoch_store: Arc<AuthorityPerEpochStore>,
2798 committee_store: Arc<CommitteeStore>,
2799 indexes: Option<Arc<IndexStore>>,
2800 rest_index: Option<Arc<RestIndexStore>>,
2801 checkpoint_store: Arc<CheckpointStore>,
2802 prometheus_registry: &Registry,
2803 genesis_objects: &[Object],
2804 db_checkpoint_config: &DBCheckpointConfig,
2805 config: NodeConfig,
2806 archive_readers: ArchiveReaderBalancer,
2807 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
2808 chain_identifier: ChainIdentifier,
2809 pruner_db: Option<Arc<AuthorityPrunerTables>>,
2810 ) -> Arc<Self> {
2811 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
2812
2813 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
2814 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
2815 let transaction_manager = Arc::new(TransactionManager::new(
2816 execution_cache_trait_pointers.object_cache_reader.clone(),
2817 execution_cache_trait_pointers
2818 .transaction_cache_reader
2819 .clone(),
2820 &epoch_store,
2821 tx_ready_certificates,
2822 metrics.clone(),
2823 ));
2824 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
2825
2826 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
2827 epoch_store.get_parent_path(),
2828 &config.authority_store_pruning_config,
2829 );
2830 let _pruner = AuthorityStorePruner::new(
2831 store.perpetual_tables.clone(),
2832 checkpoint_store.clone(),
2833 rest_index.clone(),
2834 config.authority_store_pruning_config.clone(),
2835 epoch_store.committee().authority_exists(&name),
2836 epoch_store.epoch_start_state().epoch_duration_ms(),
2837 prometheus_registry,
2838 archive_readers,
2839 pruner_db,
2840 );
2841 let input_loader =
2842 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
2843 let epoch = epoch_store.epoch();
2844 let rgp = epoch_store.reference_gas_price();
2845 let state = Arc::new(AuthorityState {
2846 name,
2847 secret,
2848 execution_lock: RwLock::new(epoch),
2849 epoch_store: ArcSwap::new(epoch_store.clone()),
2850 input_loader,
2851 execution_cache_trait_pointers,
2852 indexes,
2853 rest_index,
2854 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
2855 checkpoint_store,
2856 committee_store,
2857 transaction_manager,
2858 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
2859 metrics,
2860 _pruner,
2861 _authority_per_epoch_pruner,
2862 db_checkpoint_config: db_checkpoint_config.clone(),
2863 config,
2864 overload_info: AuthorityOverloadInfo::default(),
2865 validator_tx_finalizer,
2866 chain_identifier,
2867 congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
2868 });
2869
2870 let authority_state = Arc::downgrade(&state);
2872 spawn_monitored_task!(execution_process(
2873 authority_state,
2874 rx_ready_certificates,
2875 rx_execution_shutdown,
2876 ));
2877
2878 state
2880 .create_owner_index_if_empty(genesis_objects, &epoch_store)
2881 .expect("Error indexing genesis objects.");
2882
2883 state
2884 }
2885
2886 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
2888 &self.execution_cache_trait_pointers.object_cache_reader
2889 }
2890
2891 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
2892 &self.execution_cache_trait_pointers.transaction_cache_reader
2893 }
2894
2895 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
2896 &self.execution_cache_trait_pointers.cache_writer
2897 }
2898
2899 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
2900 &self.execution_cache_trait_pointers.backing_store
2901 }
2902
2903 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
2904 &self.execution_cache_trait_pointers.backing_package_store
2905 }
2906
2907 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
2908 &self.execution_cache_trait_pointers.object_store
2909 }
2910
2911 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
2912 &self.execution_cache_trait_pointers.reconfig_api
2913 }
2914
2915 pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
2916 &self.execution_cache_trait_pointers.accumulator_store
2917 }
2918
2919 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
2920 &self.execution_cache_trait_pointers.checkpoint_cache
2921 }
2922
2923 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
2924 &self.execution_cache_trait_pointers.state_sync_store
2925 }
2926
2927 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
2928 &self.execution_cache_trait_pointers.cache_commit
2929 }
2930
2931 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
2932 self.execution_cache_trait_pointers
2933 .testing_api
2934 .database_for_testing()
2935 }
2936
2937 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
2938 &self,
2939 config: NodeConfig,
2940 metrics: Arc<AuthorityStorePruningMetrics>,
2941 ) -> anyhow::Result<()> {
2942 let archive_readers =
2943 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
2944 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
2945 &self.database_for_testing().perpetual_tables,
2946 &self.checkpoint_store,
2947 self.rest_index.as_deref(),
2948 None,
2949 config.authority_store_pruning_config,
2950 metrics,
2951 archive_readers,
2952 EPOCH_DURATION_MS_FOR_TESTING,
2953 )
2954 .await
2955 }
2956
2957 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
2958 &self.transaction_manager
2959 }
2960
2961 pub fn enqueue_transactions_for_execution(
2964 &self,
2965 txns: Vec<VerifiedExecutableTransaction>,
2966 epoch_store: &Arc<AuthorityPerEpochStore>,
2967 ) {
2968 self.transaction_manager.enqueue(txns, epoch_store)
2969 }
2970
2971 pub fn enqueue_certificates_for_execution(
2973 &self,
2974 certs: Vec<VerifiedCertificate>,
2975 epoch_store: &Arc<AuthorityPerEpochStore>,
2976 ) {
2977 self.transaction_manager
2978 .enqueue_certificates(certs, epoch_store)
2979 }
2980
2981 pub fn enqueue_with_expected_effects_digest(
2982 &self,
2983 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
2984 epoch_store: &AuthorityPerEpochStore,
2985 ) {
2986 self.transaction_manager
2987 .enqueue_with_expected_effects_digest(certs, epoch_store)
2988 }
2989
2990 fn create_owner_index_if_empty(
2991 &self,
2992 genesis_objects: &[Object],
2993 epoch_store: &Arc<AuthorityPerEpochStore>,
2994 ) -> IotaResult {
2995 let Some(index_store) = &self.indexes else {
2996 return Ok(());
2997 };
2998 if !index_store.is_empty() {
2999 return Ok(());
3000 }
3001
3002 let mut new_owners = vec![];
3003 let mut new_dynamic_fields = vec![];
3004 let mut layout_resolver = epoch_store
3005 .executor()
3006 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3007 for o in genesis_objects.iter() {
3008 match o.owner {
3009 Owner::AddressOwner(addr) => new_owners.push((
3010 (addr, o.id()),
3011 ObjectInfo::new(&o.compute_object_reference(), o),
3012 )),
3013 Owner::ObjectOwner(object_id) => {
3014 let id = o.id();
3015 let Some(info) = self.try_create_dynamic_field_info(
3016 o,
3017 &BTreeMap::new(),
3018 layout_resolver.as_mut(),
3019 true,
3020 )?
3021 else {
3022 continue;
3023 };
3024 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3025 }
3026 _ => {}
3027 }
3028 }
3029
3030 index_store.insert_genesis_objects(ObjectIndexChanges {
3031 deleted_owners: vec![],
3032 deleted_dynamic_fields: vec![],
3033 new_owners,
3034 new_dynamic_fields,
3035 })
3036 }
3037
3038 pub fn execution_lock_for_executable_transaction(
3042 &self,
3043 transaction: &VerifiedExecutableTransaction,
3044 ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3045 let lock = self
3046 .execution_lock
3047 .try_read()
3048 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3049 if *lock == transaction.auth_sig().epoch() {
3050 Ok(lock)
3051 } else {
3052 Err(IotaError::WrongEpoch {
3053 expected_epoch: *lock,
3054 actual_epoch: transaction.auth_sig().epoch(),
3055 })
3056 }
3057 }
3058
3059 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3065 self.execution_lock
3066 .try_read()
3067 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3068 }
3069
3070 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3071 self.execution_lock.write().await
3072 }
3073
3074 #[instrument(level = "error", skip_all)]
3075 pub async fn reconfigure(
3076 &self,
3077 cur_epoch_store: &AuthorityPerEpochStore,
3078 supported_protocol_versions: SupportedProtocolVersions,
3079 new_committee: Committee,
3080 epoch_start_configuration: EpochStartConfiguration,
3081 accumulator: Arc<StateAccumulator>,
3082 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3083 epoch_supply_change: i64,
3084 epoch_last_checkpoint: CheckpointSequenceNumber,
3085 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3086 Self::check_protocol_version(
3087 supported_protocol_versions,
3088 epoch_start_configuration
3089 .epoch_start_state()
3090 .protocol_version(),
3091 );
3092 self.metrics.reset_on_reconfigure();
3093 self.committee_store.insert_new_committee(&new_committee)?;
3094
3095 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3097
3098 cur_epoch_store.epoch_terminated().await;
3100
3101 let highest_locally_built_checkpoint_seq = self
3102 .checkpoint_store
3103 .get_latest_locally_computed_checkpoint()?
3104 .map(|c| *c.sequence_number())
3105 .unwrap_or(0);
3106
3107 assert!(
3108 epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3109 "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3110 );
3111 if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
3112 let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3116 if num_shared_version_assignments > 1 {
3119 debug_fatal!(
3121 "all shared_version_assignments should have been removed \
3122 (num_shared_version_assignments: {num_shared_version_assignments})"
3123 );
3124 }
3125 }
3126
3127 self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3133 .await?;
3134 self.get_reconfig_api()
3135 .clear_state_end_of_epoch(&execution_lock);
3136 self.check_system_consistency(
3137 cur_epoch_store,
3138 accumulator,
3139 expensive_safety_check_config,
3140 epoch_supply_change,
3141 )?;
3142 self.get_reconfig_api()
3143 .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3144 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3145 if self
3146 .db_checkpoint_config
3147 .perform_db_checkpoints_at_epoch_end
3148 {
3149 let checkpoint_indexes = self
3150 .db_checkpoint_config
3151 .perform_index_db_checkpoints_at_epoch_end
3152 .unwrap_or(false);
3153 let current_epoch = cur_epoch_store.epoch();
3154 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3155 self.checkpoint_all_dbs(
3156 &epoch_checkpoint_path,
3157 cur_epoch_store,
3158 checkpoint_indexes,
3159 )?;
3160 }
3161 }
3162
3163 self.get_reconfig_api()
3164 .reconfigure_cache(&epoch_start_configuration)
3165 .await;
3166
3167 let new_epoch = new_committee.epoch;
3168 let new_epoch_store = self
3169 .reopen_epoch_db(
3170 cur_epoch_store,
3171 new_committee,
3172 epoch_start_configuration,
3173 expensive_safety_check_config,
3174 epoch_last_checkpoint,
3175 )
3176 .await?;
3177 assert_eq!(new_epoch_store.epoch(), new_epoch);
3178 self.transaction_manager.reconfigure(new_epoch);
3179 *execution_lock = new_epoch;
3180 Ok(new_epoch_store)
3184 }
3185
3186 pub async fn reconfigure_for_testing(&self) {
3191 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3192 let epoch_store = self.epoch_store_for_testing().clone();
3193 let protocol_config = epoch_store.protocol_config().clone();
3194 let _guard =
3202 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3203 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3204 self.get_backing_package_store().clone(),
3205 self.get_object_store().clone(),
3206 &self.config.expensive_safety_check_config,
3207 self.checkpoint_store
3208 .get_epoch_last_checkpoint(epoch_store.epoch())
3209 .unwrap()
3210 .map(|c| *c.sequence_number())
3211 .unwrap_or_default(),
3212 );
3213 let new_epoch = new_epoch_store.epoch();
3214 self.transaction_manager.reconfigure(new_epoch);
3215 self.epoch_store.store(new_epoch_store);
3216 epoch_store.epoch_terminated().await;
3217 *execution_lock = new_epoch;
3218 }
3219
3220 #[instrument(level = "error", skip_all)]
3221 fn check_system_consistency(
3222 &self,
3223 cur_epoch_store: &AuthorityPerEpochStore,
3224 accumulator: Arc<StateAccumulator>,
3225 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3226 epoch_supply_change: i64,
3227 ) -> IotaResult<()> {
3228 info!(
3229 "Performing iota conservation consistency check for epoch {}",
3230 cur_epoch_store.epoch()
3231 );
3232
3233 if cfg!(debug_assertions) {
3234 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3235 }
3236
3237 self.get_reconfig_api()
3238 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3239
3240 if expensive_safety_check_config.enable_state_consistency_check() {
3242 info!(
3243 "Performing state consistency check for epoch {}",
3244 cur_epoch_store.epoch()
3245 );
3246 self.expensive_check_is_consistent_state(
3247 accumulator,
3248 cur_epoch_store,
3249 cfg!(debug_assertions), );
3251 }
3252
3253 if expensive_safety_check_config.enable_secondary_index_checks() {
3254 if let Some(indexes) = self.indexes.clone() {
3255 verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3256 .expect("secondary indexes are inconsistent");
3257 }
3258 }
3259
3260 Ok(())
3261 }
3262
3263 fn expensive_check_is_consistent_state(
3264 &self,
3265 accumulator: Arc<StateAccumulator>,
3266 cur_epoch_store: &AuthorityPerEpochStore,
3267 panic: bool,
3268 ) {
3269 let live_object_set_hash = accumulator.digest_live_object_set();
3270
3271 let root_state_hash: ECMHLiveObjectSetDigest = self
3272 .get_accumulator_store()
3273 .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3274 .expect("Retrieving root state hash cannot fail")
3275 .expect("Root state hash for epoch must exist")
3276 .1
3277 .digest()
3278 .into();
3279
3280 let is_inconsistent = root_state_hash != live_object_set_hash;
3281 if is_inconsistent {
3282 if panic {
3283 panic!(
3284 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3285 );
3286 } else {
3287 error!(
3288 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3289 root_state_hash, live_object_set_hash
3290 );
3291 }
3292 } else {
3293 info!("State consistency check passed");
3294 }
3295
3296 if !panic {
3297 accumulator.set_inconsistent_state(is_inconsistent);
3298 }
3299 }
3300
3301 pub fn current_epoch_for_testing(&self) -> EpochId {
3302 self.epoch_store_for_testing().epoch()
3303 }
3304
3305 #[instrument(level = "error", skip_all)]
3306 pub fn checkpoint_all_dbs(
3307 &self,
3308 checkpoint_path: &Path,
3309 cur_epoch_store: &AuthorityPerEpochStore,
3310 checkpoint_indexes: bool,
3311 ) -> IotaResult {
3312 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3313 let current_epoch = cur_epoch_store.epoch();
3314
3315 if checkpoint_path.exists() {
3316 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3317 return Ok(());
3318 }
3319
3320 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3321 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3322
3323 if checkpoint_path_tmp.exists() {
3324 fs::remove_dir_all(&checkpoint_path_tmp)
3325 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3326 }
3327
3328 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3329 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3330
3331 self.checkpoint_store
3334 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3335
3336 self.get_reconfig_api()
3337 .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3338
3339 self.committee_store
3340 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3341
3342 if checkpoint_indexes {
3343 if let Some(indexes) = self.indexes.as_ref() {
3344 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3345 }
3346 }
3347
3348 fs::rename(checkpoint_path_tmp, checkpoint_path)
3349 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3350 Ok(())
3351 }
3352
3353 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3360 self.epoch_store.load()
3361 }
3362
3363 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3365 self.load_epoch_store_one_call_per_task()
3366 }
3367
3368 pub fn clone_committee_for_testing(&self) -> Committee {
3369 Committee::clone(self.epoch_store_for_testing().committee())
3370 }
3371
3372 #[instrument(level = "trace", skip_all)]
3373 pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3374 self.get_object_store()
3375 .try_get_object(object_id)
3376 .map_err(Into::into)
3377 }
3378
3379 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3381 self.try_get_object(object_id)
3382 .await
3383 .expect("storage access failed")
3384 }
3385
3386 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3387 Ok(self
3388 .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3389 .await?
3390 .expect("framework object should always exist")
3391 .compute_object_reference())
3392 }
3393
3394 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3396 self.get_object_cache_reader()
3397 .try_get_iota_system_state_object_unsafe()
3398 }
3399
3400 #[instrument(level = "trace", skip_all)]
3401 pub fn get_checkpoint_by_sequence_number(
3402 &self,
3403 sequence_number: CheckpointSequenceNumber,
3404 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3405 Ok(self
3406 .checkpoint_store
3407 .get_checkpoint_by_sequence_number(sequence_number)?)
3408 }
3409
3410 #[instrument(level = "trace", skip_all)]
3411 pub fn get_transaction_checkpoint_for_tests(
3412 &self,
3413 digest: &TransactionDigest,
3414 epoch_store: &AuthorityPerEpochStore,
3415 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3416 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3417 let Some(checkpoint) = checkpoint else {
3418 return Ok(None);
3419 };
3420 let checkpoint = self
3421 .checkpoint_store
3422 .get_checkpoint_by_sequence_number(checkpoint)?;
3423 Ok(checkpoint)
3424 }
3425
3426 #[instrument(level = "trace", skip_all)]
3427 pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3428 Ok(
3429 match self
3430 .get_object_cache_reader()
3431 .try_get_latest_object_or_tombstone(*object_id)?
3432 {
3433 Some((_, ObjectOrTombstone::Object(object))) => {
3434 let layout = self.get_object_layout(&object)?;
3435 ObjectRead::Exists(object.compute_object_reference(), object, layout)
3436 }
3437 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3438 None => ObjectRead::NotExists(*object_id),
3439 },
3440 )
3441 }
3442
3443 pub fn get_chain_identifier(&self) -> ChainIdentifier {
3445 self.chain_identifier
3446 }
3447
3448 #[instrument(level = "trace", skip_all)]
3449 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3450 where
3451 T: DeserializeOwned,
3452 {
3453 let o = self.get_object_read(object_id)?.into_object()?;
3454 if let Some(move_object) = o.data.try_as_move() {
3455 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3456 IotaError::ObjectDeserialization {
3457 error: format!("{e}"),
3458 }
3459 })?)
3460 } else {
3461 Err(IotaError::ObjectDeserialization {
3462 error: format!("Provided object : [{object_id}] is not a Move object."),
3463 })
3464 }
3465 }
3466
3467 #[instrument(level = "trace", skip_all)]
3473 pub fn get_past_object_read(
3474 &self,
3475 object_id: &ObjectID,
3476 version: SequenceNumber,
3477 ) -> IotaResult<PastObjectRead> {
3478 let Some(obj_ref) = self
3480 .get_object_cache_reader()
3481 .try_get_latest_object_ref_or_tombstone(*object_id)?
3482 else {
3483 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3484 };
3485
3486 if version > obj_ref.1 {
3487 return Ok(PastObjectRead::VersionTooHigh {
3488 object_id: *object_id,
3489 asked_version: version,
3490 latest_version: obj_ref.1,
3491 });
3492 }
3493
3494 if version < obj_ref.1 {
3495 return Ok(match self.read_object_at_version(object_id, version)? {
3497 Some((object, layout)) => {
3498 let obj_ref = object.compute_object_reference();
3499 PastObjectRead::VersionFound(obj_ref, object, layout)
3500 }
3501
3502 None => PastObjectRead::VersionNotFound(*object_id, version),
3503 });
3504 }
3505
3506 if !obj_ref.2.is_alive() {
3507 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3508 }
3509
3510 match self.read_object_at_version(object_id, obj_ref.1)? {
3511 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3512 None => {
3513 error!(
3514 "Object with in parent_entry is missing from object store, datastore is \
3515 inconsistent",
3516 );
3517 Err(UserInputError::ObjectNotFound {
3518 object_id: *object_id,
3519 version: Some(obj_ref.1),
3520 }
3521 .into())
3522 }
3523 }
3524 }
3525
3526 #[instrument(level = "trace", skip_all)]
3527 fn read_object_at_version(
3528 &self,
3529 object_id: &ObjectID,
3530 version: SequenceNumber,
3531 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3532 let Some(object) = self
3533 .get_object_cache_reader()
3534 .try_get_object_by_key(object_id, version)?
3535 else {
3536 return Ok(None);
3537 };
3538
3539 let layout = self.get_object_layout(&object)?;
3540 Ok(Some((object, layout)))
3541 }
3542
3543 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3544 let layout = object
3545 .data
3546 .try_as_move()
3547 .map(|object| {
3548 into_struct_layout(
3549 self.load_epoch_store_one_call_per_task()
3550 .executor()
3551 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3553 .get_annotated_layout(&object.type_().clone().into())?,
3554 )
3555 })
3556 .transpose()?;
3557 Ok(layout)
3558 }
3559
3560 fn get_owner_at_version(
3561 &self,
3562 object_id: &ObjectID,
3563 version: SequenceNumber,
3564 ) -> IotaResult<Owner> {
3565 self.get_object_store()
3566 .try_get_object_by_key(object_id, version)?
3567 .ok_or_else(|| {
3568 IotaError::from(UserInputError::ObjectNotFound {
3569 object_id: *object_id,
3570 version: Some(version),
3571 })
3572 })
3573 .map(|o| o.owner)
3574 }
3575
3576 #[instrument(level = "trace", skip_all)]
3577 pub fn get_owner_objects(
3578 &self,
3579 owner: IotaAddress,
3580 cursor: Option<ObjectID>,
3582 limit: usize,
3583 filter: Option<IotaObjectDataFilter>,
3584 ) -> IotaResult<Vec<ObjectInfo>> {
3585 if let Some(indexes) = &self.indexes {
3586 indexes.get_owner_objects(owner, cursor, limit, filter)
3587 } else {
3588 Err(IotaError::IndexStoreNotAvailable)
3589 }
3590 }
3591
3592 #[instrument(level = "trace", skip_all)]
3593 pub fn get_owned_coins_iterator_with_cursor(
3594 &self,
3595 owner: IotaAddress,
3596 cursor: (String, ObjectID),
3598 limit: usize,
3599 one_coin_type_only: bool,
3600 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3601 if let Some(indexes) = &self.indexes {
3602 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3603 } else {
3604 Err(IotaError::IndexStoreNotAvailable)
3605 }
3606 }
3607
3608 #[instrument(level = "trace", skip_all)]
3609 pub fn get_owner_objects_iterator(
3610 &self,
3611 owner: IotaAddress,
3612 cursor: Option<ObjectID>,
3614 filter: Option<IotaObjectDataFilter>,
3615 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3616 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3617 if let Some(indexes) = &self.indexes {
3618 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3619 } else {
3620 Err(IotaError::IndexStoreNotAvailable)
3621 }
3622 }
3623
3624 #[instrument(level = "trace", skip_all)]
3625 pub async fn get_move_objects<T>(
3626 &self,
3627 owner: IotaAddress,
3628 type_: MoveObjectType,
3629 ) -> IotaResult<Vec<T>>
3630 where
3631 T: DeserializeOwned,
3632 {
3633 let object_ids = self
3634 .get_owner_objects_iterator(owner, None, None)?
3635 .filter(|o| match &o.type_ {
3636 ObjectType::Struct(s) => &type_ == s,
3637 ObjectType::Package => false,
3638 })
3639 .map(|info| ObjectKey(info.object_id, info.version))
3640 .collect::<Vec<_>>();
3641 let mut move_objects = vec![];
3642
3643 let objects = self
3644 .get_object_store()
3645 .try_multi_get_objects_by_key(&object_ids)?;
3646
3647 for (o, id) in objects.into_iter().zip(object_ids) {
3648 let object = o.ok_or_else(|| {
3649 IotaError::from(UserInputError::ObjectNotFound {
3650 object_id: id.0,
3651 version: Some(id.1),
3652 })
3653 })?;
3654 let move_object = object.data.try_as_move().ok_or_else(|| {
3655 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3656 })?;
3657 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3658 IotaError::ObjectDeserialization {
3659 error: format!("{e}"),
3660 }
3661 })?);
3662 }
3663 Ok(move_objects)
3664 }
3665
3666 #[instrument(level = "trace", skip_all)]
3667 pub fn get_dynamic_fields(
3668 &self,
3669 owner: ObjectID,
3670 cursor: Option<ObjectID>,
3672 limit: usize,
3673 ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3674 Ok(self
3675 .get_dynamic_fields_iterator(owner, cursor)?
3676 .take(limit)
3677 .collect::<Result<Vec<_>, _>>()?)
3678 }
3679
3680 fn get_dynamic_fields_iterator(
3681 &self,
3682 owner: ObjectID,
3683 cursor: Option<ObjectID>,
3685 ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3686 {
3687 if let Some(indexes) = &self.indexes {
3688 indexes.get_dynamic_fields_iterator(owner, cursor)
3689 } else {
3690 Err(IotaError::IndexStoreNotAvailable)
3691 }
3692 }
3693
3694 #[instrument(level = "trace", skip_all)]
3695 pub fn get_dynamic_field_object_id(
3696 &self,
3697 owner: ObjectID,
3698 name_type: TypeTag,
3699 name_bcs_bytes: &[u8],
3700 ) -> IotaResult<Option<ObjectID>> {
3701 if let Some(indexes) = &self.indexes {
3702 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3703 } else {
3704 Err(IotaError::IndexStoreNotAvailable)
3705 }
3706 }
3707
3708 #[instrument(level = "trace", skip_all)]
3709 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3710 Ok(self.get_indexes()?.next_sequence_number())
3711 }
3712
3713 #[instrument(level = "trace", skip_all)]
3714 pub async fn get_executed_transaction_and_effects(
3715 &self,
3716 digest: TransactionDigest,
3717 kv_store: Arc<TransactionKeyValueStore>,
3718 ) -> IotaResult<(Transaction, TransactionEffects)> {
3719 let transaction = kv_store.get_tx(digest).await?;
3720 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3721 Ok((transaction, effects))
3722 }
3723
3724 #[instrument(level = "trace", skip_all)]
3725 pub fn multi_get_checkpoint_by_sequence_number(
3726 &self,
3727 sequence_numbers: &[CheckpointSequenceNumber],
3728 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3729 Ok(self
3730 .checkpoint_store
3731 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3732 }
3733
3734 #[instrument(level = "trace", skip_all)]
3735 pub fn get_transaction_events(
3736 &self,
3737 digest: &TransactionEventsDigest,
3738 ) -> IotaResult<TransactionEvents> {
3739 self.get_transaction_cache_reader()
3740 .try_get_events(digest)?
3741 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3742 }
3743
3744 pub fn get_transaction_input_objects(
3745 &self,
3746 effects: &TransactionEffects,
3747 ) -> anyhow::Result<Vec<Object>> {
3748 let input_object_keys = effects
3749 .modified_at_versions()
3750 .into_iter()
3751 .map(|(object_id, version)| ObjectKey(object_id, version))
3752 .collect::<Vec<_>>();
3753
3754 let input_objects = self
3755 .get_object_store()
3756 .try_multi_get_objects_by_key(&input_object_keys)?
3757 .into_iter()
3758 .enumerate()
3759 .map(|(idx, maybe_object)| {
3760 maybe_object.ok_or_else(|| {
3761 anyhow::anyhow!(
3762 "missing input object key {:?} from tx {}",
3763 input_object_keys[idx],
3764 effects.transaction_digest()
3765 )
3766 })
3767 })
3768 .collect::<anyhow::Result<Vec<_>>>()?;
3769 Ok(input_objects)
3770 }
3771
3772 pub fn get_transaction_output_objects(
3773 &self,
3774 effects: &TransactionEffects,
3775 ) -> anyhow::Result<Vec<Object>> {
3776 let output_object_keys = effects
3777 .all_changed_objects()
3778 .into_iter()
3779 .map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
3780 .collect::<Vec<_>>();
3781
3782 let output_objects = self
3783 .get_object_store()
3784 .try_multi_get_objects_by_key(&output_object_keys)?
3785 .into_iter()
3786 .enumerate()
3787 .map(|(idx, maybe_object)| {
3788 maybe_object.ok_or_else(|| {
3789 anyhow::anyhow!(
3790 "missing output object key {:?} from tx {}",
3791 output_object_keys[idx],
3792 effects.transaction_digest()
3793 )
3794 })
3795 })
3796 .collect::<anyhow::Result<Vec<_>>>()?;
3797 Ok(output_objects)
3798 }
3799
3800 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3801 match &self.indexes {
3802 Some(i) => Ok(i.clone()),
3803 None => Err(IotaError::UnsupportedFeature {
3804 error: "extended object indexing is not enabled on this server".into(),
3805 }),
3806 }
3807 }
3808
3809 pub async fn get_transactions_for_tests(
3810 self: &Arc<Self>,
3811 filter: Option<TransactionFilter>,
3812 cursor: Option<TransactionDigest>,
3813 limit: Option<usize>,
3814 reverse: bool,
3815 ) -> IotaResult<Vec<TransactionDigest>> {
3816 let metrics = KeyValueStoreMetrics::new_for_tests();
3817 let kv_store = Arc::new(TransactionKeyValueStore::new(
3818 "rocksdb",
3819 metrics,
3820 self.clone(),
3821 ));
3822 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3823 .await
3824 }
3825
3826 #[instrument(level = "trace", skip_all)]
3827 pub async fn get_transactions(
3828 &self,
3829 kv_store: &Arc<TransactionKeyValueStore>,
3830 filter: Option<TransactionFilter>,
3831 cursor: Option<TransactionDigest>,
3833 limit: Option<usize>,
3834 reverse: bool,
3835 ) -> IotaResult<Vec<TransactionDigest>> {
3836 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3837 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3838 let iter = checkpoint_contents.iter().map(|c| c.transaction);
3839 if reverse {
3840 let iter = iter
3841 .rev()
3842 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3843 .skip(usize::from(cursor.is_some()));
3844 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3845 } else {
3846 let iter = iter
3847 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3848 .skip(usize::from(cursor.is_some()));
3849 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3850 }
3851 }
3852 self.get_indexes()?
3853 .get_transactions(filter, cursor, limit, reverse)
3854 }
3855
3856 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
3857 &self.checkpoint_store
3858 }
3859
3860 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
3861 self.get_checkpoint_store()
3862 .get_highest_executed_checkpoint_seq_number()?
3863 .ok_or(IotaError::UserInput {
3864 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
3865 })
3866 }
3867
/// Simulator-only test helper returning the highest pruned checkpoint
/// recorded in the perpetual tables.
#[cfg(msim)]
pub fn get_highest_pruned_checkpoint_for_testing(
    &self,
) -> IotaResult<CheckpointSequenceNumber> {
    let database = self.database_for_testing();
    database.perpetual_tables.get_highest_pruned_checkpoint()
}
3876
3877 #[instrument(level = "trace", skip_all)]
3878 pub fn get_checkpoint_summary_by_sequence_number(
3879 &self,
3880 sequence_number: CheckpointSequenceNumber,
3881 ) -> IotaResult<CheckpointSummary> {
3882 let verified_checkpoint = self
3883 .get_checkpoint_store()
3884 .get_checkpoint_by_sequence_number(sequence_number)?;
3885 match verified_checkpoint {
3886 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3887 None => Err(IotaError::UserInput {
3888 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3889 }),
3890 }
3891 }
3892
3893 #[instrument(level = "trace", skip_all)]
3894 pub fn get_checkpoint_summary_by_digest(
3895 &self,
3896 digest: CheckpointDigest,
3897 ) -> IotaResult<CheckpointSummary> {
3898 let verified_checkpoint = self
3899 .get_checkpoint_store()
3900 .get_checkpoint_by_digest(&digest)?;
3901 match verified_checkpoint {
3902 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3903 None => Err(IotaError::UserInput {
3904 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3905 }),
3906 }
3907 }
3908
3909 #[instrument(level = "trace", skip_all)]
3910 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
3911 if is_system_package(package_id) {
3912 return self.find_genesis_txn_digest();
3913 }
3914 Ok(self
3915 .get_object_read(&package_id)?
3916 .into_object()?
3917 .previous_transaction)
3918 }
3919
3920 #[instrument(level = "trace", skip_all)]
3921 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
3922 let summary = self
3923 .get_verified_checkpoint_by_sequence_number(0)?
3924 .into_message();
3925 let content = self.get_checkpoint_contents(summary.content_digest)?;
3926 let genesis_transaction = content.enumerate_transactions(&summary).next();
3927 Ok(genesis_transaction
3928 .ok_or(IotaError::UserInput {
3929 error: UserInputError::GenesisTransactionNotFound,
3930 })?
3931 .1
3932 .transaction)
3933 }
3934
3935 #[instrument(level = "trace", skip_all)]
3936 pub fn get_verified_checkpoint_by_sequence_number(
3937 &self,
3938 sequence_number: CheckpointSequenceNumber,
3939 ) -> IotaResult<VerifiedCheckpoint> {
3940 let verified_checkpoint = self
3941 .get_checkpoint_store()
3942 .get_checkpoint_by_sequence_number(sequence_number)?;
3943 match verified_checkpoint {
3944 Some(verified_checkpoint) => Ok(verified_checkpoint),
3945 None => Err(IotaError::UserInput {
3946 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3947 }),
3948 }
3949 }
3950
3951 #[instrument(level = "trace", skip_all)]
3952 pub fn get_verified_checkpoint_summary_by_digest(
3953 &self,
3954 digest: CheckpointDigest,
3955 ) -> IotaResult<VerifiedCheckpoint> {
3956 let verified_checkpoint = self
3957 .get_checkpoint_store()
3958 .get_checkpoint_by_digest(&digest)?;
3959 match verified_checkpoint {
3960 Some(verified_checkpoint) => Ok(verified_checkpoint),
3961 None => Err(IotaError::UserInput {
3962 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3963 }),
3964 }
3965 }
3966
3967 #[instrument(level = "trace", skip_all)]
3968 pub fn get_checkpoint_contents(
3969 &self,
3970 digest: CheckpointContentsDigest,
3971 ) -> IotaResult<CheckpointContents> {
3972 self.get_checkpoint_store()
3973 .get_checkpoint_contents(&digest)?
3974 .ok_or(IotaError::UserInput {
3975 error: UserInputError::CheckpointContentsNotFound(digest),
3976 })
3977 }
3978
3979 #[instrument(level = "trace", skip_all)]
3980 pub fn get_checkpoint_contents_by_sequence_number(
3981 &self,
3982 sequence_number: CheckpointSequenceNumber,
3983 ) -> IotaResult<CheckpointContents> {
3984 let verified_checkpoint = self
3985 .get_checkpoint_store()
3986 .get_checkpoint_by_sequence_number(sequence_number)?;
3987 match verified_checkpoint {
3988 Some(verified_checkpoint) => {
3989 let content_digest = verified_checkpoint.into_inner().content_digest;
3990 self.get_checkpoint_contents(content_digest)
3991 }
3992 None => Err(IotaError::UserInput {
3993 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3994 }),
3995 }
3996 }
3997
3998 #[instrument(level = "trace", skip_all)]
3999 pub async fn query_events(
4000 &self,
4001 kv_store: &Arc<TransactionKeyValueStore>,
4002 query: EventFilter,
4003 cursor: Option<EventID>,
4005 limit: usize,
4006 descending: bool,
4007 ) -> IotaResult<Vec<IotaEvent>> {
4008 let index_store = self.get_indexes()?;
4009
4010 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4012 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4013 IotaError::TransactionNotFound {
4014 digest: cursor.tx_digest,
4015 },
4016 )?;
4017 (tx_seq, cursor.event_seq as usize)
4018 } else if descending {
4019 (u64::MAX, usize::MAX)
4020 } else {
4021 (0, 0)
4022 };
4023
4024 let limit = limit + 1;
4025 let mut event_keys = match query {
4026 EventFilter::All(filters) => {
4027 if filters.is_empty() {
4028 index_store.all_events(tx_num, event_num, limit, descending)?
4029 } else {
4030 return Err(IotaError::UserInput {
4031 error: UserInputError::Unsupported(
4032 "This query type does not currently support filter combinations"
4033 .to_string(),
4034 ),
4035 });
4036 }
4037 }
4038 EventFilter::Transaction(digest) => {
4039 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4040 }
4041 EventFilter::MoveModule { package, module } => {
4042 let module_id = ModuleId::new(package.into(), module);
4043 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4044 }
4045 EventFilter::MoveEventType(struct_name) => index_store
4046 .events_by_move_event_struct_name(
4047 &struct_name,
4048 tx_num,
4049 event_num,
4050 limit,
4051 descending,
4052 )?,
4053 EventFilter::Sender(sender) => {
4054 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4055 }
4056 EventFilter::TimeRange {
4057 start_time,
4058 end_time,
4059 } => index_store
4060 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4061 EventFilter::MoveEventModule { package, module } => index_store
4062 .events_by_move_event_module(
4063 &ModuleId::new(package.into(), module),
4064 tx_num,
4065 event_num,
4066 limit,
4067 descending,
4068 )?,
4069 EventFilter::Package(_)
4071 | EventFilter::MoveEventField { .. }
4072 | EventFilter::Any(_)
4073 | EventFilter::And(_, _)
4074 | EventFilter::Or(_, _) => {
4075 return Err(IotaError::UserInput {
4076 error: UserInputError::Unsupported(
4077 "This query type is not supported by the full node.".to_string(),
4078 ),
4079 });
4080 }
4081 };
4082
4083 if cursor.is_some() {
4086 if !event_keys.is_empty() {
4087 event_keys.remove(0);
4088 }
4089 } else {
4090 event_keys.truncate(limit - 1);
4091 }
4092
4093 let transaction_digests = event_keys
4095 .iter()
4096 .map(|(_, digest, _, _)| *digest)
4097 .collect::<HashSet<_>>()
4098 .into_iter()
4099 .collect::<Vec<_>>();
4100
4101 let events = kv_store
4102 .multi_get_events_by_tx_digests(&transaction_digests)
4103 .await?;
4104
4105 let events_map: HashMap<_, _> =
4106 transaction_digests.iter().zip(events.into_iter()).collect();
4107
4108 let stored_events = event_keys
4109 .into_iter()
4110 .map(|k| {
4111 (
4112 k,
4113 events_map
4114 .get(&k.1)
4115 .expect("fetched digest is missing")
4116 .clone()
4117 .and_then(|e| e.data.get(k.2).cloned()),
4118 )
4119 })
4120 .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4121 event
4122 .map(|e| (e, tx_digest, event_seq, timestamp))
4123 .ok_or(IotaError::TransactionEventsNotFound { digest })
4124 })
4125 .collect::<Result<Vec<_>, _>>()?;
4126
4127 let epoch_store = self.load_epoch_store_one_call_per_task();
4128 let backing_store = self.get_backing_package_store().as_ref();
4129 let mut layout_resolver = epoch_store
4130 .executor()
4131 .type_layout_resolver(Box::new(backing_store));
4132 let mut events = vec![];
4133 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4134 events.push(IotaEvent::try_from(
4135 e.clone(),
4136 tx_digest,
4137 event_seq as u64,
4138 Some(timestamp),
4139 layout_resolver.get_annotated_layout(&e.type_)?,
4140 )?)
4141 }
4142 Ok(events)
4143 }
4144
4145 pub async fn insert_genesis_object(&self, object: Object) {
4146 self.get_reconfig_api()
4147 .try_insert_genesis_object(object)
4148 .expect("Cannot insert genesis object")
4149 }
4150
4151 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4152 futures::future::join_all(
4153 objects
4154 .iter()
4155 .map(|o| self.insert_genesis_object(o.clone())),
4156 )
4157 .await;
4158 }
4159
4160 #[instrument(level = "trace", skip_all)]
4162 pub fn get_transaction_status(
4163 &self,
4164 transaction_digest: &TransactionDigest,
4165 epoch_store: &Arc<AuthorityPerEpochStore>,
4166 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4167 if let Some(effects) =
4169 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4170 {
4171 if let Some(transaction) = self
4172 .get_transaction_cache_reader()
4173 .try_get_transaction_block(transaction_digest)?
4174 {
4175 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4176 let events = if let Some(digest) = effects.events_digest() {
4177 self.get_transaction_events(digest)?
4178 } else {
4179 TransactionEvents::default()
4180 };
4181 return Ok(Some((
4182 (*transaction).clone().into_message(),
4183 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4184 )));
4185 } else {
4186 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4191 }
4192 }
4193 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4194 self.metrics.tx_already_processed.inc();
4195 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4196 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4197 } else {
4198 Ok(None)
4199 }
4200 }
4201
4202 #[instrument(level = "trace", skip_all)]
4206 pub fn get_signed_effects_and_maybe_resign(
4207 &self,
4208 transaction_digest: &TransactionDigest,
4209 epoch_store: &Arc<AuthorityPerEpochStore>,
4210 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4211 let effects = self
4212 .get_transaction_cache_reader()
4213 .try_get_executed_effects(transaction_digest)?;
4214 match effects {
4215 Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4216 None => Ok(None),
4217 }
4218 }
4219
4220 #[instrument(level = "trace", skip_all)]
4221 pub(crate) fn sign_effects(
4222 &self,
4223 effects: TransactionEffects,
4224 epoch_store: &Arc<AuthorityPerEpochStore>,
4225 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4226 let tx_digest = *effects.transaction_digest();
4227 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4228 Some(sig) if sig.epoch == epoch_store.epoch() => {
4229 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4230 }
4231 _ => {
4232 debug!(
4255 ?tx_digest,
4256 epoch=?epoch_store.epoch(),
4257 "Re-signing the effects with the current epoch"
4258 );
4259
4260 let sig = AuthoritySignInfo::new(
4261 epoch_store.epoch(),
4262 &effects,
4263 Intent::iota_app(IntentScope::TransactionEffects),
4264 self.name,
4265 &*self.secret,
4266 );
4267
4268 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4269
4270 epoch_store.insert_effects_digest_and_signature(
4271 &tx_digest,
4272 effects.digest(),
4273 &sig,
4274 )?;
4275
4276 effects
4277 }
4278 };
4279
4280 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4281 signed_effects,
4282 ))
4283 }
4284
4285 #[instrument(level = "trace", skip_all)]
4287 fn fullnode_only_get_tx_coins_for_indexing(
4288 &self,
4289 inner_temporary_store: &InnerTemporaryStore,
4290 epoch_store: &Arc<AuthorityPerEpochStore>,
4291 ) -> Option<TxCoins> {
4292 if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4293 return None;
4294 }
4295 let written_coin_objects = inner_temporary_store
4296 .written
4297 .iter()
4298 .filter_map(|(k, v)| {
4299 if v.is_coin() {
4300 Some((*k, v.clone()))
4301 } else {
4302 None
4303 }
4304 })
4305 .collect();
4306 let input_coin_objects = inner_temporary_store
4307 .input_objects
4308 .iter()
4309 .filter_map(|(k, v)| {
4310 if v.is_coin() {
4311 Some((*k, v.clone()))
4312 } else {
4313 None
4314 }
4315 })
4316 .collect::<ObjectMap>();
4317 Some((input_coin_objects, written_coin_objects))
4318 }
4319
4320 #[instrument(level = "trace", skip_all)]
4332 pub async fn get_transaction_lock(
4333 &self,
4334 object_ref: &ObjectRef,
4335 epoch_store: &AuthorityPerEpochStore,
4336 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4337 let lock_info = self
4338 .get_object_cache_reader()
4339 .try_get_lock(*object_ref, epoch_store)?;
4340 let lock_info = match lock_info {
4341 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4342 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4343 provided_obj_ref: *object_ref,
4344 current_version: locked_ref.1,
4345 }
4346 .into());
4347 }
4348 ObjectLockStatus::Initialized => {
4349 return Ok(None);
4350 }
4351 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4352 };
4353
4354 epoch_store.get_signed_transaction(&lock_info)
4355 }
4356
4357 pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4358 self.get_object_cache_reader().try_get_objects(objects)
4359 }
4360
4361 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4363 self.try_get_objects(objects)
4364 .await
4365 .expect("storage access failed")
4366 }
4367
4368 pub async fn try_get_object_or_tombstone(
4369 &self,
4370 object_id: ObjectID,
4371 ) -> IotaResult<Option<ObjectRef>> {
4372 self.get_object_cache_reader()
4373 .try_get_latest_object_ref_or_tombstone(object_id)
4374 }
4375
4376 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4378 self.try_get_object_or_tombstone(object_id)
4379 .await
4380 .expect("storage access failed")
4381 }
4382
4383 pub fn set_override_protocol_upgrade_buffer_stake(
4393 &self,
4394 expected_epoch: EpochId,
4395 buffer_stake_bps: u64,
4396 ) -> IotaResult {
4397 let epoch_store = self.load_epoch_store_one_call_per_task();
4398 let actual_epoch = epoch_store.epoch();
4399 if actual_epoch != expected_epoch {
4400 return Err(IotaError::WrongEpoch {
4401 expected_epoch,
4402 actual_epoch,
4403 });
4404 }
4405
4406 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4407 }
4408
4409 pub fn clear_override_protocol_upgrade_buffer_stake(
4410 &self,
4411 expected_epoch: EpochId,
4412 ) -> IotaResult {
4413 let epoch_store = self.load_epoch_store_one_call_per_task();
4414 let actual_epoch = epoch_store.epoch();
4415 if actual_epoch != expected_epoch {
4416 return Err(IotaError::WrongEpoch {
4417 expected_epoch,
4418 actual_epoch,
4419 });
4420 }
4421
4422 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4423 }
4424
4425 pub async fn get_available_system_packages(
4429 &self,
4430 binary_config: &BinaryConfig,
4431 ) -> Vec<ObjectRef> {
4432 let mut results = vec![];
4433
4434 let system_packages = BuiltInFramework::iter_system_packages();
4435
4436 #[cfg(msim)]
4438 let extra_packages = framework_injection::get_extra_packages(self.name);
4439 #[cfg(msim)]
4440 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4441
4442 for system_package in system_packages {
4443 let modules = system_package.modules().to_vec();
4444 #[cfg(msim)]
4446 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4447 .unwrap_or(modules);
4448
4449 let Some(obj_ref) = iota_framework::compare_system_package(
4450 &self.get_object_store(),
4451 &system_package.id,
4452 &modules,
4453 system_package.dependencies.to_vec(),
4454 binary_config,
4455 )
4456 .await
4457 else {
4458 return vec![];
4459 };
4460 results.push(obj_ref);
4461 }
4462
4463 results
4464 }
4465
4466 async fn get_system_package_bytes(
4483 &self,
4484 system_packages: Vec<ObjectRef>,
4485 binary_config: &BinaryConfig,
4486 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4487 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4488 let objects = self.get_objects(&ids).await;
4489
4490 let mut res = Vec::with_capacity(system_packages.len());
4491 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4492 let prev_transaction = match object {
4493 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4494 info!("Framework {} does not need updating", system_package_ref.0);
4496 continue;
4497 }
4498
4499 Some(cur_object) => cur_object.previous_transaction,
4500 None => TransactionDigest::genesis_marker(),
4501 };
4502
4503 #[cfg(msim)]
4504 let SystemPackage {
4505 id: _,
4506 bytes,
4507 dependencies,
4508 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4509 .unwrap_or_else(|| {
4510 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4511 });
4512
4513 #[cfg(not(msim))]
4514 let SystemPackage {
4515 id: _,
4516 bytes,
4517 dependencies,
4518 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4519
4520 let modules: Vec<_> = bytes
4521 .iter()
4522 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4523 .collect();
4524
4525 let new_object = Object::new_system_package(
4526 &modules,
4527 system_package_ref.1,
4528 dependencies.clone(),
4529 prev_transaction,
4530 );
4531
4532 let new_ref = new_object.compute_object_reference();
4533 if new_ref != system_package_ref {
4534 error!(
4535 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
4536 );
4537 return None;
4538 }
4539
4540 res.push((system_package_ref.1, bytes, dependencies));
4541 }
4542
4543 Some(res)
4544 }
4545
4546 fn is_protocol_version_supported_v1(
4550 proposed_protocol_version: ProtocolVersion,
4551 committee: &Committee,
4552 capabilities: Vec<AuthorityCapabilitiesV1>,
4553 mut buffer_stake_bps: u64,
4554 ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4555 if buffer_stake_bps > 10000 {
4556 warn!("clamping buffer_stake_bps to 10000");
4557 buffer_stake_bps = 10000;
4558 }
4559
4560 let mut desired_upgrades: Vec<_> = capabilities
4563 .into_iter()
4564 .filter_map(|mut cap| {
4565 if cap.available_system_packages.is_empty() {
4567 return None;
4568 }
4569
4570 cap.available_system_packages.sort();
4571
4572 info!(
4573 "validator {:?} supports {:?} with system packages: {:?}",
4574 cap.authority.concise(),
4575 cap.supported_protocol_versions,
4576 cap.available_system_packages,
4577 );
4578
4579 cap.supported_protocol_versions
4583 .get_version_digest(proposed_protocol_version)
4584 .map(|digest| (digest, cap.available_system_packages, cap.authority))
4585 })
4586 .collect();
4587
4588 desired_upgrades.sort();
4591 desired_upgrades
4592 .into_iter()
4593 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4594 .into_iter()
4595 .find_map(|((digest, packages), group)| {
4596 assert!(!packages.is_empty());
4598
4599 let mut stake_aggregator: StakeAggregator<(), true> =
4600 StakeAggregator::new(Arc::new(committee.clone()));
4601
4602 for (_, _, authority) in group {
4603 stake_aggregator.insert_generic(authority, ());
4604 }
4605
4606 let total_votes = stake_aggregator.total_votes();
4607 let quorum_threshold = committee.quorum_threshold();
4608 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4609
4610 info!(
4611 protocol_config_digest = ?digest,
4612 ?total_votes,
4613 ?quorum_threshold,
4614 ?buffer_stake_bps,
4615 ?effective_threshold,
4616 ?proposed_protocol_version,
4617 ?packages,
4618 "support for upgrade"
4619 );
4620
4621 let has_support = total_votes >= effective_threshold;
4622 has_support.then_some((proposed_protocol_version, digest, packages))
4623 })
4624 }
4625
4626 fn choose_protocol_version_and_system_packages_v1(
4630 current_protocol_version: ProtocolVersion,
4631 current_protocol_digest: Digest,
4632 committee: &Committee,
4633 capabilities: Vec<AuthorityCapabilitiesV1>,
4634 buffer_stake_bps: u64,
4635 ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4636 let mut next_protocol_version = current_protocol_version;
4637 let mut system_packages = vec![];
4638 let mut protocol_version_digest = current_protocol_digest;
4639
4640 while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4644 next_protocol_version + 1,
4645 committee,
4646 capabilities.clone(),
4647 buffer_stake_bps,
4648 ) {
4649 next_protocol_version = version;
4650 protocol_version_digest = digest;
4651 system_packages = packages;
4652 }
4653
4654 (
4655 next_protocol_version,
4656 protocol_version_digest,
4657 system_packages,
4658 )
4659 }
4660
4661 fn get_validators_supporting_protocol_version(
4666 target_protocol_version: ProtocolVersion,
4667 target_digest: Digest,
4668 active_validators: &[AuthorityPublicKey],
4669 capabilities: &[AuthorityCapabilitiesV1],
4670 ) -> Vec<u64> {
4671 let mut eligible_validators = Vec::new();
4672
4673 for capability in capabilities {
4674 if let Some(digest) = capability
4676 .supported_protocol_versions
4677 .get_version_digest(target_protocol_version)
4678 {
4679 if digest == target_digest {
4680 if let Some(index) = active_validators
4682 .iter()
4683 .position(|name| AuthorityName::from(name) == capability.authority)
4684 {
4685 eligible_validators.push(index as u64);
4686 }
4687 }
4688 }
4689 }
4690
4691 eligible_validators.sort();
4693 eligible_validators
4694 }
4695
4696 fn calculate_eligible_validators_weight(
4701 eligible_validator_indices: &[u64],
4702 active_validators: &[AuthorityPublicKey],
4703 committee: &Committee,
4704 ) -> u64 {
4705 let mut total_weight = 0u64;
4706
4707 for &index in eligible_validator_indices {
4708 let authority_pubkey = &active_validators[index as usize];
4709 if let Some((_, weight)) = committee
4711 .members()
4712 .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4713 {
4714 total_weight += weight;
4715 }
4716 }
4717
4718 total_weight
4719 }
4720
4721 #[instrument(level = "debug", skip_all)]
4722 fn create_authenticator_state_tx(
4723 &self,
4724 epoch_store: &Arc<AuthorityPerEpochStore>,
4725 ) -> Option<EndOfEpochTransactionKind> {
4726 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4727 info!("authenticator state transactions not enabled");
4728 return None;
4729 }
4730
4731 let authenticator_state_exists = epoch_store.authenticator_state_exists();
4732 let tx = if authenticator_state_exists {
4733 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4734 let min_epoch =
4735 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4736 let authenticator_obj_initial_shared_version = epoch_store
4737 .epoch_start_config()
4738 .authenticator_obj_initial_shared_version()
4739 .expect("initial version must exist");
4740
4741 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4742 min_epoch,
4743 authenticator_obj_initial_shared_version,
4744 );
4745
4746 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4747
4748 tx
4749 } else {
4750 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4751 info!("Creating AuthenticatorStateCreate tx");
4752 tx
4753 };
4754 Some(tx)
4755 }
4756
    /// Builds the end-of-epoch transaction for `epoch_store`'s epoch, runs it
    /// through `prepare_certificate`, and records the transaction and its
    /// effects in the state sync store.
    ///
    /// The transaction contains an optional authenticator-state transaction
    /// followed by one change-epoch transaction. The change-epoch variant
    /// (`v3`, `v2`, or the original) is selected from protocol config flags.
    ///
    /// Returns the system state read from the written objects, the
    /// `SystemEpochInfoEvent` if one was emitted, and the transaction effects.
    ///
    /// # Errors
    /// Fails when the chosen system packages cannot be produced locally, when
    /// the transaction is already marked as executed, or when any of the
    /// storage / execution calls made with `?` fails.
    ///
    /// # Panics
    /// Panics if capabilities cannot be read, if the transaction did not write
    /// the system object, if no epoch info event was emitted while not in safe
    /// mode, or if the effects status is not ok.
    #[instrument(level = "error", skip_all)]
    pub async fn create_and_execute_advance_epoch_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        gas_cost_summary: &GasCostSummary,
        checkpoint: CheckpointSequenceNumber,
        epoch_start_timestamp_ms: CheckpointTimestamp,
    ) -> anyhow::Result<(
        IotaSystemState,
        Option<SystemEpochInfoEvent>,
        TransactionEffects,
    )> {
        // End-of-epoch transaction kinds, in the order they are pushed.
        let mut txns = Vec::new();

        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
            txns.push(tx);
        }

        let next_epoch = epoch_store.epoch() + 1;

        // Choose the next protocol version (and matching system packages) from
        // the capabilities recorded in the epoch store.
        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
        let authority_capabilities = epoch_store
            .get_capabilities_v1()
            .expect("read capabilities from db cannot fail");
        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
            Self::choose_protocol_version_and_system_packages_v1(
                epoch_store.protocol_version(),
                SupportedProtocolVersionsWithHashes::protocol_config_digest(
                    epoch_store.protocol_config(),
                ),
                epoch_store.committee(),
                authority_capabilities.clone(),
                buffer_stake_bps,
            );

        // The binary config is derived from the current epoch's protocol config.
        let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);
        let Some(next_epoch_system_package_bytes) = self
            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
            .await
        else {
            error!(
                "upgraded system packages {:?} are not locally available, cannot create \
                ChangeEpochTx. validator binary must be upgraded to the correct version!",
                next_epoch_system_packages
            );
            bail!("missing system packages: cannot form ChangeEpochTx");
        };

        if config.select_committee_from_eligible_validators() {
            let active_validators = epoch_store.epoch_start_state().get_active_validators();

            // Default: every active validator (by index) is eligible.
            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();

            if config.select_committee_supporting_next_epoch_version() {
                // Narrow to validators that advertise the chosen version/digest.
                eligible_active_validators = Self::get_validators_supporting_protocol_version(
                    next_epoch_protocol_version,
                    next_epoch_protocol_digest,
                    &active_validators,
                    &authority_capabilities,
                );

                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
                    &eligible_active_validators,
                    &active_validators,
                    epoch_store.committee(),
                );

                let committee = epoch_store.committee();
                let effective_threshold = committee.effective_threshold(buffer_stake_bps);

                // If the narrowed set lacks enough weight, log it and fall back
                // to all active validators.
                if eligible_validators_weight < effective_threshold {
                    error!(
                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
                    );
                    eligible_active_validators = (0..active_validators.len() as u64).collect();
                }
            }

            txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
                eligible_active_validators,
            ));
        } else if config.protocol_defined_base_fee()
            && config.max_committee_members_count_as_option().is_some()
        {
            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        } else {
            txns.push(EndOfEpochTransactionKind::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        }

        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);

        // The executable form is tied to the current epoch and `checkpoint`.
        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
            tx.clone(),
            epoch_store.epoch(),
            checkpoint,
        );

        let tx_digest = executable_tx.digest();

        info!(
            ?next_epoch,
            ?next_epoch_protocol_version,
            ?next_epoch_system_packages,
            computation_cost=?gas_cost_summary.computation_cost,
            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
            storage_cost=?gas_cost_summary.storage_cost,
            storage_rebate=?gas_cost_summary.storage_rebate,
            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
            ?tx_digest,
            "Creating advance epoch transaction"
        );

        fail_point_async!("change_epoch_tx_delay");
        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

        // Checked after taking the tx lock: if the transaction is already
        // marked as executed, bail instead of executing it again.
        if self
            .get_transaction_cache_reader()
            .try_is_tx_already_executed(tx_digest)?
        {
            warn!("change epoch tx has already been executed via state sync");
            bail!("change epoch tx has already been executed via state sync",);
        }

        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;

        epoch_store.assign_shared_object_versions_idempotent(
            self.get_object_cache_reader().as_ref(),
            std::slice::from_ref(&executable_tx),
        )?;

        let input_objects =
            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;

        let (temporary_store, effects, _execution_error_opt) =
            self.prepare_certificate(&execution_guard, &executable_tx, input_objects, epoch_store)?;
        let system_obj = get_iota_system_state(&temporary_store.written)
            .expect("change epoch tx must write to system object");
        let system_epoch_info_event = temporary_store
            .events
            .data
            .into_iter()
            .find(|event| event.is_system_epoch_info_event())
            .map(SystemEpochInfoEvent::from);
        // The epoch info event may only be missing when in safe mode.
        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());

        // Record the transaction and its effects in the state sync store.
        self.get_state_sync_store()
            .try_insert_transaction_and_effects(&tx, &effects)
            .map_err(|err| {
                let err: anyhow::Error = err.into();
                err
            })?;

        info!(
            "Effects summary of the change epoch transaction: {:?}",
            effects.summary_for_debug()
        );
        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
        assert!(effects.status().is_ok());
        Ok((system_obj, system_epoch_info_event, effects))
    }
4997
4998 #[instrument(level = "error", skip_all)]
5002 async fn revert_uncommitted_epoch_transactions(
5003 &self,
5004 epoch_store: &AuthorityPerEpochStore,
5005 ) -> IotaResult {
5006 {
5007 let state = epoch_store.get_reconfig_state_write_lock_guard();
5008 if state.should_accept_user_certs() {
5009 epoch_store.close_user_certs(state);
5018 }
5019 }
5021 let pending_certificates = epoch_store.pending_consensus_certificates();
5022 info!(
5023 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5024 pending_certificates.len(),
5025 pending_certificates,
5026 );
5027 for digest in pending_certificates {
5028 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5029 info!(
5030 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5031 digest
5032 );
5033 continue;
5034 }
5035 info!("Reverting {:?} at the end of epoch", digest);
5036 epoch_store.revert_executed_transaction(&digest)?;
5037 self.get_reconfig_api().try_revert_state_update(&digest)?;
5038 }
5039 info!("All uncommitted local transactions reverted");
5040 Ok(())
5041 }
5042
5043 #[instrument(level = "error", skip_all)]
5044 async fn reopen_epoch_db(
5045 &self,
5046 cur_epoch_store: &AuthorityPerEpochStore,
5047 new_committee: Committee,
5048 epoch_start_configuration: EpochStartConfiguration,
5049 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5050 epoch_last_checkpoint: CheckpointSequenceNumber,
5051 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5052 let new_epoch = new_committee.epoch;
5053 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5054 assert_eq!(
5055 epoch_start_configuration.epoch_start_state().epoch(),
5056 new_committee.epoch
5057 );
5058 fail_point!("before-open-new-epoch-store");
5059 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5060 self.name,
5061 new_committee,
5062 epoch_start_configuration,
5063 self.get_backing_package_store().clone(),
5064 self.get_object_store().clone(),
5065 expensive_safety_check_config,
5066 cur_epoch_store.get_chain_identifier(),
5067 epoch_last_checkpoint,
5068 );
5069 self.epoch_store.store(new_epoch_store.clone());
5070 Ok(new_epoch_store)
5071 }
5072
5073 #[cfg(test)]
5074 pub(crate) fn iter_live_object_set_for_testing(
5075 &self,
5076 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5077 self.get_accumulator_store()
5078 .iter_cached_live_object_set_for_testing()
5079 }
5080
5081 #[cfg(test)]
5082 pub(crate) fn shutdown_execution_for_test(&self) {
5083 self.tx_execution_shutdown
5084 .lock()
5085 .take()
5086 .unwrap()
5087 .send(())
5088 .unwrap();
5089 }
5090
5091 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5094 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5095 self.get_object_cache_reader()
5096 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5097 self.get_reconfig_api()
5098 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5099 }
5100}
5101
/// Background receiver that turns randomness outputs arriving on a channel
/// into randomness state update transactions for the authority.
pub struct RandomnessRoundReceiver {
    /// Authority state used to persist and enqueue the generated transactions.
    authority_state: Arc<AuthorityState>,
    /// Incoming `(epoch, round, bytes)` tuples; the event loop ends when this
    /// channel yields `None`.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
5106
5107impl RandomnessRoundReceiver {
5108 pub fn spawn(
5109 authority_state: Arc<AuthorityState>,
5110 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5111 ) -> JoinHandle<()> {
5112 let rrr = RandomnessRoundReceiver {
5113 authority_state,
5114 randomness_rx,
5115 };
5116 spawn_monitored_task!(rrr.run())
5117 }
5118
5119 async fn run(mut self) {
5120 info!("RandomnessRoundReceiver event loop started");
5121
5122 loop {
5123 tokio::select! {
5124 maybe_recv = self.randomness_rx.recv() => {
5125 if let Some((epoch, round, bytes)) = maybe_recv {
5126 self.handle_new_randomness(epoch, round, bytes);
5127 } else {
5128 break;
5129 }
5130 },
5131 }
5132 }
5133
5134 info!("RandomnessRoundReceiver event loop ended");
5135 }
5136
5137 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5138 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5139 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5140 if epoch_store.epoch() != epoch {
5141 warn!(
5142 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5143 epoch_store.epoch()
5144 );
5145 return;
5146 }
5147 let transaction = VerifiedTransaction::new_randomness_state_update(
5148 epoch,
5149 round,
5150 bytes,
5151 epoch_store
5152 .epoch_start_config()
5153 .randomness_obj_initial_shared_version(),
5154 );
5155 debug!(
5156 "created randomness state update transaction with digest: {:?}",
5157 transaction.digest()
5158 );
5159 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5160 let digest = *transaction.digest();
5161
5162 self.authority_state
5167 .get_cache_commit()
5168 .persist_transaction(&transaction);
5169
5170 self.authority_state
5172 .transaction_manager()
5173 .enqueue(vec![transaction], &epoch_store);
5174
5175 let authority_state = self.authority_state.clone();
5176 spawn_monitored_task!(async move {
5177 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5186 let result = tokio::time::timeout(
5187 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5188 authority_state
5189 .get_transaction_cache_reader()
5190 .try_notify_read_executed_effects(&[digest]),
5191 )
5192 .await;
5193 let result = match result {
5194 Ok(result) => result,
5195 Err(_) => {
5196 if cfg!(debug_assertions) {
5197 panic!(
5199 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5200 );
5201 }
5202 warn!(
5203 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5204 );
5205 authority_state
5207 .get_transaction_cache_reader()
5208 .try_notify_read_executed_effects(&[digest])
5209 .await
5210 }
5211 };
5212
5213 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5214 let effects = effects.pop().expect("should return effects");
5215 if *effects.status() != ExecutionStatus::Success {
5216 fatal!(
5217 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5218 );
5219 }
5220 debug!(
5221 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5222 );
5223 });
5224 }
5225}
5226
5227#[async_trait]
5228impl TransactionKeyValueStoreTrait for AuthorityState {
5229 async fn multi_get(
5230 &self,
5231 transaction_keys: &[TransactionDigest],
5232 effects_keys: &[TransactionDigest],
5233 ) -> IotaResult<KVStoreTransactionData> {
5234 let txns = if !transaction_keys.is_empty() {
5235 self.get_transaction_cache_reader()
5236 .try_multi_get_transaction_blocks(transaction_keys)?
5237 .into_iter()
5238 .map(|t| t.map(|t| (*t).clone().into_inner()))
5239 .collect()
5240 } else {
5241 vec![]
5242 };
5243
5244 let fx = if !effects_keys.is_empty() {
5245 self.get_transaction_cache_reader()
5246 .try_multi_get_executed_effects(effects_keys)?
5247 } else {
5248 vec![]
5249 };
5250
5251 Ok((txns, fx))
5252 }
5253
5254 async fn multi_get_checkpoints(
5255 &self,
5256 checkpoint_summaries: &[CheckpointSequenceNumber],
5257 checkpoint_contents: &[CheckpointSequenceNumber],
5258 checkpoint_summaries_by_digest: &[CheckpointDigest],
5259 ) -> IotaResult<(
5260 Vec<Option<CertifiedCheckpointSummary>>,
5261 Vec<Option<CheckpointContents>>,
5262 Vec<Option<CertifiedCheckpointSummary>>,
5263 )> {
5264 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5266 let store = self.get_checkpoint_store();
5267 for seq in checkpoint_summaries {
5268 let checkpoint = store
5269 .get_checkpoint_by_sequence_number(*seq)?
5270 .map(|c| c.into_inner());
5271
5272 summaries.push(checkpoint);
5273 }
5274
5275 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5276 for seq in checkpoint_contents {
5277 let checkpoint = store
5278 .get_checkpoint_by_sequence_number(*seq)?
5279 .and_then(|summary| {
5280 store
5281 .get_checkpoint_contents(&summary.content_digest)
5282 .expect("db read cannot fail")
5283 });
5284 contents.push(checkpoint);
5285 }
5286
5287 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5288 for digest in checkpoint_summaries_by_digest {
5289 let checkpoint = store
5290 .get_checkpoint_by_digest(digest)?
5291 .map(|c| c.into_inner());
5292 summaries_by_digest.push(checkpoint);
5293 }
5294
5295 Ok((summaries, contents, summaries_by_digest))
5296 }
5297
5298 async fn get_transaction_perpetual_checkpoint(
5299 &self,
5300 digest: TransactionDigest,
5301 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5302 self.get_checkpoint_cache()
5303 .try_get_transaction_perpetual_checkpoint(&digest)
5304 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5305 }
5306
5307 async fn get_object(
5308 &self,
5309 object_id: ObjectID,
5310 version: VersionNumber,
5311 ) -> IotaResult<Option<Object>> {
5312 self.get_object_cache_reader()
5313 .try_get_object_by_key(&object_id, version)
5314 }
5315
5316 async fn multi_get_transactions_perpetual_checkpoints(
5317 &self,
5318 digests: &[TransactionDigest],
5319 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5320 let res = self
5321 .get_checkpoint_cache()
5322 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5323
5324 Ok(res
5325 .into_iter()
5326 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5327 .collect())
5328 }
5329
5330 #[instrument(skip(self))]
5331 async fn multi_get_events_by_tx_digests(
5332 &self,
5333 digests: &[TransactionDigest],
5334 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5335 if digests.is_empty() {
5336 return Ok(vec![]);
5337 }
5338 let events_digests: Vec<_> = self
5339 .get_transaction_cache_reader()
5340 .try_multi_get_executed_effects(digests)?
5341 .into_iter()
5342 .map(|t| t.and_then(|t| t.events_digest().cloned()))
5343 .collect();
5344 let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5345 let mut events = self
5346 .get_transaction_cache_reader()
5347 .try_multi_get_events(&non_empty_events)?
5348 .into_iter();
5349 Ok(events_digests
5350 .into_iter()
5351 .map(|ev| ev.and_then(|_| events.next()?))
5352 .collect())
5353 }
5354}
5355
/// Simulator-only (`cfg(msim)`) hooks for overriding or adding system
/// packages through a thread-local override table.
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_types::{
        base_types::{AuthorityName, ObjectID},
        is_system_package,
    };
    use move_binary_format::CompiledModule;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per validator, which modules (if any) replace a
    /// package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a package override is resolved.
    enum PackageOverrideConfig {
        /// The same modules for every validator.
        Global(Framework),
        /// Modules chosen by a callback that receives the validator's name.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module at its own version.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut buf = Vec::new();
            module
                .serialize_with_version(module.version, &mut buf)
                .unwrap();
            serialized.push(buf);
        }
        serialized
    }

    /// Registers `modules` as the override of `package_id` for all validators,
    /// replacing any earlier override of that package.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        let entry = PackageOverrideConfig::Global(modules);
        OVERRIDE.with(|overrides| overrides.borrow_mut().insert(package_id, entry));
    }

    /// Registers a per-validator callback as the override of `package_id`,
    /// replacing any earlier override of that package.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        let entry = PackageOverrideConfig::PerValidator(func);
        OVERRIDE.with(|overrides| overrides.borrow_mut().insert(package_id, entry));
    }

    /// Returns the serialized override modules of `package_id` for validator
    /// `name`, or `None` if there is no override or the callback declines.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            let overrides = cfg.borrow();
            match overrides.get(package_id)? {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    let framework = func(name)?;
                    Some(compiled_modules_to_bytes(&framework))
                }
            }
        })
    }

    /// Returns the override modules of `package_id` for validator `name`, or
    /// `None` if there is no override or the callback declines.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            let overrides = cfg.borrow();
            match overrides.get(package_id)? {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            }
        })
    }

    /// Builds a `SystemPackage` from the override bytes of `package_id`.
    ///
    /// A system package keeps the dependencies of its built-in counterpart;
    /// any other package depends on all built-in package ids.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = match is_system_package(*package_id) {
            true => BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec(),
            false => BuiltInFramework::all_package_ids(),
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns a `SystemPackage` for every overridden package id that is not a
    /// built-in package id, each depending on all built-in package ids.
    ///
    /// # Panics
    /// Panics if an extra package has no override bytes for `name`.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .filter(|package| !built_in.contains(*package))
                .copied()
                .collect()
        });

        // The thread-local borrow above has ended, so `get_override_bytes` can
        // borrow the table again.
        let mut packages = Vec::with_capacity(extra.len());
        for package in extra {
            packages.push(SystemPackage {
                id: package,
                bytes: get_override_bytes(&package, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        packages
    }
}
5477
/// Serializable record of an object together with the three components of
/// its object reference.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// First component of the object's computed reference.
    pub id: ObjectID,
    /// Second component of the object's computed reference.
    pub version: VersionNumber,
    /// Third component of the object's computed reference.
    pub digest: ObjectDigest,
    /// The object itself.
    pub object: Object,
}
5485
5486impl ObjDumpFormat {
5487 fn new(object: Object) -> Self {
5488 let oref = object.compute_object_reference();
5489 Self {
5490 id: oref.0,
5491 version: oref.1,
5492 digest: oref.2,
5493 object,
5494 }
5495 }
5496}
5497
5498#[derive(Debug, Serialize, Deserialize, Clone)]
5499pub struct NodeStateDump {
5500 pub tx_digest: TransactionDigest,
5501 pub sender_signed_data: SenderSignedData,
5502 pub executed_epoch: u64,
5503 pub reference_gas_price: u64,
5504 pub protocol_version: u64,
5505 pub epoch_start_timestamp_ms: u64,
5506 pub computed_effects: TransactionEffects,
5507 pub expected_effects_digest: TransactionEffectsDigest,
5508 pub relevant_system_packages: Vec<ObjDumpFormat>,
5509 pub shared_objects: Vec<ObjDumpFormat>,
5510 pub loaded_child_objects: Vec<ObjDumpFormat>,
5511 pub modified_at_versions: Vec<ObjDumpFormat>,
5512 pub runtime_reads: Vec<ObjDumpFormat>,
5513 pub input_objects: Vec<ObjDumpFormat>,
5514}
5515
5516impl NodeStateDump {
5517 pub fn new(
5518 tx_digest: &TransactionDigest,
5519 effects: &TransactionEffects,
5520 expected_effects_digest: TransactionEffectsDigest,
5521 object_store: &dyn ObjectStore,
5522 epoch_store: &Arc<AuthorityPerEpochStore>,
5523 inner_temporary_store: &InnerTemporaryStore,
5524 certificate: &VerifiedExecutableTransaction,
5525 ) -> IotaResult<Self> {
5526 let executed_epoch = epoch_store.epoch();
5528 let reference_gas_price = epoch_store.reference_gas_price();
5529 let epoch_start_config = epoch_store.epoch_start_config();
5530 let protocol_version = epoch_store.protocol_version().as_u64();
5531 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5532
5533 let mut relevant_system_packages = Vec::new();
5535 for sys_package_id in BuiltInFramework::all_package_ids() {
5536 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5537 relevant_system_packages.push(ObjDumpFormat::new(w))
5538 }
5539 }
5540
5541 let mut shared_objects = Vec::new();
5543 for kind in effects.input_shared_objects() {
5544 match kind {
5545 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5546 if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5547 shared_objects.push(ObjDumpFormat::new(w))
5548 }
5549 }
5550 InputSharedObject::ReadDeleted(..)
5551 | InputSharedObject::MutateDeleted(..)
5552 | InputSharedObject::Cancelled(..) => (), }
5555 }
5556
5557 let mut loaded_child_objects = Vec::new();
5560 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5561 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5562 loaded_child_objects.push(ObjDumpFormat::new(w))
5563 }
5564 }
5565
5566 let mut modified_at_versions = Vec::new();
5568 for (id, ver) in effects.modified_at_versions() {
5569 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5570 modified_at_versions.push(ObjDumpFormat::new(w))
5571 }
5572 }
5573
5574 let mut runtime_reads = Vec::new();
5578 for obj in inner_temporary_store
5579 .runtime_packages_loaded_from_db
5580 .values()
5581 {
5582 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
5583 }
5584
5585 Ok(Self {
5588 tx_digest: *tx_digest,
5589 executed_epoch,
5590 reference_gas_price,
5591 epoch_start_timestamp_ms,
5592 protocol_version,
5593 relevant_system_packages,
5594 shared_objects,
5595 loaded_child_objects,
5596 modified_at_versions,
5597 runtime_reads,
5598 sender_signed_data: certificate.clone().into_message(),
5599 input_objects: inner_temporary_store
5600 .input_objects
5601 .values()
5602 .map(|o| ObjDumpFormat::new(o.clone()))
5603 .collect(),
5604 computed_effects: effects.clone(),
5605 expected_effects_digest,
5606 })
5607 }
5608
5609 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
5610 let mut objects = Vec::new();
5611 objects.extend(self.relevant_system_packages.clone());
5612 objects.extend(self.shared_objects.clone());
5613 objects.extend(self.loaded_child_objects.clone());
5614 objects.extend(self.modified_at_versions.clone());
5615 objects.extend(self.runtime_reads.clone());
5616 objects.extend(self.input_objects.clone());
5617 objects
5618 }
5619
5620 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
5621 let file_name = format!(
5622 "{}_{}_NODE_DUMP.json",
5623 self.tx_digest,
5624 AuthorityState::unixtime_now_ms()
5625 );
5626 let mut path = path.to_path_buf();
5627 path.push(&file_name);
5628 let mut file = File::create(path.clone())?;
5629 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
5630 Ok(path)
5631 }
5632
5633 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
5634 let file = File::open(path)?;
5635 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
5636 }
5637}