1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49 key_value_store::{
50 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51 },
52 key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57 IOTA_SYSTEM_ADDRESS, TypeTag,
58 authenticator_state::get_authenticator_state,
59 base_types::*,
60 committee::{Committee, EpochId, ProtocolVersion},
61 crypto::{
62 AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
63 default_hash,
64 },
65 deny_list_v1::check_coin_deny_list_v1_during_signing,
66 digests::{ChainIdentifier, Digest, TransactionEventsDigest},
67 dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
68 effects::{
69 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
70 TransactionEvents, VerifiedSignedTransactionEffects,
71 },
72 error::{ExecutionError, IotaError, IotaResult, UserInputError},
73 event::{Event, EventID, SystemEpochInfoEvent},
74 executable_transaction::VerifiedExecutableTransaction,
75 execution_config_utils::to_binary_config,
76 execution_status::ExecutionStatus,
77 gas::{GasCostSummary, IotaGasStatus},
78 gas_coin::NANOS_PER_IOTA,
79 inner_temporary_store::{
80 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
81 WrittenObjects,
82 },
83 iota_system_state::{
84 IotaSystemState, IotaSystemStateTrait,
85 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
86 },
87 is_system_package,
88 layout_resolver::{LayoutResolver, into_struct_layout},
89 message_envelope::Message,
90 messages_checkpoint::{
91 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
92 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
93 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
94 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
95 },
96 messages_consensus::AuthorityCapabilitiesV1,
97 messages_grpc::{
98 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
99 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
100 TransactionStatus,
101 },
102 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
103 object::{
104 MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
105 bounded_visitor::BoundedVisitor,
106 },
107 storage::{
108 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
109 },
110 supported_protocol_versions::{
111 ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
112 },
113 transaction::*,
114 transaction_executor::{SimulateTransactionResult, VmChecks},
115};
116use itertools::Itertools;
117use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
118use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
119use parking_lot::Mutex;
120use prometheus::{
121 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
122 register_histogram_vec_with_registry, register_histogram_with_registry,
123 register_int_counter_vec_with_registry, register_int_counter_with_registry,
124 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
125};
126use serde::{Deserialize, Serialize, de::DeserializeOwned};
127use tap::TapFallible;
128use tokio::{
129 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
130 task::JoinHandle,
131};
132use tracing::{debug, error, info, instrument, trace, warn};
133use typed_store::TypedStoreError;
134
135use self::{
136 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
137};
138#[cfg(msim)]
139pub use crate::checkpoints::checkpoint_executor::utils::{
140 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
141};
142use crate::{
143 authority::{
144 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
145 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
146 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
147 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
148 authority_store_tables::AuthorityPrunerTables,
149 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
150 },
151 authority_client::NetworkAuthorityClient,
152 checkpoints::CheckpointStore,
153 congestion_tracker::CongestionTracker,
154 consensus_adapter::ConsensusAdapter,
155 epoch::committee_store::CommitteeStore,
156 execution_cache::{
157 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
158 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
159 TransactionCacheRead,
160 },
161 execution_driver::execution_process,
162 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
163 metrics::{LatencyObserver, RateTracker},
164 module_cache_metrics::ResolverMetrics,
165 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
166 rest_index::RestIndexStore,
167 stake_aggregator::StakeAggregator,
168 state_accumulator::{AccumulatorStore, StateAccumulator},
169 subscription_handler::SubscriptionHandler,
170 transaction_input_loader::TransactionInputLoader,
171 transaction_manager::TransactionManager,
172 transaction_outputs::TransactionOutputs,
173 validator_tx_finalizer::ValidatorTxFinalizer,
174 verify_indexes::verify_indexes,
175};
176
177#[cfg(test)]
178#[path = "unit_tests/authority_tests.rs"]
179pub mod authority_tests;
180
181#[cfg(test)]
182#[path = "unit_tests/transaction_tests.rs"]
183pub mod transaction_tests;
184
185#[cfg(test)]
186#[path = "unit_tests/batch_transaction_tests.rs"]
187mod batch_transaction_tests;
188
189#[cfg(test)]
190#[path = "unit_tests/move_integration_tests.rs"]
191pub mod move_integration_tests;
192
193#[cfg(test)]
194#[path = "unit_tests/gas_tests.rs"]
195mod gas_tests;
196
197#[cfg(test)]
198#[path = "unit_tests/batch_verification_tests.rs"]
199mod batch_verification_tests;
200
201#[cfg(test)]
202#[path = "unit_tests/coin_deny_list_tests.rs"]
203mod coin_deny_list_tests;
204
205#[cfg(test)]
206#[path = "unit_tests/auth_unit_test_utils.rs"]
207pub mod auth_unit_test_utils;
208
209pub mod authority_test_utils;
210
211pub mod authority_per_epoch_store;
212pub mod authority_per_epoch_store_pruner;
213
214pub mod authority_store_pruner;
215pub mod authority_store_tables;
216pub mod authority_store_types;
217pub mod epoch_start_configuration;
218pub mod shared_object_congestion_tracker;
219pub mod shared_object_version_manager;
220pub mod suggested_gas_price_calculator;
221pub mod test_authority_builder;
222pub mod transaction_deferral;
223
224pub(crate) mod authority_store;
225pub mod backpressure;
226
227pub struct AuthorityMetrics {
229 tx_orders: IntCounter,
230 total_certs: IntCounter,
231 total_cert_attempts: IntCounter,
232 total_effects: IntCounter,
233 pub shared_obj_tx: IntCounter,
234 sponsored_tx: IntCounter,
235 tx_already_processed: IntCounter,
236 num_input_objs: Histogram,
237 num_shared_objects: Histogram,
238 batch_size: Histogram,
239
240 authority_state_handle_transaction_latency: Histogram,
241
242 execute_certificate_latency_single_writer: Histogram,
243 execute_certificate_latency_shared_object: Histogram,
244
245 internal_execution_latency: Histogram,
246 execution_load_input_objects_latency: Histogram,
247 prepare_certificate_latency: Histogram,
248 commit_certificate_latency: Histogram,
249 db_checkpoint_latency: Histogram,
250
251 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
252 pub(crate) transaction_manager_num_missing_objects: IntGauge,
253 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
254 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
255 pub(crate) transaction_manager_num_ready: IntGauge,
256 pub(crate) transaction_manager_object_cache_size: IntGauge,
257 pub(crate) transaction_manager_object_cache_hits: IntCounter,
258 pub(crate) transaction_manager_object_cache_misses: IntCounter,
259 pub(crate) transaction_manager_object_cache_evictions: IntCounter,
260 pub(crate) transaction_manager_package_cache_size: IntGauge,
261 pub(crate) transaction_manager_package_cache_hits: IntCounter,
262 pub(crate) transaction_manager_package_cache_misses: IntCounter,
263 pub(crate) transaction_manager_package_cache_evictions: IntCounter,
264 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
265
266 pub(crate) execution_driver_executed_transactions: IntCounter,
267 pub(crate) execution_driver_dispatch_queue: IntGauge,
268 pub(crate) execution_queueing_delay_s: Histogram,
269 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
270 pub(crate) execution_gas_latency_ratio: Histogram,
271
272 pub(crate) skipped_consensus_txns: IntCounter,
273 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
274
275 pub(crate) authority_overload_status: IntGauge,
276 pub(crate) authority_load_shedding_percentage: IntGauge,
277
278 pub(crate) transaction_overload_sources: IntCounterVec,
279
280 post_processing_total_events_emitted: IntCounter,
282 post_processing_total_tx_indexed: IntCounter,
283 post_processing_total_tx_had_event_processed: IntCounter,
284 post_processing_total_failures: IntCounter,
285
286 pub consensus_handler_processed: IntCounterVec,
288 pub consensus_handler_transaction_sizes: HistogramVec,
289 pub consensus_handler_num_low_scoring_authorities: IntGauge,
290 pub consensus_handler_scores: IntGaugeVec,
291 pub consensus_handler_deferred_transactions: IntCounter,
292 pub consensus_handler_congested_transactions: IntCounter,
293 pub consensus_handler_cancelled_transactions: IntCounter,
294 pub consensus_handler_max_object_costs: IntGaugeVec,
295 pub consensus_committed_subdags: IntCounterVec,
296 pub consensus_committed_messages: IntGaugeVec,
297 pub consensus_committed_user_transactions: IntGaugeVec,
298 pub consensus_handler_leader_round: IntGauge,
299 pub consensus_calculated_throughput: IntGauge,
300 pub consensus_calculated_throughput_profile: IntGauge,
301
302 pub limits_metrics: Arc<LimitsMetrics>,
303
304 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
306
307 pub zklogin_sig_count: IntCounter,
309 pub multisig_sig_count: IntCounter,
311
312 pub execution_queueing_latency: LatencyObserver,
315
316 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
323
324 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
327}
328
329const POSITIVE_INT_BUCKETS: &[f64] = &[
331 1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
332 20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
333 7000000., 10000000.,
334];
335
336const LATENCY_SEC_BUCKETS: &[f64] = &[
337 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
338 10., 20., 30., 60., 90.,
339];
340
341const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
343 0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
344 0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
345];
346
347const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
348 10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
349 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
350];
351
352pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
356 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
357 let execute_certificate_latency = register_histogram_vec_with_registry!(
358 "authority_state_execute_certificate_latency",
359 "Latency of executing certificates, including waiting for inputs",
360 &["tx_type"],
361 LATENCY_SEC_BUCKETS.to_vec(),
362 registry,
363 )
364 .unwrap();
365
366 let execute_certificate_latency_single_writer =
367 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
368 let execute_certificate_latency_shared_object =
369 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
370
371 Self {
372 tx_orders: register_int_counter_with_registry!(
373 "total_transaction_orders",
374 "Total number of transaction orders",
375 registry,
376 )
377 .unwrap(),
378 total_certs: register_int_counter_with_registry!(
379 "total_transaction_certificates",
380 "Total number of transaction certificates handled",
381 registry,
382 )
383 .unwrap(),
384 total_cert_attempts: register_int_counter_with_registry!(
385 "total_handle_certificate_attempts",
386 "Number of calls to handle_certificate",
387 registry,
388 )
389 .unwrap(),
390 total_effects: register_int_counter_with_registry!(
392 "total_transaction_effects",
393 "Total number of transaction effects produced",
394 registry,
395 )
396 .unwrap(),
397
398 shared_obj_tx: register_int_counter_with_registry!(
399 "num_shared_obj_tx",
400 "Number of transactions involving shared objects",
401 registry,
402 )
403 .unwrap(),
404
405 sponsored_tx: register_int_counter_with_registry!(
406 "num_sponsored_tx",
407 "Number of sponsored transactions",
408 registry,
409 )
410 .unwrap(),
411
412 tx_already_processed: register_int_counter_with_registry!(
413 "num_tx_already_processed",
414 "Number of transaction orders already processed previously",
415 registry,
416 )
417 .unwrap(),
418 num_input_objs: register_histogram_with_registry!(
419 "num_input_objects",
420 "Distribution of number of input TX objects per TX",
421 POSITIVE_INT_BUCKETS.to_vec(),
422 registry,
423 )
424 .unwrap(),
425 num_shared_objects: register_histogram_with_registry!(
426 "num_shared_objects",
427 "Number of shared input objects per TX",
428 POSITIVE_INT_BUCKETS.to_vec(),
429 registry,
430 )
431 .unwrap(),
432 batch_size: register_histogram_with_registry!(
433 "batch_size",
434 "Distribution of size of transaction batch",
435 POSITIVE_INT_BUCKETS.to_vec(),
436 registry,
437 )
438 .unwrap(),
439 authority_state_handle_transaction_latency: register_histogram_with_registry!(
440 "authority_state_handle_transaction_latency",
441 "Latency of handling transactions",
442 LATENCY_SEC_BUCKETS.to_vec(),
443 registry,
444 )
445 .unwrap(),
446 execute_certificate_latency_single_writer,
447 execute_certificate_latency_shared_object,
448 internal_execution_latency: register_histogram_with_registry!(
449 "authority_state_internal_execution_latency",
450 "Latency of actual certificate executions",
451 LATENCY_SEC_BUCKETS.to_vec(),
452 registry,
453 )
454 .unwrap(),
455 execution_load_input_objects_latency: register_histogram_with_registry!(
456 "authority_state_execution_load_input_objects_latency",
457 "Latency of loading input objects for execution",
458 LOW_LATENCY_SEC_BUCKETS.to_vec(),
459 registry,
460 )
461 .unwrap(),
462 prepare_certificate_latency: register_histogram_with_registry!(
463 "authority_state_prepare_certificate_latency",
464 "Latency of executing certificates, before committing the results",
465 LATENCY_SEC_BUCKETS.to_vec(),
466 registry,
467 )
468 .unwrap(),
469 commit_certificate_latency: register_histogram_with_registry!(
470 "authority_state_commit_certificate_latency",
471 "Latency of committing certificate execution results",
472 LATENCY_SEC_BUCKETS.to_vec(),
473 registry,
474 )
475 .unwrap(),
476 db_checkpoint_latency: register_histogram_with_registry!(
477 "db_checkpoint_latency",
478 "Latency of checkpointing dbs",
479 LATENCY_SEC_BUCKETS.to_vec(),
480 registry,
481 ).unwrap(),
482 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
483 "transaction_manager_num_enqueued_certificates",
484 "Current number of certificates enqueued to TransactionManager",
485 &["result"],
486 registry,
487 )
488 .unwrap(),
489 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
490 "transaction_manager_num_missing_objects",
491 "Current number of missing objects in TransactionManager",
492 registry,
493 )
494 .unwrap(),
495 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
496 "transaction_manager_num_pending_certificates",
497 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
498 registry,
499 )
500 .unwrap(),
501 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
502 "transaction_manager_num_executing_certificates",
503 "Number of executing certificates, including queued and actually running certificates",
504 registry,
505 )
506 .unwrap(),
507 transaction_manager_num_ready: register_int_gauge_with_registry!(
508 "transaction_manager_num_ready",
509 "Number of ready transactions in TransactionManager",
510 registry,
511 )
512 .unwrap(),
513 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
514 "transaction_manager_object_cache_size",
515 "Current size of object-availability cache in TransactionManager",
516 registry,
517 )
518 .unwrap(),
519 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
520 "transaction_manager_object_cache_hits",
521 "Number of object-availability cache hits in TransactionManager",
522 registry,
523 )
524 .unwrap(),
525 authority_overload_status: register_int_gauge_with_registry!(
526 "authority_overload_status",
527 "Whether authority is current experiencing overload and enters load shedding mode.",
528 registry)
529 .unwrap(),
530 authority_load_shedding_percentage: register_int_gauge_with_registry!(
531 "authority_load_shedding_percentage",
532 "The percentage of transactions is shed when the authority is in load shedding mode.",
533 registry)
534 .unwrap(),
535 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
536 "transaction_manager_object_cache_misses",
537 "Number of object-availability cache misses in TransactionManager",
538 registry,
539 )
540 .unwrap(),
541 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
542 "transaction_manager_object_cache_evictions",
543 "Number of object-availability cache evictions in TransactionManager",
544 registry,
545 )
546 .unwrap(),
547 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
548 "transaction_manager_package_cache_size",
549 "Current size of package-availability cache in TransactionManager",
550 registry,
551 )
552 .unwrap(),
553 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
554 "transaction_manager_package_cache_hits",
555 "Number of package-availability cache hits in TransactionManager",
556 registry,
557 )
558 .unwrap(),
559 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
560 "transaction_manager_package_cache_misses",
561 "Number of package-availability cache misses in TransactionManager",
562 registry,
563 )
564 .unwrap(),
565 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
566 "transaction_manager_package_cache_evictions",
567 "Number of package-availability cache evictions in TransactionManager",
568 registry,
569 )
570 .unwrap(),
571 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
572 "transaction_manager_transaction_queue_age_s",
573 "Time spent in waiting for transaction in the queue",
574 LATENCY_SEC_BUCKETS.to_vec(),
575 registry,
576 )
577 .unwrap(),
578 transaction_overload_sources: register_int_counter_vec_with_registry!(
579 "transaction_overload_sources",
580 "Number of times each source indicates transaction overload.",
581 &["source"],
582 registry)
583 .unwrap(),
584 execution_driver_executed_transactions: register_int_counter_with_registry!(
585 "execution_driver_executed_transactions",
586 "Cumulative number of transaction executed by execution driver",
587 registry,
588 )
589 .unwrap(),
590 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
591 "execution_driver_dispatch_queue",
592 "Number of transaction pending in execution driver dispatch queue",
593 registry,
594 )
595 .unwrap(),
596 execution_queueing_delay_s: register_histogram_with_registry!(
597 "execution_queueing_delay_s",
598 "Queueing delay between a transaction is ready for execution until it starts executing.",
599 LATENCY_SEC_BUCKETS.to_vec(),
600 registry
601 )
602 .unwrap(),
603 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
604 "prepare_cert_gas_latency_ratio",
605 "The ratio of computation gas divided by VM execution latency.",
606 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
607 registry
608 )
609 .unwrap(),
610 execution_gas_latency_ratio: register_histogram_with_registry!(
611 "execution_gas_latency_ratio",
612 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
613 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
614 registry
615 )
616 .unwrap(),
617 skipped_consensus_txns: register_int_counter_with_registry!(
618 "skipped_consensus_txns",
619 "Total number of consensus transactions skipped",
620 registry,
621 )
622 .unwrap(),
623 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
624 "skipped_consensus_txns_cache_hit",
625 "Total number of consensus transactions skipped because of local cache hit",
626 registry,
627 )
628 .unwrap(),
629 post_processing_total_events_emitted: register_int_counter_with_registry!(
630 "post_processing_total_events_emitted",
631 "Total number of events emitted in post processing",
632 registry,
633 )
634 .unwrap(),
635 post_processing_total_tx_indexed: register_int_counter_with_registry!(
636 "post_processing_total_tx_indexed",
637 "Total number of txes indexed in post processing",
638 registry,
639 )
640 .unwrap(),
641 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
642 "post_processing_total_tx_had_event_processed",
643 "Total number of txes finished event processing in post processing",
644 registry,
645 )
646 .unwrap(),
647 post_processing_total_failures: register_int_counter_with_registry!(
648 "post_processing_total_failures",
649 "Total number of failure in post processing",
650 registry,
651 )
652 .unwrap(),
653 consensus_handler_processed: register_int_counter_vec_with_registry!(
654 "consensus_handler_processed",
655 "Number of transactions processed by consensus handler",
656 &["class"],
657 registry
658 ).unwrap(),
659 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
660 "consensus_handler_transaction_sizes",
661 "Sizes of each type of transactions processed by consensus handler",
662 &["class"],
663 POSITIVE_INT_BUCKETS.to_vec(),
664 registry
665 ).unwrap(),
666 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
667 "consensus_handler_num_low_scoring_authorities",
668 "Number of low scoring authorities based on reputation scores from consensus",
669 registry
670 ).unwrap(),
671 consensus_handler_scores: register_int_gauge_vec_with_registry!(
672 "consensus_handler_scores",
673 "scores from consensus for each authority",
674 &["authority"],
675 registry,
676 ).unwrap(),
677 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
678 "consensus_handler_deferred_transactions",
679 "Number of transactions deferred by consensus handler",
680 registry,
681 ).unwrap(),
682 consensus_handler_congested_transactions: register_int_counter_with_registry!(
683 "consensus_handler_congested_transactions",
684 "Number of transactions deferred by consensus handler due to congestion",
685 registry,
686 ).unwrap(),
687 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
688 "consensus_handler_cancelled_transactions",
689 "Number of transactions cancelled by consensus handler",
690 registry,
691 ).unwrap(),
692 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
693 "consensus_handler_max_congestion_control_object_costs",
694 "Max object costs for congestion control in the current consensus commit",
695 &["commit_type"],
696 registry,
697 ).unwrap(),
698 consensus_committed_subdags: register_int_counter_vec_with_registry!(
699 "consensus_committed_subdags",
700 "Number of committed subdags, sliced by leader",
701 &["authority"],
702 registry,
703 ).unwrap(),
704 consensus_committed_messages: register_int_gauge_vec_with_registry!(
705 "consensus_committed_messages",
706 "Total number of committed consensus messages, sliced by author",
707 &["authority"],
708 registry,
709 ).unwrap(),
710 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
711 "consensus_committed_user_transactions",
712 "Number of committed user transactions, sliced by submitter",
713 &["authority"],
714 registry,
715 ).unwrap(),
716 consensus_handler_leader_round: register_int_gauge_with_registry!(
717 "consensus_handler_leader_round",
718 "The leader round of the current consensus output being processed in the consensus handler",
719 registry,
720 ).unwrap(),
721 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
722 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
723 zklogin_sig_count: register_int_counter_with_registry!(
724 "zklogin_sig_count",
725 "Count of zkLogin signatures",
726 registry,
727 )
728 .unwrap(),
729 multisig_sig_count: register_int_counter_with_registry!(
730 "multisig_sig_count",
731 "Count of zkLogin signatures",
732 registry,
733 )
734 .unwrap(),
735 consensus_calculated_throughput: register_int_gauge_with_registry!(
736 "consensus_calculated_throughput",
737 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
738 registry,
739 ).unwrap(),
740 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
741 "consensus_calculated_throughput_profile",
742 "The current active calculated throughput profile",
743 registry
744 ).unwrap(),
745 execution_queueing_latency: LatencyObserver::new(),
746 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
747 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
748 }
749 }
750
751 pub fn reset_on_reconfigure(&self) {
755 self.consensus_committed_messages.reset();
756 self.consensus_handler_scores.reset();
757 self.consensus_committed_user_transactions.reset();
758 }
759}
760
761pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
768
769pub struct AuthorityState {
770 pub name: AuthorityName,
773 pub secret: StableSyncAuthoritySigner,
775
776 input_loader: TransactionInputLoader,
778 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
779
780 epoch_store: ArcSwap<AuthorityPerEpochStore>,
781
782 execution_lock: RwLock<EpochId>,
788
789 pub indexes: Option<Arc<IndexStore>>,
790 pub rest_index: Option<Arc<RestIndexStore>>,
791
792 pub subscription_handler: Arc<SubscriptionHandler>,
793 pub checkpoint_store: Arc<CheckpointStore>,
794
795 committee_store: Arc<CommitteeStore>,
796
797 transaction_manager: Arc<TransactionManager>,
799
800 #[cfg_attr(not(test), expect(unused))]
802 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
803
804 pub metrics: Arc<AuthorityMetrics>,
805 _pruner: AuthorityStorePruner,
806 _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
807
808 db_checkpoint_config: DBCheckpointConfig,
810
811 pub config: NodeConfig,
812
813 pub overload_info: AuthorityOverloadInfo,
815
816 pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
817
818 chain_identifier: ChainIdentifier,
821
822 pub(crate) congestion_tracker: Arc<CongestionTracker>,
823}
824
825impl AuthorityState {
834 pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
835 epoch_store.committee().authority_exists(&self.name)
836 }
837
838 pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
839 epoch_store
840 .active_validators()
841 .iter()
842 .any(|a| AuthorityName::from(a) == self.name)
843 }
844
845 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
846 !self.is_committee_validator(epoch_store)
847 }
848
849 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
850 &self.committee_store
851 }
852
853 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
854 self.committee_store.clone()
855 }
856
857 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
858 &self.config.authority_overload_config
859 }
860
861 pub fn get_epoch_state_commitments(
862 &self,
863 epoch: EpochId,
864 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
865 self.checkpoint_store.get_epoch_state_commitments(epoch)
866 }
867
868 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
872 async fn handle_transaction_impl(
873 &self,
874 transaction: VerifiedTransaction,
875 epoch_store: &Arc<AuthorityPerEpochStore>,
876 ) -> IotaResult<VerifiedSignedTransaction> {
877 let _execution_lock = self.execution_lock_for_signing();
879
880 let tx_digest = transaction.digest();
881 let tx_data = transaction.data().transaction_data();
882
883 let input_object_kinds = tx_data.input_objects()?;
884 let receiving_objects_refs = tx_data.receiving_objects();
885
886 iota_transaction_checks::deny::check_transaction_for_signing(
890 tx_data,
891 transaction.tx_signatures(),
892 &input_object_kinds,
893 &receiving_objects_refs,
894 &self.config.transaction_deny_config,
895 self.get_backing_package_store().as_ref(),
896 )?;
897
898 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
899 Some(tx_digest),
900 &input_object_kinds,
901 &receiving_objects_refs,
902 epoch_store.epoch(),
903 )?;
904
905 let (_gas_status, checked_input_objects) =
906 iota_transaction_checks::check_transaction_input(
907 epoch_store.protocol_config(),
908 epoch_store.reference_gas_price(),
909 tx_data,
910 input_objects,
911 &receiving_objects,
912 &self.metrics.bytecode_verifier_metrics,
913 &self.config.verifier_signing_config,
914 )?;
915
916 check_coin_deny_list_v1_during_signing(
917 tx_data.sender(),
918 &checked_input_objects,
919 &receiving_objects,
920 &self.get_object_store(),
921 )?;
922
923 let owned_objects = checked_input_objects.inner().filter_owned_objects();
924
925 let signed_transaction = VerifiedSignedTransaction::new(
926 epoch_store.epoch(),
927 transaction,
928 self.name,
929 &*self.secret,
930 );
931
932 self.get_cache_writer().try_acquire_transaction_locks(
937 epoch_store,
938 &owned_objects,
939 signed_transaction.clone(),
940 )?;
941
942 Ok(signed_transaction)
943 }
944
945 #[instrument(name = "handle_transaction", level = "trace", skip_all)]
947 pub async fn handle_transaction(
948 &self,
949 epoch_store: &Arc<AuthorityPerEpochStore>,
950 transaction: VerifiedTransaction,
951 ) -> IotaResult<HandleTransactionResponse> {
952 let tx_digest = *transaction.digest();
953 debug!("handle_transaction");
954
955 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
957 return Ok(HandleTransactionResponse { status });
958 }
959
960 let _metrics_guard = self
961 .metrics
962 .authority_state_handle_transaction_latency
963 .start_timer();
964 self.metrics.tx_orders.inc();
965
966 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
967 match signed {
968 Ok(s) => {
969 if self.is_committee_validator(epoch_store) {
970 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
971 let tx = s.clone();
972 let validator_tx_finalizer = validator_tx_finalizer.clone();
973 let cache_reader = self.get_transaction_cache_reader().clone();
974 let epoch_store = epoch_store.clone();
975 spawn_monitored_task!(epoch_store.within_alive_epoch(
976 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
977 ));
978 }
979 }
980 Ok(HandleTransactionResponse {
981 status: TransactionStatus::Signed(s.into_inner().into_sig()),
982 })
983 }
984 Err(err) => Ok(HandleTransactionResponse {
988 status: self
989 .get_transaction_status(&tx_digest, epoch_store)?
990 .ok_or(err)?
991 .1,
992 }),
993 }
994 }
995
996 pub fn check_system_overload_at_signing(&self) -> bool {
997 self.config
998 .authority_overload_config
999 .check_system_overload_at_signing
1000 }
1001
1002 pub fn check_system_overload_at_execution(&self) -> bool {
1003 self.config
1004 .authority_overload_config
1005 .check_system_overload_at_execution
1006 }
1007
1008 pub(crate) fn check_system_overload(
1009 &self,
1010 consensus_adapter: &Arc<ConsensusAdapter>,
1011 tx_data: &SenderSignedData,
1012 do_authority_overload_check: bool,
1013 ) -> IotaResult {
1014 if do_authority_overload_check {
1015 self.check_authority_overload(tx_data).tap_err(|_| {
1016 self.update_overload_metrics("execution_queue");
1017 })?;
1018 }
1019 self.transaction_manager
1020 .check_execution_overload(self.overload_config(), tx_data)
1021 .tap_err(|_| {
1022 self.update_overload_metrics("execution_pending");
1023 })?;
1024 consensus_adapter.check_consensus_overload().tap_err(|_| {
1025 self.update_overload_metrics("consensus");
1026 })?;
1027
1028 let pending_tx_count = self
1029 .get_cache_commit()
1030 .approximate_pending_transaction_count();
1031 if pending_tx_count
1032 > self
1033 .config
1034 .execution_cache_config
1035 .writeback_cache
1036 .backpressure_threshold_for_rpc()
1037 {
1038 return Err(IotaError::ValidatorOverloadedRetryAfter {
1039 retry_after_secs: 10,
1040 });
1041 }
1042
1043 Ok(())
1044 }
1045
1046 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1047 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1048 return Ok(());
1049 }
1050
1051 let load_shedding_percentage = self
1052 .overload_info
1053 .load_shedding_percentage
1054 .load(Ordering::Relaxed);
1055 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1056 }
1057
1058 fn update_overload_metrics(&self, source: &str) {
1059 self.metrics
1060 .transaction_overload_sources
1061 .with_label_values(&[source])
1062 .inc();
1063 }
1064
1065 #[instrument(level = "trace", skip_all)]
1067 pub async fn execute_certificate(
1068 &self,
1069 certificate: &VerifiedCertificate,
1070 epoch_store: &Arc<AuthorityPerEpochStore>,
1071 ) -> IotaResult<TransactionEffects> {
1072 let _metrics_guard = if certificate.contains_shared_object() {
1073 self.metrics
1074 .execute_certificate_latency_shared_object
1075 .start_timer()
1076 } else {
1077 self.metrics
1078 .execute_certificate_latency_single_writer
1079 .start_timer()
1080 };
1081 trace!("execute_certificate");
1082
1083 self.metrics.total_cert_attempts.inc();
1084
1085 if !certificate.contains_shared_object() {
1086 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1091 }
1092
1093 epoch_store
1096 .within_alive_epoch(self.notify_read_effects(certificate))
1097 .await
1098 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1099 .and_then(|r| r)
1100 }
1101
1102 #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1117 pub fn try_execute_immediately(
1118 &self,
1119 certificate: &VerifiedExecutableTransaction,
1120 expected_effects_digest: Option<TransactionEffectsDigest>,
1121 epoch_store: &Arc<AuthorityPerEpochStore>,
1122 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1123 let _scope = monitored_scope("Execution::try_execute_immediately");
1124 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1125
1126 let tx_digest = certificate.digest();
1127
1128 let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1130
1131 if let Some(effects) = self
1134 .get_transaction_cache_reader()
1135 .try_get_executed_effects(tx_digest)?
1136 {
1137 if let Some(expected_effects_digest_inner) = expected_effects_digest {
1138 assert_eq!(
1139 effects.digest(),
1140 expected_effects_digest_inner,
1141 "Unexpected effects digest for transaction {tx_digest:?}"
1142 );
1143 }
1144 tx_guard.release();
1145 return Ok((effects, None));
1146 }
1147 let input_objects =
1148 self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1149
1150 let expected_effects_digest =
1156 expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1157
1158 self.process_certificate(
1159 tx_guard,
1160 certificate,
1161 input_objects,
1162 expected_effects_digest,
1163 epoch_store,
1164 )
1165 .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1166 .tap_ok(
1167 |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1168 )
1169 }
1170
1171 pub fn read_objects_for_execution(
1172 &self,
1173 tx_lock: &CertLockGuard,
1174 certificate: &VerifiedExecutableTransaction,
1175 epoch_store: &Arc<AuthorityPerEpochStore>,
1176 ) -> IotaResult<InputObjects> {
1177 let _scope = monitored_scope("Execution::load_input_objects");
1178 let _metrics_guard = self
1179 .metrics
1180 .execution_load_input_objects_latency
1181 .start_timer();
1182 let input_objects = &certificate.data().transaction_data().input_objects()?;
1183 self.input_loader.read_objects_for_execution(
1184 epoch_store,
1185 &certificate.key(),
1186 tx_lock,
1187 input_objects,
1188 epoch_store.epoch(),
1189 )
1190 }
1191
1192 pub fn try_execute_for_test(
1196 &self,
1197 certificate: &VerifiedCertificate,
1198 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1199 let epoch_store = self.epoch_store_for_testing();
1200 let (effects, execution_error_opt) = self.try_execute_immediately(
1201 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1202 None,
1203 &epoch_store,
1204 )?;
1205 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1206 Ok((signed_effects, execution_error_opt))
1207 }
1208
1209 pub fn execute_for_test(
1211 &self,
1212 certificate: &VerifiedCertificate,
1213 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1214 self.try_execute_for_test(certificate)
1215 .expect("try_execute_for_test should not fail")
1216 }
1217
1218 pub async fn notify_read_effects(
1219 &self,
1220 certificate: &VerifiedCertificate,
1221 ) -> IotaResult<TransactionEffects> {
1222 self.get_transaction_cache_reader()
1223 .try_notify_read_executed_effects(&[*certificate.digest()])
1224 .await
1225 .map(|mut r| r.pop().expect("must return correct number of effects"))
1226 }
1227
1228 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1229 self.get_object_cache_reader()
1230 .try_check_owned_objects_are_live(owned_object_refs)
1231 }
1232
1233 pub(crate) fn debug_dump_transaction_state(
1238 &self,
1239 tx_digest: &TransactionDigest,
1240 effects: &TransactionEffects,
1241 expected_effects_digest: TransactionEffectsDigest,
1242 inner_temporary_store: &InnerTemporaryStore,
1243 certificate: &VerifiedExecutableTransaction,
1244 debug_dump_config: &StateDebugDumpConfig,
1245 ) -> IotaResult<PathBuf> {
1246 let dump_dir = debug_dump_config
1247 .dump_file_directory
1248 .as_ref()
1249 .cloned()
1250 .unwrap_or(std::env::temp_dir());
1251 let epoch_store = self.load_epoch_store_one_call_per_task();
1252
1253 NodeStateDump::new(
1254 tx_digest,
1255 effects,
1256 expected_effects_digest,
1257 self.get_object_store().as_ref(),
1258 &epoch_store,
1259 inner_temporary_store,
1260 certificate,
1261 )?
1262 .write_to_file(&dump_dir)
1263 .map_err(|e| IotaError::FileIO(e.to_string()))
1264 }
1265
1266 #[instrument(level = "trace", skip_all)]
1267 pub(crate) fn process_certificate(
1268 &self,
1269 tx_guard: CertTxGuard,
1270 certificate: &VerifiedExecutableTransaction,
1271 input_objects: InputObjects,
1272 expected_effects_digest: Option<TransactionEffectsDigest>,
1273 epoch_store: &Arc<AuthorityPerEpochStore>,
1274 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1275 let process_certificate_start_time = tokio::time::Instant::now();
1276 let digest = *certificate.digest();
1277
1278 fail_point_if!("correlated-crash-process-certificate", || {
1279 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1280 iota_simulator::task::kill_current_node(None);
1281 }
1282 });
1283
1284 let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1285 let execution_guard = match execution_guard {
1291 Ok(execution_guard) => execution_guard,
1292 Err(err) => {
1293 tx_guard.release();
1294 return Err(err);
1295 }
1296 };
1297 if *execution_guard != epoch_store.epoch() {
1301 tx_guard.release();
1302 info!("The epoch of the execution_guard doesn't match the epoch store");
1303 return Err(IotaError::WrongEpoch {
1304 expected_epoch: epoch_store.epoch(),
1305 actual_epoch: *execution_guard,
1306 });
1307 }
1308
1309 let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1315 &execution_guard,
1316 certificate,
1317 input_objects,
1318 epoch_store,
1319 ) {
1320 Err(e) => {
1321 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1322 tx_guard.release();
1323 return Err(e);
1324 }
1325 Ok(res) => res,
1326 };
1327
1328 if let Some(expected_effects_digest) = expected_effects_digest {
1329 if effects.digest() != expected_effects_digest {
1330 match self.debug_dump_transaction_state(
1332 &digest,
1333 &effects,
1334 expected_effects_digest,
1335 &inner_temporary_store,
1336 certificate,
1337 &self.config.state_debug_dump_config,
1338 ) {
1339 Ok(out_path) => {
1340 info!(
1341 "Dumped node state for transaction {} to {}",
1342 digest,
1343 out_path.as_path().display().to_string()
1344 );
1345 }
1346 Err(e) => {
1347 error!("Error dumping state for transaction {}: {e}", digest);
1348 }
1349 }
1350 error!(
1351 tx_digest = ?digest,
1352 ?expected_effects_digest,
1353 actual_effects = ?effects,
1354 "fork detected!"
1355 );
1356 panic!(
1357 "Transaction {} is expected to have effects digest {}, but got {}!",
1358 digest,
1359 expected_effects_digest,
1360 effects.digest(),
1361 );
1362 }
1363 }
1364
1365 fail_point!("crash");
1366
1367 self.commit_certificate(
1368 certificate,
1369 inner_temporary_store,
1370 &effects,
1371 tx_guard,
1372 execution_guard,
1373 epoch_store,
1374 )?;
1375
1376 if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1377 certificate.data().transaction_data().kind()
1378 {
1379 if let Some(err) = &execution_error_opt {
1380 debug_fatal!("Authenticator state update failed: {:?}", err);
1381 }
1382 epoch_store.update_authenticator_state(auth_state);
1383
1384 if cfg!(debug_assertions) {
1387 let authenticator_state = get_authenticator_state(self.get_object_store())
1388 .expect("Read cannot fail")
1389 .expect("Authenticator state must exist");
1390
1391 let mut sys_jwks: Vec<_> = authenticator_state
1392 .active_jwks
1393 .into_iter()
1394 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1395 .collect();
1396 let mut active_jwks: Vec<_> = epoch_store
1397 .signature_verifier
1398 .get_jwks()
1399 .into_iter()
1400 .collect();
1401 sys_jwks.sort();
1402 active_jwks.sort();
1403
1404 assert_eq!(sys_jwks, active_jwks);
1405 }
1406 }
1407
1408 let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1409 if elapsed > 0.0 {
1410 self.metrics
1411 .execution_gas_latency_ratio
1412 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1413 };
1414 Ok((effects, execution_error_opt))
1415 }
1416
1417 #[instrument(level = "trace", skip_all)]
1418 fn commit_certificate(
1419 &self,
1420 certificate: &VerifiedExecutableTransaction,
1421 inner_temporary_store: InnerTemporaryStore,
1422 effects: &TransactionEffects,
1423 tx_guard: CertTxGuard,
1424 _execution_guard: ExecutionLockReadGuard<'_>,
1425 epoch_store: &Arc<AuthorityPerEpochStore>,
1426 ) -> IotaResult {
1427 let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1428 monitored_scope("Execution::commit_certificate");
1429 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1430
1431 let tx_key = certificate.key();
1432 let tx_digest = certificate.digest();
1433 let input_object_count = inner_temporary_store.input_objects.len();
1434 let shared_object_count = effects.input_shared_objects().len();
1435
1436 let output_keys = inner_temporary_store.get_output_keys(effects);
1437
1438 let _ = self
1440 .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1441 .tap_err(|e| {
1442 self.metrics.post_processing_total_failures.inc();
1443 error!(?tx_digest, "tx post processing failed: {e}");
1444 });
1445
1446 epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1450
1451 fail_point!("crash");
1453
1454 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1455 certificate.clone().into_unsigned(),
1456 effects.clone(),
1457 inner_temporary_store,
1458 );
1459 self.get_cache_writer()
1460 .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1461
1462 if certificate.transaction_data().is_end_of_epoch_tx() {
1463 self.get_object_cache_reader()
1466 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1467 }
1468
1469 tx_guard.commit_tx();
1471
1472 self.transaction_manager
1476 .notify_commit(tx_digest, output_keys, epoch_store);
1477
1478 self.update_metrics(certificate, input_object_count, shared_object_count);
1479
1480 Ok(())
1481 }
1482
1483 fn update_metrics(
1484 &self,
1485 certificate: &VerifiedExecutableTransaction,
1486 input_object_count: usize,
1487 shared_object_count: usize,
1488 ) {
1489 if certificate.has_zklogin_sig() {
1491 self.metrics.zklogin_sig_count.inc();
1492 } else if certificate.has_upgraded_multisig() {
1493 self.metrics.multisig_sig_count.inc();
1494 }
1495
1496 self.metrics.total_effects.inc();
1497 self.metrics.total_certs.inc();
1498
1499 if shared_object_count > 0 {
1500 self.metrics.shared_obj_tx.inc();
1501 }
1502
1503 if certificate.is_sponsored_tx() {
1504 self.metrics.sponsored_tx.inc();
1505 }
1506
1507 self.metrics
1508 .num_input_objs
1509 .observe(input_object_count as f64);
1510 self.metrics
1511 .num_shared_objects
1512 .observe(shared_object_count as f64);
1513 self.metrics.batch_size.observe(
1514 certificate
1515 .data()
1516 .intent_message()
1517 .value
1518 .kind()
1519 .num_commands() as f64,
1520 );
1521 }
1522
1523 #[instrument(level = "trace", skip_all)]
1535 fn prepare_certificate(
1536 &self,
1537 _execution_guard: &ExecutionLockReadGuard<'_>,
1538 certificate: &VerifiedExecutableTransaction,
1539 input_objects: InputObjects,
1540 epoch_store: &Arc<AuthorityPerEpochStore>,
1541 ) -> IotaResult<(
1542 InnerTemporaryStore,
1543 TransactionEffects,
1544 Option<ExecutionError>,
1545 )> {
1546 let _scope = monitored_scope("Execution::prepare_certificate");
1547 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1548 let prepare_certificate_start_time = tokio::time::Instant::now();
1549
1550 let tx_data = certificate.data().transaction_data();
1553 tx_data.validity_check(epoch_store.protocol_config())?;
1554
1555 let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
1558 certificate,
1559 input_objects,
1560 epoch_store.protocol_config(),
1561 epoch_store.reference_gas_price(),
1562 )?;
1563
1564 let owned_object_refs = input_objects.inner().filter_owned_objects();
1565 self.check_owned_locks(&owned_object_refs)?;
1566 let tx_digest = *certificate.digest();
1567 let protocol_config = epoch_store.protocol_config();
1568 let transaction_data = &certificate.data().intent_message().value;
1569 let (kind, signer, gas) = transaction_data.execution_parts();
1570
1571 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1572 let (inner_temp_store, _, mut effects, execution_error_opt) =
1573 epoch_store.executor().execute_transaction_to_effects(
1574 self.get_backing_store().as_ref(),
1575 protocol_config,
1576 self.metrics.limits_metrics.clone(),
1577 self.config
1580 .expensive_safety_check_config
1581 .enable_deep_per_tx_iota_conservation_check(),
1582 self.config.certificate_deny_config.certificate_deny_set(),
1583 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1584 epoch_store
1585 .epoch_start_config()
1586 .epoch_data()
1587 .epoch_start_timestamp(),
1588 input_objects,
1589 gas,
1590 gas_status,
1591 kind,
1592 signer,
1593 tx_digest,
1594 &mut None,
1595 );
1596
1597 fail_point_if!("cp_execution_nondeterminism", || {
1598 #[cfg(msim)]
1599 self.create_fail_state(certificate, epoch_store, &mut effects);
1600 });
1601
1602 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1603 if elapsed > 0.0 {
1604 self.metrics
1605 .prepare_cert_gas_latency_ratio
1606 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1607 }
1608
1609 Ok((inner_temp_store, effects, execution_error_opt.err()))
1610 }
1611
1612 pub fn prepare_certificate_for_benchmark(
1613 &self,
1614 certificate: &VerifiedExecutableTransaction,
1615 input_objects: InputObjects,
1616 epoch_store: &Arc<AuthorityPerEpochStore>,
1617 ) -> IotaResult<(
1618 InnerTemporaryStore,
1619 TransactionEffects,
1620 Option<ExecutionError>,
1621 )> {
1622 let lock = RwLock::new(epoch_store.epoch());
1623 let execution_guard = lock.try_read().unwrap();
1624
1625 self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
1626 }
1627
1628 #[instrument("dry_exec_tx", level = "trace", skip_all)]
1631 #[allow(clippy::type_complexity)]
1632 pub fn dry_exec_transaction(
1633 &self,
1634 transaction: TransactionData,
1635 transaction_digest: TransactionDigest,
1636 ) -> IotaResult<(
1637 DryRunTransactionBlockResponse,
1638 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1639 TransactionEffects,
1640 Option<ObjectID>,
1641 )> {
1642 let epoch_store = self.load_epoch_store_one_call_per_task();
1643 if !self.is_fullnode(&epoch_store) {
1644 return Err(IotaError::UnsupportedFeature {
1645 error: "dry-exec is only supported on fullnodes".to_string(),
1646 });
1647 }
1648
1649 if transaction.kind().is_system_tx() {
1650 return Err(IotaError::UnsupportedFeature {
1651 error: "dry-exec does not support system transactions".to_string(),
1652 });
1653 }
1654
1655 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1656 }
1657
1658 #[allow(clippy::type_complexity)]
1659 pub fn dry_exec_transaction_for_benchmark(
1660 &self,
1661 transaction: TransactionData,
1662 transaction_digest: TransactionDigest,
1663 ) -> IotaResult<(
1664 DryRunTransactionBlockResponse,
1665 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1666 TransactionEffects,
1667 Option<ObjectID>,
1668 )> {
1669 let epoch_store = self.load_epoch_store_one_call_per_task();
1670 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1671 }
1672
1673 #[instrument(level = "trace", skip_all)]
1674 #[allow(clippy::type_complexity)]
1675 fn dry_exec_transaction_impl(
1676 &self,
1677 epoch_store: &AuthorityPerEpochStore,
1678 transaction: TransactionData,
1679 transaction_digest: TransactionDigest,
1680 ) -> IotaResult<(
1681 DryRunTransactionBlockResponse,
1682 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1683 TransactionEffects,
1684 Option<ObjectID>,
1685 )> {
1686 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1688
1689 let input_object_kinds = transaction.input_objects()?;
1690 let receiving_object_refs = transaction.receiving_objects();
1691
1692 iota_transaction_checks::deny::check_transaction_for_signing(
1693 &transaction,
1694 &[],
1695 &input_object_kinds,
1696 &receiving_object_refs,
1697 &self.config.transaction_deny_config,
1698 self.get_backing_package_store().as_ref(),
1699 )?;
1700
1701 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1702 None,
1704 &input_object_kinds,
1705 &receiving_object_refs,
1706 epoch_store.epoch(),
1707 )?;
1708
1709 let mut transaction = transaction;
1711 let mut gas_object_refs = transaction.gas().to_vec();
1712 let reference_gas_price = epoch_store.reference_gas_price();
1713 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1714 let sender = transaction.gas_owner();
1715 let gas_object_id = ObjectID::random();
1716 let gas_object = Object::new_move(
1717 MoveObject::new_gas_coin(
1718 OBJECT_START_VERSION,
1719 gas_object_id,
1720 SIMULATION_GAS_COIN_VALUE,
1721 ),
1722 Owner::AddressOwner(sender),
1723 TransactionDigest::genesis_marker(),
1724 );
1725 let gas_object_ref = gas_object.compute_object_reference();
1726 gas_object_refs = vec![gas_object_ref];
1727 transaction.gas_data_mut().payment = gas_object_refs.clone();
1729 (
1730 iota_transaction_checks::check_transaction_input_with_given_gas(
1731 epoch_store.protocol_config(),
1732 reference_gas_price,
1733 &transaction,
1734 input_objects,
1735 receiving_objects,
1736 gas_object,
1737 &self.metrics.bytecode_verifier_metrics,
1738 &self.config.verifier_signing_config,
1739 )?,
1740 Some(gas_object_id),
1741 )
1742 } else {
1743 (
1744 iota_transaction_checks::check_transaction_input(
1745 epoch_store.protocol_config(),
1746 reference_gas_price,
1747 &transaction,
1748 input_objects,
1749 &receiving_objects,
1750 &self.metrics.bytecode_verifier_metrics,
1751 &self.config.verifier_signing_config,
1752 )?,
1753 None,
1754 )
1755 };
1756
1757 let protocol_config = epoch_store.protocol_config();
1758 let (kind, signer, _) = transaction.execution_parts();
1759
1760 let silent = true;
1761 let executor = iota_execution::executor(protocol_config, silent, None)
1762 .expect("Creating an executor should not fail here");
1763
1764 let expensive_checks = false;
1765 let (inner_temp_store, _, effects, execution_error) = executor
1766 .execute_transaction_to_effects(
1767 self.get_backing_store().as_ref(),
1768 protocol_config,
1769 self.metrics.limits_metrics.clone(),
1770 expensive_checks,
1771 self.config.certificate_deny_config.certificate_deny_set(),
1772 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1773 epoch_store
1774 .epoch_start_config()
1775 .epoch_data()
1776 .epoch_start_timestamp(),
1777 checked_input_objects,
1778 gas_object_refs,
1779 gas_status,
1780 kind,
1781 signer,
1782 transaction_digest,
1783 &mut None,
1784 );
1785 let tx_digest = *effects.transaction_digest();
1786
1787 let module_cache =
1788 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1789
1790 let mut layout_resolver =
1791 epoch_store
1792 .executor()
1793 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1794 &inner_temp_store,
1795 self.get_backing_package_store(),
1796 )));
1797 let object_changes = Vec::new();
1799
1800 let balance_changes = Vec::new();
1802
1803 let written_with_kind = effects
1804 .created()
1805 .into_iter()
1806 .map(|(oref, _)| (oref, WriteKind::Create))
1807 .chain(
1808 effects
1809 .unwrapped()
1810 .into_iter()
1811 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1812 )
1813 .chain(
1814 effects
1815 .mutated()
1816 .into_iter()
1817 .map(|(oref, _)| (oref, WriteKind::Mutate)),
1818 )
1819 .map(|(oref, kind)| {
1820 let obj = inner_temp_store.written.get(&oref.0).unwrap();
1821 (oref.0, (oref, obj.clone(), kind))
1823 })
1824 .collect();
1825
1826 let execution_error_source = execution_error
1827 .as_ref()
1828 .err()
1829 .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
1830
1831 Ok((
1832 DryRunTransactionBlockResponse {
1833 suggested_gas_price: self
1835 .congestion_tracker
1836 .get_prediction_suggested_gas_price(&transaction),
1837 input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
1838 .map_err(|e| IotaError::TransactionSerialization {
1839 error: format!(
1840 "Failed to convert transaction to IotaTransactionBlockData: {e}",
1841 ),
1842 })?, effects: effects.clone().try_into()?,
1844 events: IotaTransactionBlockEvents::try_from(
1845 inner_temp_store.events.clone(),
1846 tx_digest,
1847 None,
1848 layout_resolver.as_mut(),
1849 )?,
1850 object_changes,
1851 balance_changes,
1852 execution_error_source,
1853 },
1854 written_with_kind,
1855 effects,
1856 mock_gas,
1857 ))
1858 }
1859
1860 pub fn simulate_transaction(
1861 &self,
1862 mut transaction: TransactionData,
1863 checks: VmChecks,
1864 ) -> IotaResult<SimulateTransactionResult> {
1865 if transaction.kind().is_system_tx() {
1866 return Err(IotaError::UnsupportedFeature {
1867 error: "simulate does not support system transactions".to_string(),
1868 });
1869 }
1870
1871 let epoch_store = self.load_epoch_store_one_call_per_task();
1872 if !self.is_fullnode(&epoch_store) {
1873 return Err(IotaError::UnsupportedFeature {
1874 error: "simulate is only supported on fullnodes".to_string(),
1875 });
1876 }
1877
1878 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1882
1883 let input_object_kinds = transaction.input_objects()?;
1884 let receiving_object_refs = transaction.receiving_objects();
1885
1886 iota_transaction_checks::deny::check_transaction_for_signing(
1889 &transaction,
1890 &[],
1891 &input_object_kinds,
1892 &receiving_object_refs,
1893 &self.config.transaction_deny_config,
1894 self.get_backing_package_store().as_ref(),
1895 )?;
1896
1897 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1899 None,
1901 &input_object_kinds,
1902 &receiving_object_refs,
1903 epoch_store.epoch(),
1904 )?;
1905
1906 let mock_gas_id = if transaction.gas().is_empty() {
1908 let mock_gas_object = Object::new_move(
1909 MoveObject::new_gas_coin(
1910 OBJECT_START_VERSION,
1911 ObjectID::MAX,
1912 SIMULATION_GAS_COIN_VALUE,
1913 ),
1914 Owner::AddressOwner(transaction.gas_data().owner),
1915 TransactionDigest::genesis_marker(),
1916 );
1917 let mock_gas_object_ref = mock_gas_object.compute_object_reference();
1918 transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
1919 input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
1920 Some(mock_gas_object.id())
1921 } else {
1922 None
1923 };
1924
1925 let protocol_config = epoch_store.protocol_config();
1926
1927 let (gas_status, checked_input_objects) = if checks.enabled() {
1930 iota_transaction_checks::check_transaction_input(
1931 protocol_config,
1932 epoch_store.reference_gas_price(),
1933 &transaction,
1934 input_objects,
1935 &receiving_objects,
1936 &self.metrics.bytecode_verifier_metrics,
1937 &self.config.verifier_signing_config,
1938 )?
1939 } else {
1940 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
1941 protocol_config,
1942 transaction.kind(),
1943 input_objects,
1944 receiving_objects,
1945 )?;
1946 let gas_status = IotaGasStatus::new(
1947 transaction.gas_budget(),
1948 transaction.gas_price(),
1949 epoch_store.reference_gas_price(),
1950 protocol_config,
1951 )?;
1952
1953 (gas_status, checked_input_objects)
1954 };
1955
1956 let executor = iota_execution::executor(
1958 protocol_config,
1959 true, None,
1961 )
1962 .expect("Creating an executor should not fail here");
1963
1964 let (kind, signer, gas_coins) = transaction.execution_parts();
1966 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
1967 self.get_backing_store().as_ref(),
1968 protocol_config,
1969 self.metrics.limits_metrics.clone(),
1970 false, self.config.certificate_deny_config.certificate_deny_set(),
1972 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1973 epoch_store
1974 .epoch_start_config()
1975 .epoch_data()
1976 .epoch_start_timestamp(),
1977 checked_input_objects,
1978 gas_coins,
1979 gas_status,
1980 kind,
1981 signer,
1982 transaction.digest(),
1983 checks.disabled(),
1984 );
1985
1986 Ok(SimulateTransactionResult {
1989 input_objects: inner_temp_store.input_objects,
1990 output_objects: inner_temp_store.written,
1991 events: effects.events_digest().map(|_| inner_temp_store.events),
1992 effects,
1993 execution_result,
1994 mock_gas_id,
1995 })
1996 }
1997
1998 #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2003 pub async fn dev_inspect_transaction_block(
2004 &self,
2005 sender: IotaAddress,
2006 transaction_kind: TransactionKind,
2007 gas_price: Option<u64>,
2008 gas_budget: Option<u64>,
2009 gas_sponsor: Option<IotaAddress>,
2010 gas_objects: Option<Vec<ObjectRef>>,
2011 show_raw_txn_data_and_effects: Option<bool>,
2012 skip_checks: Option<bool>,
2013 ) -> IotaResult<DevInspectResults> {
2014 let epoch_store = self.load_epoch_store_one_call_per_task();
2015
2016 if !self.is_fullnode(&epoch_store) {
2017 return Err(IotaError::UnsupportedFeature {
2018 error: "dev-inspect is only supported on fullnodes".to_string(),
2019 });
2020 }
2021
2022 if transaction_kind.is_system_tx() {
2023 return Err(IotaError::UnsupportedFeature {
2024 error: "system transactions are not supported".to_string(),
2025 });
2026 }
2027
2028 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2029 let skip_checks = skip_checks.unwrap_or(true);
2030 let reference_gas_price = epoch_store.reference_gas_price();
2031 let protocol_config = epoch_store.protocol_config();
2032 let max_tx_gas = protocol_config.max_tx_gas();
2033
2034 let price = gas_price.unwrap_or(reference_gas_price);
2035 let budget = gas_budget.unwrap_or(max_tx_gas);
2036 let owner = gas_sponsor.unwrap_or(sender);
2037 let payment = gas_objects.unwrap_or_default();
2040 let transaction = TransactionData::V1(TransactionDataV1 {
2041 kind: transaction_kind.clone(),
2042 sender,
2043 gas_data: GasData {
2044 payment,
2045 owner,
2046 price,
2047 budget,
2048 },
2049 expiration: TransactionExpiration::None,
2050 });
2051
2052 let raw_txn_data = if show_raw_txn_data_and_effects {
2053 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2054 error: "Failed to serialize transaction during dev inspect".to_string(),
2055 })?
2056 } else {
2057 vec![]
2058 };
2059
2060 transaction.validity_check_no_gas_check(protocol_config)?;
2061
2062 let input_object_kinds = transaction.input_objects()?;
2063 let receiving_object_refs = transaction.receiving_objects();
2064
2065 iota_transaction_checks::deny::check_transaction_for_signing(
2066 &transaction,
2067 &[],
2068 &input_object_kinds,
2069 &receiving_object_refs,
2070 &self.config.transaction_deny_config,
2071 self.get_backing_package_store().as_ref(),
2072 )?;
2073
2074 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2075 None,
2077 &input_object_kinds,
2078 &receiving_object_refs,
2079 epoch_store.epoch(),
2080 )?;
2081
2082 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2084 SIMULATION_GAS_COIN_VALUE,
2085 transaction.gas_owner(),
2086 );
2087
2088 let gas_objects = if transaction.gas().is_empty() {
2089 let gas_object_ref = dummy_gas_object.compute_object_reference();
2090 vec![gas_object_ref]
2091 } else {
2092 transaction.gas().to_vec()
2093 };
2094
2095 let (gas_status, checked_input_objects) = if skip_checks {
2096 if transaction.gas().is_empty() {
2101 input_objects.push(ObjectReadResult::new(
2102 InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
2103 dummy_gas_object.into(),
2104 ));
2105 }
2106 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2107 protocol_config,
2108 &transaction_kind,
2109 input_objects,
2110 receiving_objects,
2111 )?;
2112 let gas_status = IotaGasStatus::new(
2113 max_tx_gas,
2114 transaction.gas_price(),
2115 reference_gas_price,
2116 protocol_config,
2117 )?;
2118
2119 (gas_status, checked_input_objects)
2120 } else {
2121 if transaction.gas().is_empty() {
2125 iota_transaction_checks::check_transaction_input_with_given_gas(
2126 epoch_store.protocol_config(),
2127 reference_gas_price,
2128 &transaction,
2129 input_objects,
2130 receiving_objects,
2131 dummy_gas_object,
2132 &self.metrics.bytecode_verifier_metrics,
2133 &self.config.verifier_signing_config,
2134 )?
2135 } else {
2136 iota_transaction_checks::check_transaction_input(
2137 epoch_store.protocol_config(),
2138 reference_gas_price,
2139 &transaction,
2140 input_objects,
2141 &receiving_objects,
2142 &self.metrics.bytecode_verifier_metrics,
2143 &self.config.verifier_signing_config,
2144 )?
2145 }
2146 };
2147
2148 let executor = iota_execution::executor(protocol_config, true, None)
2149 .expect("Creating an executor should not fail here");
2150 let intent_msg = IntentMessage::new(
2151 Intent {
2152 version: IntentVersion::V0,
2153 scope: IntentScope::TransactionData,
2154 app_id: IntentAppId::Iota,
2155 },
2156 transaction,
2157 );
2158 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2159 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2160 self.get_backing_store().as_ref(),
2161 protocol_config,
2162 self.metrics.limits_metrics.clone(),
2163 false,
2165 self.config.certificate_deny_config.certificate_deny_set(),
2166 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2167 epoch_store
2168 .epoch_start_config()
2169 .epoch_data()
2170 .epoch_start_timestamp(),
2171 checked_input_objects,
2172 gas_objects,
2173 gas_status,
2174 transaction_kind,
2175 sender,
2176 transaction_digest,
2177 skip_checks,
2178 );
2179
2180 let raw_effects = if show_raw_txn_data_and_effects {
2181 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2182 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2183 })?
2184 } else {
2185 vec![]
2186 };
2187
2188 let mut layout_resolver =
2189 epoch_store
2190 .executor()
2191 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2192 &inner_temp_store,
2193 self.get_backing_package_store(),
2194 )));
2195
2196 DevInspectResults::new(
2197 effects,
2198 inner_temp_store.events.clone(),
2199 execution_result,
2200 raw_txn_data,
2201 raw_effects,
2202 layout_resolver.as_mut(),
2203 )
2204 }
2205
2206 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2208 let epoch_store = self.epoch_store_for_testing();
2209 Ok(epoch_store.reference_gas_price())
2210 }
2211
2212 #[instrument(level = "trace", skip_all)]
2213 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2214 self.get_transaction_cache_reader()
2215 .try_is_tx_already_executed(digest)
2216 }
2217
2218 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2220 self.try_is_tx_already_executed(digest)
2221 .expect("storage access failed")
2222 }
2223
2224 #[instrument(level = "debug", skip_all, err)]
2226 fn index_tx(
2227 &self,
2228 indexes: &IndexStore,
2229 digest: &TransactionDigest,
2230 cert: &VerifiedExecutableTransaction,
2232 effects: &TransactionEffects,
2233 events: &TransactionEvents,
2234 timestamp_ms: u64,
2235 tx_coins: Option<TxCoins>,
2236 written: &WrittenObjects,
2237 inner_temporary_store: &InnerTemporaryStore,
2238 ) -> IotaResult<u64> {
2239 let changes = self
2240 .process_object_index(effects, written, inner_temporary_store)
2241 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2242
2243 indexes.index_tx(
2244 cert.data().intent_message().value.sender(),
2245 cert.data()
2246 .intent_message()
2247 .value
2248 .input_objects()?
2249 .iter()
2250 .map(|o| o.object_id()),
2251 effects
2252 .all_changed_objects()
2253 .into_iter()
2254 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2255 cert.data()
2256 .intent_message()
2257 .value
2258 .move_calls()
2259 .into_iter()
2260 .map(|(package, module, function)| {
2261 (*package, module.to_owned(), function.to_owned())
2262 }),
2263 events,
2264 changes,
2265 digest,
2266 timestamp_ms,
2267 tx_coins,
2268 )
2269 }
2270
2271 #[cfg(msim)]
2272 fn create_fail_state(
2273 &self,
2274 certificate: &VerifiedExecutableTransaction,
2275 epoch_store: &Arc<AuthorityPerEpochStore>,
2276 effects: &mut TransactionEffects,
2277 ) {
2278 use std::cell::RefCell;
2279 thread_local! {
2280 static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2281 }
2282 if !certificate.data().intent_message().value.is_system_tx() {
2283 let committee = epoch_store.committee();
2284 let cur_stake = (**committee).weight(&self.name);
2285 if cur_stake > 0 {
2286 FAIL_STATE.with_borrow_mut(|fail_state| {
2287 if fail_state.0 < committee.validity_threshold() {
2289 fail_state.0 += cur_stake;
2290 fail_state.1.insert(self.name);
2291 }
2292
2293 if fail_state.1.contains(&self.name) {
2294 info!("cp_exec failing tx");
2295 effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2296 }
2297 });
2298 }
2299 }
2300 }
2301
2302 fn process_object_index(
2303 &self,
2304 effects: &TransactionEffects,
2305 written: &WrittenObjects,
2306 inner_temporary_store: &InnerTemporaryStore,
2307 ) -> IotaResult<ObjectIndexChanges> {
2308 let epoch_store = self.load_epoch_store_one_call_per_task();
2309 let mut layout_resolver =
2310 epoch_store
2311 .executor()
2312 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2313 inner_temporary_store,
2314 self.get_backing_package_store(),
2315 )));
2316
2317 let modified_at_version = effects
2318 .modified_at_versions()
2319 .into_iter()
2320 .collect::<HashMap<_, _>>();
2321
2322 let tx_digest = effects.transaction_digest();
2323 let mut deleted_owners = vec![];
2324 let mut deleted_dynamic_fields = vec![];
2325 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2326 let old_version = modified_at_version.get(&id).unwrap();
2327 match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2330 |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2331 ) {
2332 Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2333 Owner::ObjectOwner(object_id) => {
2334 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2335 }
2336 _ => {}
2337 }
2338 }
2339
2340 let mut new_owners = vec![];
2341 let mut new_dynamic_fields = vec![];
2342
2343 for (oref, owner, kind) in effects.all_changed_objects() {
2344 let id = &oref.0;
2345 if let WriteKind::Mutate = kind {
2348 let Some(old_version) = modified_at_version.get(id) else {
2349 panic!(
2350 "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2351 );
2352 };
2353 let Some(old_object) = self
2356 .get_object_store()
2357 .try_get_object_by_key(id, *old_version)?
2358 else {
2359 panic!(
2360 "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2361 );
2362 };
2363 if old_object.owner != owner {
2364 match old_object.owner {
2365 Owner::AddressOwner(addr) => {
2366 deleted_owners.push((addr, *id));
2367 }
2368 Owner::ObjectOwner(object_id) => {
2369 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2370 }
2371 _ => {}
2372 }
2373 }
2374 }
2375
2376 match owner {
2377 Owner::AddressOwner(addr) => {
2378 let new_object = written.get(id).unwrap_or_else(
2381 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2382 );
2383 assert_eq!(
2384 new_object.version(),
2385 oref.1,
2386 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2387 tx_digest,
2388 id,
2389 new_object.version(),
2390 oref.1
2391 );
2392
2393 let type_ = new_object
2394 .type_()
2395 .map(|type_| ObjectType::Struct(type_.clone()))
2396 .unwrap_or(ObjectType::Package);
2397
2398 new_owners.push((
2399 (addr, *id),
2400 ObjectInfo {
2401 object_id: *id,
2402 version: oref.1,
2403 digest: oref.2,
2404 type_,
2405 owner,
2406 previous_transaction: *effects.transaction_digest(),
2407 },
2408 ));
2409 }
2410 Owner::ObjectOwner(owner) => {
2411 let new_object = written.get(id).unwrap_or_else(
2412 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2413 );
2414 assert_eq!(
2415 new_object.version(),
2416 oref.1,
2417 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2418 tx_digest,
2419 id,
2420 new_object.version(),
2421 oref.1
2422 );
2423
2424 let Some(df_info) = self
2425 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut(), false)
2426 .unwrap_or_else(|e| {
2427 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2428 None
2429 }
2430 )
2431 else {
2432 continue;
2434 };
2435 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2436 }
2437 _ => {}
2438 }
2439 }
2440
2441 Ok(ObjectIndexChanges {
2442 deleted_owners,
2443 deleted_dynamic_fields,
2444 new_owners,
2445 new_dynamic_fields,
2446 })
2447 }
2448
2449 fn try_create_dynamic_field_info(
2450 &self,
2451 o: &Object,
2452 written: &WrittenObjects,
2453 resolver: &mut dyn LayoutResolver,
2454 get_latest_object_version: bool,
2455 ) -> IotaResult<Option<DynamicFieldInfo>> {
2456 let Some(move_object) = o.data.try_as_move().cloned() else {
2458 return Ok(None);
2459 };
2460
2461 if !move_object.type_().is_dynamic_field() {
2463 return Ok(None);
2464 }
2465
2466 let layout = resolver
2467 .get_annotated_layout(&move_object.type_().clone().into())?
2468 .into_layout();
2469
2470 let field =
2471 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2472 IotaError::ObjectDeserialization {
2473 error: e.to_string(),
2474 }
2475 })?;
2476
2477 let type_ = field.kind;
2478 let name_type: TypeTag = field.name_layout.into();
2479 let bcs_name = field.name_bytes.to_owned();
2480
2481 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2482 .map_err(|e| {
2483 warn!("{e}");
2484 IotaError::ObjectDeserialization {
2485 error: e.to_string(),
2486 }
2487 })?;
2488
2489 let name = DynamicFieldName {
2490 type_: name_type,
2491 value: IotaMoveValue::from(name_value).to_json_value(),
2492 };
2493
2494 let value_metadata = field.value_metadata().map_err(|e| {
2495 warn!("{e}");
2496 IotaError::ObjectDeserialization {
2497 error: e.to_string(),
2498 }
2499 })?;
2500
2501 Ok(Some(match value_metadata {
2502 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2503 name,
2504 bcs_name,
2505 type_,
2506 object_type: object_type.to_canonical_string(true),
2507 object_id: o.id(),
2508 version: o.version(),
2509 digest: o.digest(),
2510 },
2511
2512 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2513 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2518 (
2519 object.version(),
2520 object.digest(),
2521 object.data.type_().unwrap().clone(),
2522 )
2523 } else {
2524 let object = if get_latest_object_version {
2526 self.get_object_store()
2533 .try_get_object(&object_id)?
2534 .ok_or_else(|| UserInputError::ObjectNotFound {
2535 object_id,
2536 version: Some(o.version()),
2537 })?
2538 } else {
2539 self.get_object_store()
2541 .try_get_object_by_key(&object_id, o.version())?
2542 .ok_or_else(|| UserInputError::ObjectNotFound {
2543 object_id,
2544 version: Some(o.version()),
2545 })?
2546 };
2547
2548 (
2549 object.version(),
2550 object.digest(),
2551 object.data.type_().unwrap().clone(),
2552 )
2553 };
2554
2555 DynamicFieldInfo {
2556 name,
2557 bcs_name,
2558 type_,
2559 object_type: object_type.to_string(),
2560 object_id,
2561 version,
2562 digest,
2563 }
2564 }
2565 }))
2566 }
2567
2568 #[instrument(level = "trace", skip_all, err)]
2569 fn post_process_one_tx(
2570 &self,
2571 certificate: &VerifiedExecutableTransaction,
2572 effects: &TransactionEffects,
2573 inner_temporary_store: &InnerTemporaryStore,
2574 epoch_store: &Arc<AuthorityPerEpochStore>,
2575 ) -> IotaResult {
2576 if self.indexes.is_none() {
2577 return Ok(());
2578 }
2579
2580 let tx_digest = certificate.digest();
2581 let timestamp_ms = Self::unixtime_now_ms();
2582 let events = &inner_temporary_store.events;
2583 let written = &inner_temporary_store.written;
2584 let tx_coins =
2585 self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2586
2587 if let Some(indexes) = &self.indexes {
2589 let _ = self
2590 .index_tx(
2591 indexes.as_ref(),
2592 tx_digest,
2593 certificate,
2594 effects,
2595 events,
2596 timestamp_ms,
2597 tx_coins,
2598 written,
2599 inner_temporary_store,
2600 )
2601 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2602 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2603 .expect("Indexing tx should not fail");
2604
2605 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2606 let events = self.make_transaction_block_events(
2607 events.clone(),
2608 *tx_digest,
2609 timestamp_ms,
2610 epoch_store,
2611 inner_temporary_store,
2612 )?;
2613 self.subscription_handler
2615 .process_tx(certificate.data().transaction_data(), &effects, &events)
2616 .tap_ok(|_| {
2617 self.metrics
2618 .post_processing_total_tx_had_event_processed
2619 .inc()
2620 })
2621 .tap_err(|e| {
2622 warn!(
2623 ?tx_digest,
2624 "Post processing - Couldn't process events for tx: {}", e
2625 )
2626 })?;
2627
2628 self.metrics
2629 .post_processing_total_events_emitted
2630 .inc_by(events.data.len() as u64);
2631 };
2632 Ok(())
2633 }
2634
2635 fn make_transaction_block_events(
2636 &self,
2637 transaction_events: TransactionEvents,
2638 digest: TransactionDigest,
2639 timestamp_ms: u64,
2640 epoch_store: &Arc<AuthorityPerEpochStore>,
2641 inner_temporary_store: &InnerTemporaryStore,
2642 ) -> IotaResult<IotaTransactionBlockEvents> {
2643 let mut layout_resolver =
2644 epoch_store
2645 .executor()
2646 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2647 inner_temporary_store,
2648 self.get_backing_package_store(),
2649 )));
2650 IotaTransactionBlockEvents::try_from(
2651 transaction_events,
2652 digest,
2653 Some(timestamp_ms),
2654 layout_resolver.as_mut(),
2655 )
2656 }
2657
2658 pub fn unixtime_now_ms() -> u64 {
2659 let now = SystemTime::now()
2660 .duration_since(UNIX_EPOCH)
2661 .expect("Time went backwards")
2662 .as_millis();
2663 u64::try_from(now).expect("Travelling in time machine")
2664 }
2665
2666 #[instrument(level = "trace", skip_all)]
2667 pub async fn handle_transaction_info_request(
2668 &self,
2669 request: TransactionInfoRequest,
2670 ) -> IotaResult<TransactionInfoResponse> {
2671 let epoch_store = self.load_epoch_store_one_call_per_task();
2672 let (transaction, status) = self
2673 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2674 .ok_or(IotaError::TransactionNotFound {
2675 digest: request.transaction_digest,
2676 })?;
2677 Ok(TransactionInfoResponse {
2678 transaction,
2679 status,
2680 })
2681 }
2682
2683 #[instrument(level = "trace", skip_all)]
2684 pub async fn handle_object_info_request(
2685 &self,
2686 request: ObjectInfoRequest,
2687 ) -> IotaResult<ObjectInfoResponse> {
2688 let epoch_store = self.load_epoch_store_one_call_per_task();
2689
2690 let requested_object_seq = match request.request_kind {
2691 ObjectInfoRequestKind::LatestObjectInfo => {
2692 let (_, seq, _) = self
2693 .try_get_object_or_tombstone(request.object_id)
2694 .await?
2695 .ok_or_else(|| {
2696 IotaError::from(UserInputError::ObjectNotFound {
2697 object_id: request.object_id,
2698 version: None,
2699 })
2700 })?;
2701 seq
2702 }
2703 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2704 };
2705
2706 let object = self
2707 .get_object_store()
2708 .try_get_object_by_key(&request.object_id, requested_object_seq)?
2709 .ok_or_else(|| {
2710 IotaError::from(UserInputError::ObjectNotFound {
2711 object_id: request.object_id,
2712 version: Some(requested_object_seq),
2713 })
2714 })?;
2715
2716 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2717 (request.generate_layout, object.data.try_as_move())
2718 {
2719 Some(into_struct_layout(
2720 epoch_store
2721 .executor()
2722 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2723 .get_annotated_layout(&move_obj.type_().clone().into())?,
2724 )?)
2725 } else {
2726 None
2727 };
2728
2729 let lock = if !object.is_address_owned() {
2730 None
2732 } else {
2733 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2734 .await?
2735 .map(|s| s.into_inner())
2736 };
2737
2738 Ok(ObjectInfoResponse {
2739 object,
2740 layout,
2741 lock_for_debugging: lock,
2742 })
2743 }
2744
2745 #[instrument(level = "trace", skip_all)]
2746 pub fn handle_checkpoint_request(
2747 &self,
2748 request: &CheckpointRequest,
2749 ) -> IotaResult<CheckpointResponse> {
2750 let summary = if request.certified {
2751 let summary = match request.sequence_number {
2752 Some(seq) => self
2753 .checkpoint_store
2754 .get_checkpoint_by_sequence_number(seq)?,
2755 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2756 }
2757 .map(|v| v.into_inner());
2758 summary.map(CheckpointSummaryResponse::Certified)
2759 } else {
2760 let summary = match request.sequence_number {
2761 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2762 None => self
2763 .checkpoint_store
2764 .get_latest_locally_computed_checkpoint()?,
2765 };
2766 summary.map(CheckpointSummaryResponse::Pending)
2767 };
2768 let contents = match &summary {
2769 Some(s) => self
2770 .checkpoint_store
2771 .get_checkpoint_contents(&s.content_digest())?,
2772 None => None,
2773 };
2774 Ok(CheckpointResponse {
2775 checkpoint: summary,
2776 contents,
2777 })
2778 }
2779
2780 fn check_protocol_version(
2781 supported_protocol_versions: SupportedProtocolVersions,
2782 current_version: ProtocolVersion,
2783 ) {
2784 info!("current protocol version is now {:?}", current_version);
2785 info!("supported versions are: {:?}", supported_protocol_versions);
2786 if !supported_protocol_versions.is_version_supported(current_version) {
2787 let msg = format!(
2788 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2789 );
2790
2791 error!("{}", msg);
2792 eprintln!("{msg}");
2793
2794 #[cfg(not(msim))]
2795 std::process::exit(1);
2796
2797 #[cfg(msim)]
2798 iota_simulator::task::shutdown_current_node();
2799 }
2800 }
2801
2802 #[expect(clippy::disallowed_methods)] pub async fn new(
2804 name: AuthorityName,
2805 secret: StableSyncAuthoritySigner,
2806 supported_protocol_versions: SupportedProtocolVersions,
2807 store: Arc<AuthorityStore>,
2808 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
2809 epoch_store: Arc<AuthorityPerEpochStore>,
2810 committee_store: Arc<CommitteeStore>,
2811 indexes: Option<Arc<IndexStore>>,
2812 rest_index: Option<Arc<RestIndexStore>>,
2813 checkpoint_store: Arc<CheckpointStore>,
2814 prometheus_registry: &Registry,
2815 genesis_objects: &[Object],
2816 db_checkpoint_config: &DBCheckpointConfig,
2817 config: NodeConfig,
2818 archive_readers: ArchiveReaderBalancer,
2819 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
2820 chain_identifier: ChainIdentifier,
2821 pruner_db: Option<Arc<AuthorityPrunerTables>>,
2822 ) -> Arc<Self> {
2823 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
2824
2825 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
2826 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
2827 let transaction_manager = Arc::new(TransactionManager::new(
2828 execution_cache_trait_pointers.object_cache_reader.clone(),
2829 execution_cache_trait_pointers
2830 .transaction_cache_reader
2831 .clone(),
2832 &epoch_store,
2833 tx_ready_certificates,
2834 metrics.clone(),
2835 ));
2836 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
2837
2838 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
2839 epoch_store.get_parent_path(),
2840 &config.authority_store_pruning_config,
2841 );
2842 let _pruner = AuthorityStorePruner::new(
2843 store.perpetual_tables.clone(),
2844 checkpoint_store.clone(),
2845 rest_index.clone(),
2846 indexes.clone(),
2847 config.authority_store_pruning_config.clone(),
2848 epoch_store.committee().authority_exists(&name),
2849 epoch_store.epoch_start_state().epoch_duration_ms(),
2850 prometheus_registry,
2851 archive_readers,
2852 pruner_db,
2853 );
2854 let input_loader =
2855 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
2856 let epoch = epoch_store.epoch();
2857 let rgp = epoch_store.reference_gas_price();
2858 let state = Arc::new(AuthorityState {
2859 name,
2860 secret,
2861 execution_lock: RwLock::new(epoch),
2862 epoch_store: ArcSwap::new(epoch_store.clone()),
2863 input_loader,
2864 execution_cache_trait_pointers,
2865 indexes,
2866 rest_index,
2867 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
2868 checkpoint_store,
2869 committee_store,
2870 transaction_manager,
2871 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
2872 metrics,
2873 _pruner,
2874 _authority_per_epoch_pruner,
2875 db_checkpoint_config: db_checkpoint_config.clone(),
2876 config,
2877 overload_info: AuthorityOverloadInfo::default(),
2878 validator_tx_finalizer,
2879 chain_identifier,
2880 congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
2881 });
2882
2883 let authority_state = Arc::downgrade(&state);
2885 spawn_monitored_task!(execution_process(
2886 authority_state,
2887 rx_ready_certificates,
2888 rx_execution_shutdown,
2889 ));
2890
2891 state
2893 .create_owner_index_if_empty(genesis_objects, &epoch_store)
2894 .expect("Error indexing genesis objects.");
2895
2896 state
2897 }
2898
2899 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
2901 &self.execution_cache_trait_pointers.object_cache_reader
2902 }
2903
2904 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
2905 &self.execution_cache_trait_pointers.transaction_cache_reader
2906 }
2907
2908 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
2909 &self.execution_cache_trait_pointers.cache_writer
2910 }
2911
2912 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
2913 &self.execution_cache_trait_pointers.backing_store
2914 }
2915
2916 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
2917 &self.execution_cache_trait_pointers.backing_package_store
2918 }
2919
2920 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
2921 &self.execution_cache_trait_pointers.object_store
2922 }
2923
2924 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
2925 &self.execution_cache_trait_pointers.reconfig_api
2926 }
2927
2928 pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
2929 &self.execution_cache_trait_pointers.accumulator_store
2930 }
2931
2932 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
2933 &self.execution_cache_trait_pointers.checkpoint_cache
2934 }
2935
2936 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
2937 &self.execution_cache_trait_pointers.state_sync_store
2938 }
2939
2940 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
2941 &self.execution_cache_trait_pointers.cache_commit
2942 }
2943
2944 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
2945 self.execution_cache_trait_pointers
2946 .testing_api
2947 .database_for_testing()
2948 }
2949
2950 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
2951 &self,
2952 config: NodeConfig,
2953 metrics: Arc<AuthorityStorePruningMetrics>,
2954 ) -> anyhow::Result<()> {
2955 let archive_readers =
2956 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
2957 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
2958 &self.database_for_testing().perpetual_tables,
2959 &self.checkpoint_store,
2960 self.rest_index.as_deref(),
2961 None,
2962 config.authority_store_pruning_config,
2963 metrics,
2964 archive_readers,
2965 EPOCH_DURATION_MS_FOR_TESTING,
2966 )
2967 .await
2968 }
2969
2970 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
2971 &self.transaction_manager
2972 }
2973
2974 pub fn enqueue_transactions_for_execution(
2977 &self,
2978 txns: Vec<VerifiedExecutableTransaction>,
2979 epoch_store: &Arc<AuthorityPerEpochStore>,
2980 ) {
2981 self.transaction_manager.enqueue(txns, epoch_store)
2982 }
2983
2984 pub fn enqueue_certificates_for_execution(
2986 &self,
2987 certs: Vec<VerifiedCertificate>,
2988 epoch_store: &Arc<AuthorityPerEpochStore>,
2989 ) {
2990 self.transaction_manager
2991 .enqueue_certificates(certs, epoch_store)
2992 }
2993
2994 pub fn enqueue_with_expected_effects_digest(
2995 &self,
2996 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
2997 epoch_store: &AuthorityPerEpochStore,
2998 ) {
2999 self.transaction_manager
3000 .enqueue_with_expected_effects_digest(certs, epoch_store)
3001 }
3002
3003 fn create_owner_index_if_empty(
3004 &self,
3005 genesis_objects: &[Object],
3006 epoch_store: &Arc<AuthorityPerEpochStore>,
3007 ) -> IotaResult {
3008 let Some(index_store) = &self.indexes else {
3009 return Ok(());
3010 };
3011 if !index_store.is_empty() {
3012 return Ok(());
3013 }
3014
3015 let mut new_owners = vec![];
3016 let mut new_dynamic_fields = vec![];
3017 let mut layout_resolver = epoch_store
3018 .executor()
3019 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3020 for o in genesis_objects.iter() {
3021 match o.owner {
3022 Owner::AddressOwner(addr) => new_owners.push((
3023 (addr, o.id()),
3024 ObjectInfo::new(&o.compute_object_reference(), o),
3025 )),
3026 Owner::ObjectOwner(object_id) => {
3027 let id = o.id();
3028 let Some(info) = self.try_create_dynamic_field_info(
3029 o,
3030 &BTreeMap::new(),
3031 layout_resolver.as_mut(),
3032 true,
3033 )?
3034 else {
3035 continue;
3036 };
3037 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3038 }
3039 _ => {}
3040 }
3041 }
3042
3043 index_store.insert_genesis_objects(ObjectIndexChanges {
3044 deleted_owners: vec![],
3045 deleted_dynamic_fields: vec![],
3046 new_owners,
3047 new_dynamic_fields,
3048 })
3049 }
3050
3051 pub fn execution_lock_for_executable_transaction(
3055 &self,
3056 transaction: &VerifiedExecutableTransaction,
3057 ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3058 let lock = self
3059 .execution_lock
3060 .try_read()
3061 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3062 if *lock == transaction.auth_sig().epoch() {
3063 Ok(lock)
3064 } else {
3065 Err(IotaError::WrongEpoch {
3066 expected_epoch: *lock,
3067 actual_epoch: transaction.auth_sig().epoch(),
3068 })
3069 }
3070 }
3071
3072 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3078 self.execution_lock
3079 .try_read()
3080 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3081 }
3082
3083 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3084 self.execution_lock.write().await
3085 }
3086
3087 #[instrument(level = "error", skip_all)]
3088 pub async fn reconfigure(
3089 &self,
3090 cur_epoch_store: &AuthorityPerEpochStore,
3091 supported_protocol_versions: SupportedProtocolVersions,
3092 new_committee: Committee,
3093 epoch_start_configuration: EpochStartConfiguration,
3094 accumulator: Arc<StateAccumulator>,
3095 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3096 epoch_supply_change: i64,
3097 epoch_last_checkpoint: CheckpointSequenceNumber,
3098 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3099 Self::check_protocol_version(
3100 supported_protocol_versions,
3101 epoch_start_configuration
3102 .epoch_start_state()
3103 .protocol_version(),
3104 );
3105 self.metrics.reset_on_reconfigure();
3106 self.committee_store.insert_new_committee(&new_committee)?;
3107
3108 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3110
3111 cur_epoch_store.epoch_terminated().await;
3113
3114 let highest_locally_built_checkpoint_seq = self
3115 .checkpoint_store
3116 .get_latest_locally_computed_checkpoint()?
3117 .map(|c| *c.sequence_number())
3118 .unwrap_or(0);
3119
3120 assert!(
3121 epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3122 "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3123 );
3124 if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
3125 let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3129 if num_shared_version_assignments > 1 {
3132 debug_fatal!(
3134 "all shared_version_assignments should have been removed \
3135 (num_shared_version_assignments: {num_shared_version_assignments})"
3136 );
3137 }
3138 }
3139
3140 self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3146 .await?;
3147 self.get_reconfig_api()
3148 .clear_state_end_of_epoch(&execution_lock);
3149 self.check_system_consistency(
3150 cur_epoch_store,
3151 accumulator,
3152 expensive_safety_check_config,
3153 epoch_supply_change,
3154 )?;
3155 self.get_reconfig_api()
3156 .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3157 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3158 if self
3159 .db_checkpoint_config
3160 .perform_db_checkpoints_at_epoch_end
3161 {
3162 let checkpoint_indexes = self
3163 .db_checkpoint_config
3164 .perform_index_db_checkpoints_at_epoch_end
3165 .unwrap_or(false);
3166 let current_epoch = cur_epoch_store.epoch();
3167 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3168 self.checkpoint_all_dbs(
3169 &epoch_checkpoint_path,
3170 cur_epoch_store,
3171 checkpoint_indexes,
3172 )?;
3173 }
3174 }
3175
3176 self.get_reconfig_api()
3177 .reconfigure_cache(&epoch_start_configuration)
3178 .await;
3179
3180 let new_epoch = new_committee.epoch;
3181 let new_epoch_store = self
3182 .reopen_epoch_db(
3183 cur_epoch_store,
3184 new_committee,
3185 epoch_start_configuration,
3186 expensive_safety_check_config,
3187 epoch_last_checkpoint,
3188 )
3189 .await?;
3190 assert_eq!(new_epoch_store.epoch(), new_epoch);
3191 self.transaction_manager.reconfigure(new_epoch);
3192 *execution_lock = new_epoch;
3193 Ok(new_epoch_store)
3197 }
3198
3199 pub async fn reconfigure_for_testing(&self) {
3204 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3205 let epoch_store = self.epoch_store_for_testing().clone();
3206 let protocol_config = epoch_store.protocol_config().clone();
3207 let _guard =
3215 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3216 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3217 self.get_backing_package_store().clone(),
3218 self.get_object_store().clone(),
3219 &self.config.expensive_safety_check_config,
3220 self.checkpoint_store
3221 .get_epoch_last_checkpoint(epoch_store.epoch())
3222 .unwrap()
3223 .map(|c| *c.sequence_number())
3224 .unwrap_or_default(),
3225 );
3226 let new_epoch = new_epoch_store.epoch();
3227 self.transaction_manager.reconfigure(new_epoch);
3228 self.epoch_store.store(new_epoch_store);
3229 epoch_store.epoch_terminated().await;
3230 *execution_lock = new_epoch;
3231 }
3232
3233 #[instrument(level = "error", skip_all)]
3234 fn check_system_consistency(
3235 &self,
3236 cur_epoch_store: &AuthorityPerEpochStore,
3237 accumulator: Arc<StateAccumulator>,
3238 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3239 epoch_supply_change: i64,
3240 ) -> IotaResult<()> {
3241 info!(
3242 "Performing iota conservation consistency check for epoch {}",
3243 cur_epoch_store.epoch()
3244 );
3245
3246 if cfg!(debug_assertions) {
3247 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3248 }
3249
3250 self.get_reconfig_api()
3251 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3252
3253 if expensive_safety_check_config.enable_state_consistency_check() {
3255 info!(
3256 "Performing state consistency check for epoch {}",
3257 cur_epoch_store.epoch()
3258 );
3259 self.expensive_check_is_consistent_state(
3260 accumulator,
3261 cur_epoch_store,
3262 cfg!(debug_assertions), );
3264 }
3265
3266 if expensive_safety_check_config.enable_secondary_index_checks() {
3267 if let Some(indexes) = self.indexes.clone() {
3268 verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3269 .expect("secondary indexes are inconsistent");
3270 }
3271 }
3272
3273 Ok(())
3274 }
3275
3276 fn expensive_check_is_consistent_state(
3277 &self,
3278 accumulator: Arc<StateAccumulator>,
3279 cur_epoch_store: &AuthorityPerEpochStore,
3280 panic: bool,
3281 ) {
3282 let live_object_set_hash = accumulator.digest_live_object_set();
3283
3284 let root_state_hash: ECMHLiveObjectSetDigest = self
3285 .get_accumulator_store()
3286 .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3287 .expect("Retrieving root state hash cannot fail")
3288 .expect("Root state hash for epoch must exist")
3289 .1
3290 .digest()
3291 .into();
3292
3293 let is_inconsistent = root_state_hash != live_object_set_hash;
3294 if is_inconsistent {
3295 if panic {
3296 panic!(
3297 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3298 );
3299 } else {
3300 error!(
3301 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3302 root_state_hash, live_object_set_hash
3303 );
3304 }
3305 } else {
3306 info!("State consistency check passed");
3307 }
3308
3309 if !panic {
3310 accumulator.set_inconsistent_state(is_inconsistent);
3311 }
3312 }
3313
3314 pub fn current_epoch_for_testing(&self) -> EpochId {
3315 self.epoch_store_for_testing().epoch()
3316 }
3317
3318 #[instrument(level = "error", skip_all)]
3319 pub fn checkpoint_all_dbs(
3320 &self,
3321 checkpoint_path: &Path,
3322 cur_epoch_store: &AuthorityPerEpochStore,
3323 checkpoint_indexes: bool,
3324 ) -> IotaResult {
3325 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3326 let current_epoch = cur_epoch_store.epoch();
3327
3328 if checkpoint_path.exists() {
3329 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3330 return Ok(());
3331 }
3332
3333 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3334 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3335
3336 if checkpoint_path_tmp.exists() {
3337 fs::remove_dir_all(&checkpoint_path_tmp)
3338 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3339 }
3340
3341 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3342 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3343
3344 self.checkpoint_store
3347 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3348
3349 self.get_reconfig_api()
3350 .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3351
3352 self.committee_store
3353 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3354
3355 if checkpoint_indexes {
3356 if let Some(indexes) = self.indexes.as_ref() {
3357 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3358 }
3359 }
3360
3361 fs::rename(checkpoint_path_tmp, checkpoint_path)
3362 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3363 Ok(())
3364 }
3365
3366 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3373 self.epoch_store.load()
3374 }
3375
3376 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3378 self.load_epoch_store_one_call_per_task()
3379 }
3380
3381 pub fn clone_committee_for_testing(&self) -> Committee {
3382 Committee::clone(self.epoch_store_for_testing().committee())
3383 }
3384
3385 #[instrument(level = "trace", skip_all)]
3386 pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3387 self.get_object_store()
3388 .try_get_object(object_id)
3389 .map_err(Into::into)
3390 }
3391
3392 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3394 self.try_get_object(object_id)
3395 .await
3396 .expect("storage access failed")
3397 }
3398
3399 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3400 Ok(self
3401 .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3402 .await?
3403 .expect("framework object should always exist")
3404 .compute_object_reference())
3405 }
3406
3407 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3409 self.get_object_cache_reader()
3410 .try_get_iota_system_state_object_unsafe()
3411 }
3412
3413 #[instrument(level = "trace", skip_all)]
3414 pub fn get_checkpoint_by_sequence_number(
3415 &self,
3416 sequence_number: CheckpointSequenceNumber,
3417 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3418 Ok(self
3419 .checkpoint_store
3420 .get_checkpoint_by_sequence_number(sequence_number)?)
3421 }
3422
3423 #[instrument(level = "trace", skip_all)]
3424 pub fn get_transaction_checkpoint_for_tests(
3425 &self,
3426 digest: &TransactionDigest,
3427 epoch_store: &AuthorityPerEpochStore,
3428 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3429 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3430 let Some(checkpoint) = checkpoint else {
3431 return Ok(None);
3432 };
3433 let checkpoint = self
3434 .checkpoint_store
3435 .get_checkpoint_by_sequence_number(checkpoint)?;
3436 Ok(checkpoint)
3437 }
3438
3439 #[instrument(level = "trace", skip_all)]
3440 pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3441 Ok(
3442 match self
3443 .get_object_cache_reader()
3444 .try_get_latest_object_or_tombstone(*object_id)?
3445 {
3446 Some((_, ObjectOrTombstone::Object(object))) => {
3447 let layout = self.get_object_layout(&object)?;
3448 ObjectRead::Exists(object.compute_object_reference(), object, layout)
3449 }
3450 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3451 None => ObjectRead::NotExists(*object_id),
3452 },
3453 )
3454 }
3455
3456 pub fn get_chain_identifier(&self) -> ChainIdentifier {
3458 self.chain_identifier
3459 }
3460
3461 #[instrument(level = "trace", skip_all)]
3462 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3463 where
3464 T: DeserializeOwned,
3465 {
3466 let o = self.get_object_read(object_id)?.into_object()?;
3467 if let Some(move_object) = o.data.try_as_move() {
3468 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3469 IotaError::ObjectDeserialization {
3470 error: format!("{e}"),
3471 }
3472 })?)
3473 } else {
3474 Err(IotaError::ObjectDeserialization {
3475 error: format!("Provided object : [{object_id}] is not a Move object."),
3476 })
3477 }
3478 }
3479
3480 #[instrument(level = "trace", skip_all)]
3486 pub fn get_past_object_read(
3487 &self,
3488 object_id: &ObjectID,
3489 version: SequenceNumber,
3490 ) -> IotaResult<PastObjectRead> {
3491 let Some(obj_ref) = self
3493 .get_object_cache_reader()
3494 .try_get_latest_object_ref_or_tombstone(*object_id)?
3495 else {
3496 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3497 };
3498
3499 if version > obj_ref.1 {
3500 return Ok(PastObjectRead::VersionTooHigh {
3501 object_id: *object_id,
3502 asked_version: version,
3503 latest_version: obj_ref.1,
3504 });
3505 }
3506
3507 if version < obj_ref.1 {
3508 return Ok(match self.read_object_at_version(object_id, version)? {
3510 Some((object, layout)) => {
3511 let obj_ref = object.compute_object_reference();
3512 PastObjectRead::VersionFound(obj_ref, object, layout)
3513 }
3514
3515 None => PastObjectRead::VersionNotFound(*object_id, version),
3516 });
3517 }
3518
3519 if !obj_ref.2.is_alive() {
3520 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3521 }
3522
3523 match self.read_object_at_version(object_id, obj_ref.1)? {
3524 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3525 None => {
3526 error!(
3527 "Object with in parent_entry is missing from object store, datastore is \
3528 inconsistent",
3529 );
3530 Err(UserInputError::ObjectNotFound {
3531 object_id: *object_id,
3532 version: Some(obj_ref.1),
3533 }
3534 .into())
3535 }
3536 }
3537 }
3538
3539 #[instrument(level = "trace", skip_all)]
3540 fn read_object_at_version(
3541 &self,
3542 object_id: &ObjectID,
3543 version: SequenceNumber,
3544 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3545 let Some(object) = self
3546 .get_object_cache_reader()
3547 .try_get_object_by_key(object_id, version)?
3548 else {
3549 return Ok(None);
3550 };
3551
3552 let layout = self.get_object_layout(&object)?;
3553 Ok(Some((object, layout)))
3554 }
3555
3556 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3557 let layout = object
3558 .data
3559 .try_as_move()
3560 .map(|object| {
3561 into_struct_layout(
3562 self.load_epoch_store_one_call_per_task()
3563 .executor()
3564 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3566 .get_annotated_layout(&object.type_().clone().into())?,
3567 )
3568 })
3569 .transpose()?;
3570 Ok(layout)
3571 }
3572
3573 fn get_owner_at_version(
3574 &self,
3575 object_id: &ObjectID,
3576 version: SequenceNumber,
3577 ) -> IotaResult<Owner> {
3578 self.get_object_store()
3579 .try_get_object_by_key(object_id, version)?
3580 .ok_or_else(|| {
3581 IotaError::from(UserInputError::ObjectNotFound {
3582 object_id: *object_id,
3583 version: Some(version),
3584 })
3585 })
3586 .map(|o| o.owner)
3587 }
3588
3589 #[instrument(level = "trace", skip_all)]
3590 pub fn get_owner_objects(
3591 &self,
3592 owner: IotaAddress,
3593 cursor: Option<ObjectID>,
3595 limit: usize,
3596 filter: Option<IotaObjectDataFilter>,
3597 ) -> IotaResult<Vec<ObjectInfo>> {
3598 if let Some(indexes) = &self.indexes {
3599 indexes.get_owner_objects(owner, cursor, limit, filter)
3600 } else {
3601 Err(IotaError::IndexStoreNotAvailable)
3602 }
3603 }
3604
3605 #[instrument(level = "trace", skip_all)]
3606 pub fn get_owned_coins_iterator_with_cursor(
3607 &self,
3608 owner: IotaAddress,
3609 cursor: (String, ObjectID),
3611 limit: usize,
3612 one_coin_type_only: bool,
3613 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3614 if let Some(indexes) = &self.indexes {
3615 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3616 } else {
3617 Err(IotaError::IndexStoreNotAvailable)
3618 }
3619 }
3620
3621 #[instrument(level = "trace", skip_all)]
3622 pub fn get_owner_objects_iterator(
3623 &self,
3624 owner: IotaAddress,
3625 cursor: Option<ObjectID>,
3627 filter: Option<IotaObjectDataFilter>,
3628 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3629 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3630 if let Some(indexes) = &self.indexes {
3631 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3632 } else {
3633 Err(IotaError::IndexStoreNotAvailable)
3634 }
3635 }
3636
3637 #[instrument(level = "trace", skip_all)]
3638 pub async fn get_move_objects<T>(
3639 &self,
3640 owner: IotaAddress,
3641 type_: MoveObjectType,
3642 ) -> IotaResult<Vec<T>>
3643 where
3644 T: DeserializeOwned,
3645 {
3646 let object_ids = self
3647 .get_owner_objects_iterator(owner, None, None)?
3648 .filter(|o| match &o.type_ {
3649 ObjectType::Struct(s) => &type_ == s,
3650 ObjectType::Package => false,
3651 })
3652 .map(|info| ObjectKey(info.object_id, info.version))
3653 .collect::<Vec<_>>();
3654 let mut move_objects = vec![];
3655
3656 let objects = self
3657 .get_object_store()
3658 .try_multi_get_objects_by_key(&object_ids)?;
3659
3660 for (o, id) in objects.into_iter().zip(object_ids) {
3661 let object = o.ok_or_else(|| {
3662 IotaError::from(UserInputError::ObjectNotFound {
3663 object_id: id.0,
3664 version: Some(id.1),
3665 })
3666 })?;
3667 let move_object = object.data.try_as_move().ok_or_else(|| {
3668 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3669 })?;
3670 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3671 IotaError::ObjectDeserialization {
3672 error: format!("{e}"),
3673 }
3674 })?);
3675 }
3676 Ok(move_objects)
3677 }
3678
3679 #[instrument(level = "trace", skip_all)]
3680 pub fn get_dynamic_fields(
3681 &self,
3682 owner: ObjectID,
3683 cursor: Option<ObjectID>,
3685 limit: usize,
3686 ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3687 Ok(self
3688 .get_dynamic_fields_iterator(owner, cursor)?
3689 .take(limit)
3690 .collect::<Result<Vec<_>, _>>()?)
3691 }
3692
3693 fn get_dynamic_fields_iterator(
3694 &self,
3695 owner: ObjectID,
3696 cursor: Option<ObjectID>,
3698 ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3699 {
3700 if let Some(indexes) = &self.indexes {
3701 indexes.get_dynamic_fields_iterator(owner, cursor)
3702 } else {
3703 Err(IotaError::IndexStoreNotAvailable)
3704 }
3705 }
3706
3707 #[instrument(level = "trace", skip_all)]
3708 pub fn get_dynamic_field_object_id(
3709 &self,
3710 owner: ObjectID,
3711 name_type: TypeTag,
3712 name_bcs_bytes: &[u8],
3713 ) -> IotaResult<Option<ObjectID>> {
3714 if let Some(indexes) = &self.indexes {
3715 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3716 } else {
3717 Err(IotaError::IndexStoreNotAvailable)
3718 }
3719 }
3720
3721 #[instrument(level = "trace", skip_all)]
3722 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3723 Ok(self.get_indexes()?.next_sequence_number())
3724 }
3725
3726 #[instrument(level = "trace", skip_all)]
3727 pub async fn get_executed_transaction_and_effects(
3728 &self,
3729 digest: TransactionDigest,
3730 kv_store: Arc<TransactionKeyValueStore>,
3731 ) -> IotaResult<(Transaction, TransactionEffects)> {
3732 let transaction = kv_store.get_tx(digest).await?;
3733 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3734 Ok((transaction, effects))
3735 }
3736
3737 #[instrument(level = "trace", skip_all)]
3738 pub fn multi_get_checkpoint_by_sequence_number(
3739 &self,
3740 sequence_numbers: &[CheckpointSequenceNumber],
3741 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3742 Ok(self
3743 .checkpoint_store
3744 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3745 }
3746
3747 #[instrument(level = "trace", skip_all)]
3748 pub fn get_transaction_events(
3749 &self,
3750 digest: &TransactionEventsDigest,
3751 ) -> IotaResult<TransactionEvents> {
3752 self.get_transaction_cache_reader()
3753 .try_get_events(digest)?
3754 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3755 }
3756
3757 pub fn get_transaction_input_objects(
3758 &self,
3759 effects: &TransactionEffects,
3760 ) -> anyhow::Result<Vec<Object>> {
3761 let input_object_keys = effects
3762 .modified_at_versions()
3763 .into_iter()
3764 .map(|(object_id, version)| ObjectKey(object_id, version))
3765 .collect::<Vec<_>>();
3766
3767 let input_objects = self
3768 .get_object_store()
3769 .try_multi_get_objects_by_key(&input_object_keys)?
3770 .into_iter()
3771 .enumerate()
3772 .map(|(idx, maybe_object)| {
3773 maybe_object.ok_or_else(|| {
3774 anyhow::anyhow!(
3775 "missing input object key {:?} from tx {}",
3776 input_object_keys[idx],
3777 effects.transaction_digest()
3778 )
3779 })
3780 })
3781 .collect::<anyhow::Result<Vec<_>>>()?;
3782 Ok(input_objects)
3783 }
3784
3785 pub fn get_transaction_output_objects(
3786 &self,
3787 effects: &TransactionEffects,
3788 ) -> anyhow::Result<Vec<Object>> {
3789 let output_object_keys = effects
3790 .all_changed_objects()
3791 .into_iter()
3792 .map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
3793 .collect::<Vec<_>>();
3794
3795 let output_objects = self
3796 .get_object_store()
3797 .try_multi_get_objects_by_key(&output_object_keys)?
3798 .into_iter()
3799 .enumerate()
3800 .map(|(idx, maybe_object)| {
3801 maybe_object.ok_or_else(|| {
3802 anyhow::anyhow!(
3803 "missing output object key {:?} from tx {}",
3804 output_object_keys[idx],
3805 effects.transaction_digest()
3806 )
3807 })
3808 })
3809 .collect::<anyhow::Result<Vec<_>>>()?;
3810 Ok(output_objects)
3811 }
3812
3813 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3814 match &self.indexes {
3815 Some(i) => Ok(i.clone()),
3816 None => Err(IotaError::UnsupportedFeature {
3817 error: "extended object indexing is not enabled on this server".into(),
3818 }),
3819 }
3820 }
3821
3822 pub async fn get_transactions_for_tests(
3823 self: &Arc<Self>,
3824 filter: Option<TransactionFilter>,
3825 cursor: Option<TransactionDigest>,
3826 limit: Option<usize>,
3827 reverse: bool,
3828 ) -> IotaResult<Vec<TransactionDigest>> {
3829 let metrics = KeyValueStoreMetrics::new_for_tests();
3830 let kv_store = Arc::new(TransactionKeyValueStore::new(
3831 "rocksdb",
3832 metrics,
3833 self.clone(),
3834 ));
3835 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3836 .await
3837 }
3838
3839 #[instrument(level = "trace", skip_all)]
3840 pub async fn get_transactions(
3841 &self,
3842 kv_store: &Arc<TransactionKeyValueStore>,
3843 filter: Option<TransactionFilter>,
3844 cursor: Option<TransactionDigest>,
3846 limit: Option<usize>,
3847 reverse: bool,
3848 ) -> IotaResult<Vec<TransactionDigest>> {
3849 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3850 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3851 let iter = checkpoint_contents.iter().map(|c| c.transaction);
3852 if reverse {
3853 let iter = iter
3854 .rev()
3855 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3856 .skip(usize::from(cursor.is_some()));
3857 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3858 } else {
3859 let iter = iter
3860 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3861 .skip(usize::from(cursor.is_some()));
3862 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3863 }
3864 }
3865 self.get_indexes()?
3866 .get_transactions(filter, cursor, limit, reverse)
3867 }
3868
3869 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
3870 &self.checkpoint_store
3871 }
3872
3873 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
3874 self.get_checkpoint_store()
3875 .get_highest_executed_checkpoint_seq_number()?
3876 .ok_or(IotaError::UserInput {
3877 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
3878 })
3879 }
3880
3881 #[cfg(msim)]
3882 pub fn get_highest_pruned_checkpoint_for_testing(
3883 &self,
3884 ) -> IotaResult<CheckpointSequenceNumber> {
3885 self.database_for_testing()
3886 .perpetual_tables
3887 .get_highest_pruned_checkpoint()
3888 }
3889
3890 #[instrument(level = "trace", skip_all)]
3891 pub fn get_checkpoint_summary_by_sequence_number(
3892 &self,
3893 sequence_number: CheckpointSequenceNumber,
3894 ) -> IotaResult<CheckpointSummary> {
3895 let verified_checkpoint = self
3896 .get_checkpoint_store()
3897 .get_checkpoint_by_sequence_number(sequence_number)?;
3898 match verified_checkpoint {
3899 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3900 None => Err(IotaError::UserInput {
3901 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3902 }),
3903 }
3904 }
3905
3906 #[instrument(level = "trace", skip_all)]
3907 pub fn get_checkpoint_summary_by_digest(
3908 &self,
3909 digest: CheckpointDigest,
3910 ) -> IotaResult<CheckpointSummary> {
3911 let verified_checkpoint = self
3912 .get_checkpoint_store()
3913 .get_checkpoint_by_digest(&digest)?;
3914 match verified_checkpoint {
3915 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3916 None => Err(IotaError::UserInput {
3917 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3918 }),
3919 }
3920 }
3921
3922 #[instrument(level = "trace", skip_all)]
3923 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
3924 if is_system_package(package_id) {
3925 return self.find_genesis_txn_digest();
3926 }
3927 Ok(self
3928 .get_object_read(&package_id)?
3929 .into_object()?
3930 .previous_transaction)
3931 }
3932
3933 #[instrument(level = "trace", skip_all)]
3934 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
3935 let summary = self
3936 .get_verified_checkpoint_by_sequence_number(0)?
3937 .into_message();
3938 let content = self.get_checkpoint_contents(summary.content_digest)?;
3939 let genesis_transaction = content.enumerate_transactions(&summary).next();
3940 Ok(genesis_transaction
3941 .ok_or(IotaError::UserInput {
3942 error: UserInputError::GenesisTransactionNotFound,
3943 })?
3944 .1
3945 .transaction)
3946 }
3947
3948 #[instrument(level = "trace", skip_all)]
3949 pub fn get_verified_checkpoint_by_sequence_number(
3950 &self,
3951 sequence_number: CheckpointSequenceNumber,
3952 ) -> IotaResult<VerifiedCheckpoint> {
3953 let verified_checkpoint = self
3954 .get_checkpoint_store()
3955 .get_checkpoint_by_sequence_number(sequence_number)?;
3956 match verified_checkpoint {
3957 Some(verified_checkpoint) => Ok(verified_checkpoint),
3958 None => Err(IotaError::UserInput {
3959 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3960 }),
3961 }
3962 }
3963
3964 #[instrument(level = "trace", skip_all)]
3965 pub fn get_verified_checkpoint_summary_by_digest(
3966 &self,
3967 digest: CheckpointDigest,
3968 ) -> IotaResult<VerifiedCheckpoint> {
3969 let verified_checkpoint = self
3970 .get_checkpoint_store()
3971 .get_checkpoint_by_digest(&digest)?;
3972 match verified_checkpoint {
3973 Some(verified_checkpoint) => Ok(verified_checkpoint),
3974 None => Err(IotaError::UserInput {
3975 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3976 }),
3977 }
3978 }
3979
3980 #[instrument(level = "trace", skip_all)]
3981 pub fn get_checkpoint_contents(
3982 &self,
3983 digest: CheckpointContentsDigest,
3984 ) -> IotaResult<CheckpointContents> {
3985 self.get_checkpoint_store()
3986 .get_checkpoint_contents(&digest)?
3987 .ok_or(IotaError::UserInput {
3988 error: UserInputError::CheckpointContentsNotFound(digest),
3989 })
3990 }
3991
3992 #[instrument(level = "trace", skip_all)]
3993 pub fn get_checkpoint_contents_by_sequence_number(
3994 &self,
3995 sequence_number: CheckpointSequenceNumber,
3996 ) -> IotaResult<CheckpointContents> {
3997 let verified_checkpoint = self
3998 .get_checkpoint_store()
3999 .get_checkpoint_by_sequence_number(sequence_number)?;
4000 match verified_checkpoint {
4001 Some(verified_checkpoint) => {
4002 let content_digest = verified_checkpoint.into_inner().content_digest;
4003 self.get_checkpoint_contents(content_digest)
4004 }
4005 None => Err(IotaError::UserInput {
4006 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4007 }),
4008 }
4009 }
4010
4011 #[instrument(level = "trace", skip_all)]
4012 pub async fn query_events(
4013 &self,
4014 kv_store: &Arc<TransactionKeyValueStore>,
4015 query: EventFilter,
4016 cursor: Option<EventID>,
4018 limit: usize,
4019 descending: bool,
4020 ) -> IotaResult<Vec<IotaEvent>> {
4021 let index_store = self.get_indexes()?;
4022
4023 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4025 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4026 IotaError::TransactionNotFound {
4027 digest: cursor.tx_digest,
4028 },
4029 )?;
4030 (tx_seq, cursor.event_seq as usize)
4031 } else if descending {
4032 (u64::MAX, usize::MAX)
4033 } else {
4034 (0, 0)
4035 };
4036
4037 let limit = limit + 1;
4038 let mut event_keys = match query {
4039 EventFilter::All(filters) => {
4040 if filters.is_empty() {
4041 index_store.all_events(tx_num, event_num, limit, descending)?
4042 } else {
4043 return Err(IotaError::UserInput {
4044 error: UserInputError::Unsupported(
4045 "This query type does not currently support filter combinations"
4046 .to_string(),
4047 ),
4048 });
4049 }
4050 }
4051 EventFilter::Transaction(digest) => {
4052 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4053 }
4054 EventFilter::MoveModule { package, module } => {
4055 let module_id = ModuleId::new(package.into(), module);
4056 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4057 }
4058 EventFilter::MoveEventType(struct_name) => index_store
4059 .events_by_move_event_struct_name(
4060 &struct_name,
4061 tx_num,
4062 event_num,
4063 limit,
4064 descending,
4065 )?,
4066 EventFilter::Sender(sender) => {
4067 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4068 }
4069 EventFilter::TimeRange {
4070 start_time,
4071 end_time,
4072 } => index_store
4073 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4074 EventFilter::MoveEventModule { package, module } => index_store
4075 .events_by_move_event_module(
4076 &ModuleId::new(package.into(), module),
4077 tx_num,
4078 event_num,
4079 limit,
4080 descending,
4081 )?,
4082 EventFilter::Package(_)
4084 | EventFilter::MoveEventField { .. }
4085 | EventFilter::Any(_)
4086 | EventFilter::And(_, _)
4087 | EventFilter::Or(_, _) => {
4088 return Err(IotaError::UserInput {
4089 error: UserInputError::Unsupported(
4090 "This query type is not supported by the full node.".to_string(),
4091 ),
4092 });
4093 }
4094 };
4095
4096 if cursor.is_some() {
4099 if !event_keys.is_empty() {
4100 event_keys.remove(0);
4101 }
4102 } else {
4103 event_keys.truncate(limit - 1);
4104 }
4105
4106 let transaction_digests = event_keys
4108 .iter()
4109 .map(|(_, digest, _, _)| *digest)
4110 .collect::<HashSet<_>>()
4111 .into_iter()
4112 .collect::<Vec<_>>();
4113
4114 let events = kv_store
4115 .multi_get_events_by_tx_digests(&transaction_digests)
4116 .await?;
4117
4118 let events_map: HashMap<_, _> =
4119 transaction_digests.iter().zip(events.into_iter()).collect();
4120
4121 let stored_events = event_keys
4122 .into_iter()
4123 .map(|k| {
4124 (
4125 k,
4126 events_map
4127 .get(&k.1)
4128 .expect("fetched digest is missing")
4129 .clone()
4130 .and_then(|e| e.data.get(k.2).cloned()),
4131 )
4132 })
4133 .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4134 event
4135 .map(|e| (e, tx_digest, event_seq, timestamp))
4136 .ok_or(IotaError::TransactionEventsNotFound { digest })
4137 })
4138 .collect::<Result<Vec<_>, _>>()?;
4139
4140 let epoch_store = self.load_epoch_store_one_call_per_task();
4141 let backing_store = self.get_backing_package_store().as_ref();
4142 let mut layout_resolver = epoch_store
4143 .executor()
4144 .type_layout_resolver(Box::new(backing_store));
4145 let mut events = vec![];
4146 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4147 events.push(IotaEvent::try_from(
4148 e.clone(),
4149 tx_digest,
4150 event_seq as u64,
4151 Some(timestamp),
4152 layout_resolver.get_annotated_layout(&e.type_)?,
4153 )?)
4154 }
4155 Ok(events)
4156 }
4157
4158 pub async fn insert_genesis_object(&self, object: Object) {
4159 self.get_reconfig_api()
4160 .try_insert_genesis_object(object)
4161 .expect("Cannot insert genesis object")
4162 }
4163
4164 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4165 futures::future::join_all(
4166 objects
4167 .iter()
4168 .map(|o| self.insert_genesis_object(o.clone())),
4169 )
4170 .await;
4171 }
4172
4173 #[instrument(level = "trace", skip_all)]
4175 pub fn get_transaction_status(
4176 &self,
4177 transaction_digest: &TransactionDigest,
4178 epoch_store: &Arc<AuthorityPerEpochStore>,
4179 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4180 if let Some(effects) =
4182 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4183 {
4184 if let Some(transaction) = self
4185 .get_transaction_cache_reader()
4186 .try_get_transaction_block(transaction_digest)?
4187 {
4188 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4189 let events = if let Some(digest) = effects.events_digest() {
4190 self.get_transaction_events(digest)?
4191 } else {
4192 TransactionEvents::default()
4193 };
4194 return Ok(Some((
4195 (*transaction).clone().into_message(),
4196 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4197 )));
4198 } else {
4199 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4204 }
4205 }
4206 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4207 self.metrics.tx_already_processed.inc();
4208 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4209 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4210 } else {
4211 Ok(None)
4212 }
4213 }
4214
4215 #[instrument(level = "trace", skip_all)]
4219 pub fn get_signed_effects_and_maybe_resign(
4220 &self,
4221 transaction_digest: &TransactionDigest,
4222 epoch_store: &Arc<AuthorityPerEpochStore>,
4223 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4224 let effects = self
4225 .get_transaction_cache_reader()
4226 .try_get_executed_effects(transaction_digest)?;
4227 match effects {
4228 Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4229 None => Ok(None),
4230 }
4231 }
4232
4233 #[instrument(level = "trace", skip_all)]
4234 pub(crate) fn sign_effects(
4235 &self,
4236 effects: TransactionEffects,
4237 epoch_store: &Arc<AuthorityPerEpochStore>,
4238 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4239 let tx_digest = *effects.transaction_digest();
4240 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4241 Some(sig) if sig.epoch == epoch_store.epoch() => {
4242 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4243 }
4244 _ => {
4245 debug!(
4268 ?tx_digest,
4269 epoch=?epoch_store.epoch(),
4270 "Re-signing the effects with the current epoch"
4271 );
4272
4273 let sig = AuthoritySignInfo::new(
4274 epoch_store.epoch(),
4275 &effects,
4276 Intent::iota_app(IntentScope::TransactionEffects),
4277 self.name,
4278 &*self.secret,
4279 );
4280
4281 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4282
4283 epoch_store.insert_effects_digest_and_signature(
4284 &tx_digest,
4285 effects.digest(),
4286 &sig,
4287 )?;
4288
4289 effects
4290 }
4291 };
4292
4293 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4294 signed_effects,
4295 ))
4296 }
4297
4298 #[instrument(level = "trace", skip_all)]
4300 fn fullnode_only_get_tx_coins_for_indexing(
4301 &self,
4302 inner_temporary_store: &InnerTemporaryStore,
4303 epoch_store: &Arc<AuthorityPerEpochStore>,
4304 ) -> Option<TxCoins> {
4305 if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4306 return None;
4307 }
4308 let written_coin_objects = inner_temporary_store
4309 .written
4310 .iter()
4311 .filter_map(|(k, v)| {
4312 if v.is_coin() {
4313 Some((*k, v.clone()))
4314 } else {
4315 None
4316 }
4317 })
4318 .collect();
4319 let input_coin_objects = inner_temporary_store
4320 .input_objects
4321 .iter()
4322 .filter_map(|(k, v)| {
4323 if v.is_coin() {
4324 Some((*k, v.clone()))
4325 } else {
4326 None
4327 }
4328 })
4329 .collect::<ObjectMap>();
4330 Some((input_coin_objects, written_coin_objects))
4331 }
4332
4333 #[instrument(level = "trace", skip_all)]
4345 pub async fn get_transaction_lock(
4346 &self,
4347 object_ref: &ObjectRef,
4348 epoch_store: &AuthorityPerEpochStore,
4349 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4350 let lock_info = self
4351 .get_object_cache_reader()
4352 .try_get_lock(*object_ref, epoch_store)?;
4353 let lock_info = match lock_info {
4354 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4355 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4356 provided_obj_ref: *object_ref,
4357 current_version: locked_ref.1,
4358 }
4359 .into());
4360 }
4361 ObjectLockStatus::Initialized => {
4362 return Ok(None);
4363 }
4364 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4365 };
4366
4367 epoch_store.get_signed_transaction(&lock_info)
4368 }
4369
4370 pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4371 self.get_object_cache_reader().try_get_objects(objects)
4372 }
4373
4374 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4376 self.try_get_objects(objects)
4377 .await
4378 .expect("storage access failed")
4379 }
4380
4381 pub async fn try_get_object_or_tombstone(
4382 &self,
4383 object_id: ObjectID,
4384 ) -> IotaResult<Option<ObjectRef>> {
4385 self.get_object_cache_reader()
4386 .try_get_latest_object_ref_or_tombstone(object_id)
4387 }
4388
4389 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4391 self.try_get_object_or_tombstone(object_id)
4392 .await
4393 .expect("storage access failed")
4394 }
4395
4396 pub fn set_override_protocol_upgrade_buffer_stake(
4406 &self,
4407 expected_epoch: EpochId,
4408 buffer_stake_bps: u64,
4409 ) -> IotaResult {
4410 let epoch_store = self.load_epoch_store_one_call_per_task();
4411 let actual_epoch = epoch_store.epoch();
4412 if actual_epoch != expected_epoch {
4413 return Err(IotaError::WrongEpoch {
4414 expected_epoch,
4415 actual_epoch,
4416 });
4417 }
4418
4419 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4420 }
4421
4422 pub fn clear_override_protocol_upgrade_buffer_stake(
4423 &self,
4424 expected_epoch: EpochId,
4425 ) -> IotaResult {
4426 let epoch_store = self.load_epoch_store_one_call_per_task();
4427 let actual_epoch = epoch_store.epoch();
4428 if actual_epoch != expected_epoch {
4429 return Err(IotaError::WrongEpoch {
4430 expected_epoch,
4431 actual_epoch,
4432 });
4433 }
4434
4435 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4436 }
4437
4438 pub async fn get_available_system_packages(
4442 &self,
4443 binary_config: &BinaryConfig,
4444 ) -> Vec<ObjectRef> {
4445 let mut results = vec![];
4446
4447 let system_packages = BuiltInFramework::iter_system_packages();
4448
4449 #[cfg(msim)]
4451 let extra_packages = framework_injection::get_extra_packages(self.name);
4452 #[cfg(msim)]
4453 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4454
4455 for system_package in system_packages {
4456 let modules = system_package.modules().to_vec();
4457 #[cfg(msim)]
4459 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4460 .unwrap_or(modules);
4461
4462 let Some(obj_ref) = iota_framework::compare_system_package(
4463 &self.get_object_store(),
4464 &system_package.id,
4465 &modules,
4466 system_package.dependencies.to_vec(),
4467 binary_config,
4468 )
4469 .await
4470 else {
4471 return vec![];
4472 };
4473 results.push(obj_ref);
4474 }
4475
4476 results
4477 }
4478
4479 async fn get_system_package_bytes(
4496 &self,
4497 system_packages: Vec<ObjectRef>,
4498 binary_config: &BinaryConfig,
4499 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4500 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4501 let objects = self.get_objects(&ids).await;
4502
4503 let mut res = Vec::with_capacity(system_packages.len());
4504 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4505 let prev_transaction = match object {
4506 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4507 info!("Framework {} does not need updating", system_package_ref.0);
4509 continue;
4510 }
4511
4512 Some(cur_object) => cur_object.previous_transaction,
4513 None => TransactionDigest::genesis_marker(),
4514 };
4515
4516 #[cfg(msim)]
4517 let SystemPackage {
4518 id: _,
4519 bytes,
4520 dependencies,
4521 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4522 .unwrap_or_else(|| {
4523 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4524 });
4525
4526 #[cfg(not(msim))]
4527 let SystemPackage {
4528 id: _,
4529 bytes,
4530 dependencies,
4531 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4532
4533 let modules: Vec<_> = bytes
4534 .iter()
4535 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4536 .collect();
4537
4538 let new_object = Object::new_system_package(
4539 &modules,
4540 system_package_ref.1,
4541 dependencies.clone(),
4542 prev_transaction,
4543 );
4544
4545 let new_ref = new_object.compute_object_reference();
4546 if new_ref != system_package_ref {
4547 error!(
4548 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
4549 );
4550 return None;
4551 }
4552
4553 res.push((system_package_ref.1, bytes, dependencies));
4554 }
4555
4556 Some(res)
4557 }
4558
4559 fn is_protocol_version_supported_v1(
4563 proposed_protocol_version: ProtocolVersion,
4564 committee: &Committee,
4565 capabilities: Vec<AuthorityCapabilitiesV1>,
4566 mut buffer_stake_bps: u64,
4567 ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4568 if buffer_stake_bps > 10000 {
4569 warn!("clamping buffer_stake_bps to 10000");
4570 buffer_stake_bps = 10000;
4571 }
4572
4573 let mut desired_upgrades: Vec<_> = capabilities
4576 .into_iter()
4577 .filter_map(|mut cap| {
4578 if cap.available_system_packages.is_empty() {
4580 return None;
4581 }
4582
4583 cap.available_system_packages.sort();
4584
4585 info!(
4586 "validator {:?} supports {:?} with system packages: {:?}",
4587 cap.authority.concise(),
4588 cap.supported_protocol_versions,
4589 cap.available_system_packages,
4590 );
4591
4592 cap.supported_protocol_versions
4596 .get_version_digest(proposed_protocol_version)
4597 .map(|digest| (digest, cap.available_system_packages, cap.authority))
4598 })
4599 .collect();
4600
4601 desired_upgrades.sort();
4604 desired_upgrades
4605 .into_iter()
4606 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4607 .into_iter()
4608 .find_map(|((digest, packages), group)| {
4609 assert!(!packages.is_empty());
4611
4612 let mut stake_aggregator: StakeAggregator<(), true> =
4613 StakeAggregator::new(Arc::new(committee.clone()));
4614
4615 for (_, _, authority) in group {
4616 stake_aggregator.insert_generic(authority, ());
4617 }
4618
4619 let total_votes = stake_aggregator.total_votes();
4620 let quorum_threshold = committee.quorum_threshold();
4621 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4622
4623 info!(
4624 protocol_config_digest = ?digest,
4625 ?total_votes,
4626 ?quorum_threshold,
4627 ?buffer_stake_bps,
4628 ?effective_threshold,
4629 ?proposed_protocol_version,
4630 ?packages,
4631 "support for upgrade"
4632 );
4633
4634 let has_support = total_votes >= effective_threshold;
4635 has_support.then_some((proposed_protocol_version, digest, packages))
4636 })
4637 }
4638
4639 fn choose_protocol_version_and_system_packages_v1(
4643 current_protocol_version: ProtocolVersion,
4644 current_protocol_digest: Digest,
4645 committee: &Committee,
4646 capabilities: Vec<AuthorityCapabilitiesV1>,
4647 buffer_stake_bps: u64,
4648 ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4649 let mut next_protocol_version = current_protocol_version;
4650 let mut system_packages = vec![];
4651 let mut protocol_version_digest = current_protocol_digest;
4652
4653 while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4657 next_protocol_version + 1,
4658 committee,
4659 capabilities.clone(),
4660 buffer_stake_bps,
4661 ) {
4662 next_protocol_version = version;
4663 protocol_version_digest = digest;
4664 system_packages = packages;
4665 }
4666
4667 (
4668 next_protocol_version,
4669 protocol_version_digest,
4670 system_packages,
4671 )
4672 }
4673
4674 fn get_validators_supporting_protocol_version(
4679 target_protocol_version: ProtocolVersion,
4680 target_digest: Digest,
4681 active_validators: &[AuthorityPublicKey],
4682 capabilities: &[AuthorityCapabilitiesV1],
4683 ) -> Vec<u64> {
4684 let mut eligible_validators = Vec::new();
4685
4686 for capability in capabilities {
4687 if let Some(digest) = capability
4689 .supported_protocol_versions
4690 .get_version_digest(target_protocol_version)
4691 {
4692 if digest == target_digest {
4693 if let Some(index) = active_validators
4695 .iter()
4696 .position(|name| AuthorityName::from(name) == capability.authority)
4697 {
4698 eligible_validators.push(index as u64);
4699 }
4700 }
4701 }
4702 }
4703
4704 eligible_validators.sort();
4706 eligible_validators
4707 }
4708
4709 fn calculate_eligible_validators_weight(
4714 eligible_validator_indices: &[u64],
4715 active_validators: &[AuthorityPublicKey],
4716 committee: &Committee,
4717 ) -> u64 {
4718 let mut total_weight = 0u64;
4719
4720 for &index in eligible_validator_indices {
4721 let authority_pubkey = &active_validators[index as usize];
4722 if let Some((_, weight)) = committee
4724 .members()
4725 .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4726 {
4727 total_weight += weight;
4728 }
4729 }
4730
4731 total_weight
4732 }
4733
4734 #[instrument(level = "debug", skip_all)]
4735 fn create_authenticator_state_tx(
4736 &self,
4737 epoch_store: &Arc<AuthorityPerEpochStore>,
4738 ) -> Option<EndOfEpochTransactionKind> {
4739 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4740 info!("authenticator state transactions not enabled");
4741 return None;
4742 }
4743
4744 let authenticator_state_exists = epoch_store.authenticator_state_exists();
4745 let tx = if authenticator_state_exists {
4746 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4747 let min_epoch =
4748 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4749 let authenticator_obj_initial_shared_version = epoch_store
4750 .epoch_start_config()
4751 .authenticator_obj_initial_shared_version()
4752 .expect("initial version must exist");
4753
4754 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4755 min_epoch,
4756 authenticator_obj_initial_shared_version,
4757 );
4758
4759 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4760
4761 tx
4762 } else {
4763 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4764 info!("Creating AuthenticatorStateCreate tx");
4765 tx
4766 };
4767 Some(tx)
4768 }
4769
4770 #[instrument(level = "error", skip_all)]
4783 pub async fn create_and_execute_advance_epoch_tx(
4784 &self,
4785 epoch_store: &Arc<AuthorityPerEpochStore>,
4786 gas_cost_summary: &GasCostSummary,
4787 checkpoint: CheckpointSequenceNumber,
4788 epoch_start_timestamp_ms: CheckpointTimestamp,
4789 ) -> anyhow::Result<(
4790 IotaSystemState,
4791 Option<SystemEpochInfoEvent>,
4792 TransactionEffects,
4793 )> {
4794 let mut txns = Vec::new();
4795
4796 if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
4797 txns.push(tx);
4798 }
4799
4800 let next_epoch = epoch_store.epoch() + 1;
4801
4802 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
4803 let authority_capabilities = epoch_store
4804 .get_capabilities_v1()
4805 .expect("read capabilities from db cannot fail");
4806 let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
4807 Self::choose_protocol_version_and_system_packages_v1(
4808 epoch_store.protocol_version(),
4809 SupportedProtocolVersionsWithHashes::protocol_config_digest(
4810 epoch_store.protocol_config(),
4811 ),
4812 epoch_store.committee(),
4813 authority_capabilities.clone(),
4814 buffer_stake_bps,
4815 );
4816
4817 let config = epoch_store.protocol_config();
4821 let binary_config = to_binary_config(config);
4822 let Some(next_epoch_system_package_bytes) = self
4823 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
4824 .await
4825 else {
4826 error!(
4827 "upgraded system packages {:?} are not locally available, cannot create \
4828 ChangeEpochTx. validator binary must be upgraded to the correct version!",
4829 next_epoch_system_packages
4830 );
4831 bail!("missing system packages: cannot form ChangeEpochTx");
4841 };
4842
4843 if config.select_committee_from_eligible_validators() {
4847 let active_validators = epoch_store.epoch_start_state().get_active_validators();
4849
4850 let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
4851
4852 if config.select_committee_supporting_next_epoch_version() {
4856 eligible_active_validators = Self::get_validators_supporting_protocol_version(
4857 next_epoch_protocol_version,
4858 next_epoch_protocol_digest,
4859 &active_validators,
4860 &authority_capabilities,
4861 );
4862
4863 let eligible_validators_weight = Self::calculate_eligible_validators_weight(
4865 &eligible_active_validators,
4866 &active_validators,
4867 epoch_store.committee(),
4868 );
4869
4870 let committee = epoch_store.committee();
4874 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4875
4876 if eligible_validators_weight < effective_threshold {
4877 error!(
4878 "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
4879 This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
4880 );
4881 eligible_active_validators = (0..active_validators.len() as u64).collect();
4884 }
4885 }
4886
4887 txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
4888 next_epoch,
4889 next_epoch_protocol_version,
4890 gas_cost_summary.storage_cost,
4891 gas_cost_summary.computation_cost,
4892 gas_cost_summary.computation_cost_burned,
4893 gas_cost_summary.storage_rebate,
4894 gas_cost_summary.non_refundable_storage_fee,
4895 epoch_start_timestamp_ms,
4896 next_epoch_system_package_bytes,
4897 eligible_active_validators,
4898 ));
4899 } else if config.protocol_defined_base_fee()
4900 && config.max_committee_members_count_as_option().is_some()
4901 {
4902 txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
4903 next_epoch,
4904 next_epoch_protocol_version,
4905 gas_cost_summary.storage_cost,
4906 gas_cost_summary.computation_cost,
4907 gas_cost_summary.computation_cost_burned,
4908 gas_cost_summary.storage_rebate,
4909 gas_cost_summary.non_refundable_storage_fee,
4910 epoch_start_timestamp_ms,
4911 next_epoch_system_package_bytes,
4912 ));
4913 } else {
4914 txns.push(EndOfEpochTransactionKind::new_change_epoch(
4915 next_epoch,
4916 next_epoch_protocol_version,
4917 gas_cost_summary.storage_cost,
4918 gas_cost_summary.computation_cost,
4919 gas_cost_summary.storage_rebate,
4920 gas_cost_summary.non_refundable_storage_fee,
4921 epoch_start_timestamp_ms,
4922 next_epoch_system_package_bytes,
4923 ));
4924 }
4925
4926 let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
4927
4928 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
4929 tx.clone(),
4930 epoch_store.epoch(),
4931 checkpoint,
4932 );
4933
4934 let tx_digest = executable_tx.digest();
4935
4936 info!(
4937 ?next_epoch,
4938 ?next_epoch_protocol_version,
4939 ?next_epoch_system_packages,
4940 computation_cost=?gas_cost_summary.computation_cost,
4941 computation_cost_burned=?gas_cost_summary.computation_cost_burned,
4942 storage_cost=?gas_cost_summary.storage_cost,
4943 storage_rebate=?gas_cost_summary.storage_rebate,
4944 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
4945 ?tx_digest,
4946 "Creating advance epoch transaction"
4947 );
4948
4949 fail_point_async!("change_epoch_tx_delay");
4950 let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
4951
4952 if self
4956 .get_transaction_cache_reader()
4957 .try_is_tx_already_executed(tx_digest)?
4958 {
4959 warn!("change epoch tx has already been executed via state sync");
4960 bail!("change epoch tx has already been executed via state sync",);
4961 }
4962
4963 let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
4964
4965 epoch_store.assign_shared_object_versions_idempotent(
4969 self.get_object_cache_reader().as_ref(),
4970 std::slice::from_ref(&executable_tx),
4971 )?;
4972
4973 let input_objects =
4974 self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
4975
4976 let (temporary_store, effects, _execution_error_opt) =
4977 self.prepare_certificate(&execution_guard, &executable_tx, input_objects, epoch_store)?;
4978 let system_obj = get_iota_system_state(&temporary_store.written)
4979 .expect("change epoch tx must write to system object");
4980 let system_epoch_info_event = temporary_store
4982 .events
4983 .data
4984 .into_iter()
4985 .find(|event| event.is_system_epoch_info_event())
4986 .map(SystemEpochInfoEvent::from);
4987 assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
4990
4991 self.get_state_sync_store()
4995 .try_insert_transaction_and_effects(&tx, &effects)
4996 .map_err(|err| {
4997 let err: anyhow::Error = err.into();
4998 err
4999 })?;
5000
5001 info!(
5002 "Effects summary of the change epoch transaction: {:?}",
5003 effects.summary_for_debug()
5004 );
5005 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5006 assert!(effects.status().is_ok());
5008 Ok((system_obj, system_epoch_info_event, effects))
5009 }
5010
5011 #[instrument(level = "error", skip_all)]
5015 async fn revert_uncommitted_epoch_transactions(
5016 &self,
5017 epoch_store: &AuthorityPerEpochStore,
5018 ) -> IotaResult {
5019 {
5020 let state = epoch_store.get_reconfig_state_write_lock_guard();
5021 if state.should_accept_user_certs() {
5022 epoch_store.close_user_certs(state);
5031 }
5032 }
5034 let pending_certificates = epoch_store.pending_consensus_certificates();
5035 info!(
5036 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5037 pending_certificates.len(),
5038 pending_certificates,
5039 );
5040 for digest in pending_certificates {
5041 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5042 info!(
5043 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5044 digest
5045 );
5046 continue;
5047 }
5048 info!("Reverting {:?} at the end of epoch", digest);
5049 epoch_store.revert_executed_transaction(&digest)?;
5050 self.get_reconfig_api().try_revert_state_update(&digest)?;
5051 }
5052 info!("All uncommitted local transactions reverted");
5053 Ok(())
5054 }
5055
5056 #[instrument(level = "error", skip_all)]
5057 async fn reopen_epoch_db(
5058 &self,
5059 cur_epoch_store: &AuthorityPerEpochStore,
5060 new_committee: Committee,
5061 epoch_start_configuration: EpochStartConfiguration,
5062 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5063 epoch_last_checkpoint: CheckpointSequenceNumber,
5064 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5065 let new_epoch = new_committee.epoch;
5066 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5067 assert_eq!(
5068 epoch_start_configuration.epoch_start_state().epoch(),
5069 new_committee.epoch
5070 );
5071 fail_point!("before-open-new-epoch-store");
5072 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5073 self.name,
5074 new_committee,
5075 epoch_start_configuration,
5076 self.get_backing_package_store().clone(),
5077 self.get_object_store().clone(),
5078 expensive_safety_check_config,
5079 epoch_last_checkpoint,
5080 )?;
5081 self.epoch_store.store(new_epoch_store.clone());
5082 Ok(new_epoch_store)
5083 }
5084
5085 #[cfg(test)]
5086 pub(crate) fn iter_live_object_set_for_testing(
5087 &self,
5088 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5089 self.get_accumulator_store()
5090 .iter_cached_live_object_set_for_testing()
5091 }
5092
5093 #[cfg(test)]
5094 pub(crate) fn shutdown_execution_for_test(&self) {
5095 self.tx_execution_shutdown
5096 .lock()
5097 .take()
5098 .unwrap()
5099 .send(())
5100 .unwrap();
5101 }
5102
5103 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5106 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5107 self.get_object_cache_reader()
5108 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5109 self.get_reconfig_api()
5110 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5111 }
5112}
5113
5114pub struct RandomnessRoundReceiver {
5115 authority_state: Arc<AuthorityState>,
5116 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5117}
5118
5119impl RandomnessRoundReceiver {
5120 pub fn spawn(
5121 authority_state: Arc<AuthorityState>,
5122 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5123 ) -> JoinHandle<()> {
5124 let rrr = RandomnessRoundReceiver {
5125 authority_state,
5126 randomness_rx,
5127 };
5128 spawn_monitored_task!(rrr.run())
5129 }
5130
5131 async fn run(mut self) {
5132 info!("RandomnessRoundReceiver event loop started");
5133
5134 loop {
5135 tokio::select! {
5136 maybe_recv = self.randomness_rx.recv() => {
5137 if let Some((epoch, round, bytes)) = maybe_recv {
5138 self.handle_new_randomness(epoch, round, bytes);
5139 } else {
5140 break;
5141 }
5142 },
5143 }
5144 }
5145
5146 info!("RandomnessRoundReceiver event loop ended");
5147 }
5148
5149 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5150 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5151 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5152 if epoch_store.epoch() != epoch {
5153 warn!(
5154 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5155 epoch_store.epoch()
5156 );
5157 return;
5158 }
5159 let transaction = VerifiedTransaction::new_randomness_state_update(
5160 epoch,
5161 round,
5162 bytes,
5163 epoch_store
5164 .epoch_start_config()
5165 .randomness_obj_initial_shared_version(),
5166 );
5167 debug!(
5168 "created randomness state update transaction with digest: {:?}",
5169 transaction.digest()
5170 );
5171 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5172 let digest = *transaction.digest();
5173
5174 self.authority_state
5179 .get_cache_commit()
5180 .persist_transaction(&transaction);
5181
5182 self.authority_state
5184 .transaction_manager()
5185 .enqueue(vec![transaction], &epoch_store);
5186
5187 let authority_state = self.authority_state.clone();
5188 spawn_monitored_task!(async move {
5189 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5198 let result = tokio::time::timeout(
5199 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5200 authority_state
5201 .get_transaction_cache_reader()
5202 .try_notify_read_executed_effects(&[digest]),
5203 )
5204 .await;
5205 let result = match result {
5206 Ok(result) => result,
5207 Err(_) => {
5208 if cfg!(debug_assertions) {
5209 panic!(
5211 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5212 );
5213 }
5214 warn!(
5215 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5216 );
5217 authority_state
5219 .get_transaction_cache_reader()
5220 .try_notify_read_executed_effects(&[digest])
5221 .await
5222 }
5223 };
5224
5225 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5226 let effects = effects.pop().expect("should return effects");
5227 if *effects.status() != ExecutionStatus::Success {
5228 fatal!(
5229 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5230 );
5231 }
5232 debug!(
5233 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5234 );
5235 });
5236 }
5237}
5238
5239#[async_trait]
5240impl TransactionKeyValueStoreTrait for AuthorityState {
5241 async fn multi_get(
5242 &self,
5243 transaction_keys: &[TransactionDigest],
5244 effects_keys: &[TransactionDigest],
5245 ) -> IotaResult<KVStoreTransactionData> {
5246 let txns = if !transaction_keys.is_empty() {
5247 self.get_transaction_cache_reader()
5248 .try_multi_get_transaction_blocks(transaction_keys)?
5249 .into_iter()
5250 .map(|t| t.map(|t| (*t).clone().into_inner()))
5251 .collect()
5252 } else {
5253 vec![]
5254 };
5255
5256 let fx = if !effects_keys.is_empty() {
5257 self.get_transaction_cache_reader()
5258 .try_multi_get_executed_effects(effects_keys)?
5259 } else {
5260 vec![]
5261 };
5262
5263 Ok((txns, fx))
5264 }
5265
5266 async fn multi_get_checkpoints(
5267 &self,
5268 checkpoint_summaries: &[CheckpointSequenceNumber],
5269 checkpoint_contents: &[CheckpointSequenceNumber],
5270 checkpoint_summaries_by_digest: &[CheckpointDigest],
5271 ) -> IotaResult<(
5272 Vec<Option<CertifiedCheckpointSummary>>,
5273 Vec<Option<CheckpointContents>>,
5274 Vec<Option<CertifiedCheckpointSummary>>,
5275 )> {
5276 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5278 let store = self.get_checkpoint_store();
5279 for seq in checkpoint_summaries {
5280 let checkpoint = store
5281 .get_checkpoint_by_sequence_number(*seq)?
5282 .map(|c| c.into_inner());
5283
5284 summaries.push(checkpoint);
5285 }
5286
5287 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5288 for seq in checkpoint_contents {
5289 let checkpoint = store
5290 .get_checkpoint_by_sequence_number(*seq)?
5291 .and_then(|summary| {
5292 store
5293 .get_checkpoint_contents(&summary.content_digest)
5294 .expect("db read cannot fail")
5295 });
5296 contents.push(checkpoint);
5297 }
5298
5299 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5300 for digest in checkpoint_summaries_by_digest {
5301 let checkpoint = store
5302 .get_checkpoint_by_digest(digest)?
5303 .map(|c| c.into_inner());
5304 summaries_by_digest.push(checkpoint);
5305 }
5306
5307 Ok((summaries, contents, summaries_by_digest))
5308 }
5309
5310 async fn get_transaction_perpetual_checkpoint(
5311 &self,
5312 digest: TransactionDigest,
5313 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5314 self.get_checkpoint_cache()
5315 .try_get_transaction_perpetual_checkpoint(&digest)
5316 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5317 }
5318
5319 async fn get_object(
5320 &self,
5321 object_id: ObjectID,
5322 version: VersionNumber,
5323 ) -> IotaResult<Option<Object>> {
5324 self.get_object_cache_reader()
5325 .try_get_object_by_key(&object_id, version)
5326 }
5327
5328 async fn multi_get_transactions_perpetual_checkpoints(
5329 &self,
5330 digests: &[TransactionDigest],
5331 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5332 let res = self
5333 .get_checkpoint_cache()
5334 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5335
5336 Ok(res
5337 .into_iter()
5338 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5339 .collect())
5340 }
5341
5342 #[instrument(skip(self))]
5343 async fn multi_get_events_by_tx_digests(
5344 &self,
5345 digests: &[TransactionDigest],
5346 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5347 if digests.is_empty() {
5348 return Ok(vec![]);
5349 }
5350 let events_digests: Vec<_> = self
5351 .get_transaction_cache_reader()
5352 .try_multi_get_executed_effects(digests)?
5353 .into_iter()
5354 .map(|t| t.and_then(|t| t.events_digest().cloned()))
5355 .collect();
5356 let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5357 let mut events = self
5358 .get_transaction_cache_reader()
5359 .try_multi_get_events(&non_empty_events)?
5360 .into_iter();
5361 Ok(events_digests
5362 .into_iter()
5363 .map(|ev| ev.and_then(|_| events.next()?))
5364 .collect())
5365 }
5366}
5367
5368#[cfg(msim)]
5369pub mod framework_injection {
5370 use std::{
5371 cell::RefCell,
5372 collections::{BTreeMap, BTreeSet},
5373 };
5374
5375 use iota_framework::{BuiltInFramework, SystemPackage};
5376 use iota_types::{
5377 base_types::{AuthorityName, ObjectID},
5378 is_system_package,
5379 };
5380 use move_binary_format::CompiledModule;
5381
5382 type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
5383
5384 thread_local! {
5386 static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
5387 }
5388
5389 type Framework = Vec<CompiledModule>;
5390
5391 pub type PackageUpgradeCallback =
5392 Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
5393
5394 enum PackageOverrideConfig {
5395 Global(Framework),
5396 PerValidator(PackageUpgradeCallback),
5397 }
5398
5399 fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
5400 modules
5401 .iter()
5402 .map(|m| {
5403 let mut buf = Vec::new();
5404 m.serialize_with_version(m.version, &mut buf).unwrap();
5405 buf
5406 })
5407 .collect()
5408 }
5409
5410 pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
5411 OVERRIDE.with(|bs| {
5412 bs.borrow_mut()
5413 .insert(package_id, PackageOverrideConfig::Global(modules))
5414 });
5415 }
5416
5417 pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
5418 OVERRIDE.with(|bs| {
5419 bs.borrow_mut()
5420 .insert(package_id, PackageOverrideConfig::PerValidator(func))
5421 });
5422 }
5423
5424 pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
5425 OVERRIDE.with(|cfg| {
5426 cfg.borrow().get(package_id).and_then(|entry| match entry {
5427 PackageOverrideConfig::Global(framework) => {
5428 Some(compiled_modules_to_bytes(framework))
5429 }
5430 PackageOverrideConfig::PerValidator(func) => {
5431 func(name).map(|fw| compiled_modules_to_bytes(&fw))
5432 }
5433 })
5434 })
5435 }
5436
5437 pub fn get_override_modules(
5438 package_id: &ObjectID,
5439 name: AuthorityName,
5440 ) -> Option<Vec<CompiledModule>> {
5441 OVERRIDE.with(|cfg| {
5442 cfg.borrow().get(package_id).and_then(|entry| match entry {
5443 PackageOverrideConfig::Global(framework) => Some(framework.clone()),
5444 PackageOverrideConfig::PerValidator(func) => func(name),
5445 })
5446 })
5447 }
5448
5449 pub fn get_override_system_package(
5450 package_id: &ObjectID,
5451 name: AuthorityName,
5452 ) -> Option<SystemPackage> {
5453 let bytes = get_override_bytes(package_id, name)?;
5454 let dependencies = if is_system_package(*package_id) {
5455 BuiltInFramework::get_package_by_id(package_id)
5456 .dependencies
5457 .to_vec()
5458 } else {
5459 BuiltInFramework::all_package_ids()
5462 };
5463 Some(SystemPackage {
5464 id: *package_id,
5465 bytes,
5466 dependencies,
5467 })
5468 }
5469
5470 pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
5471 let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
5472 let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
5473 cfg.borrow()
5474 .keys()
5475 .filter_map(|package| (!built_in.contains(package)).then_some(*package))
5476 .collect()
5477 });
5478
5479 extra
5480 .into_iter()
5481 .map(|package| SystemPackage {
5482 id: package,
5483 bytes: get_override_bytes(&package, name).unwrap(),
5484 dependencies: BuiltInFramework::all_package_ids(),
5485 })
5486 .collect()
5487 }
5488}
5489
5490#[derive(Debug, Serialize, Deserialize, Clone)]
5491pub struct ObjDumpFormat {
5492 pub id: ObjectID,
5493 pub version: VersionNumber,
5494 pub digest: ObjectDigest,
5495 pub object: Object,
5496}
5497
5498impl ObjDumpFormat {
5499 fn new(object: Object) -> Self {
5500 let oref = object.compute_object_reference();
5501 Self {
5502 id: oref.0,
5503 version: oref.1,
5504 digest: oref.2,
5505 object,
5506 }
5507 }
5508}
5509
5510#[derive(Debug, Serialize, Deserialize, Clone)]
5511pub struct NodeStateDump {
5512 pub tx_digest: TransactionDigest,
5513 pub sender_signed_data: SenderSignedData,
5514 pub executed_epoch: u64,
5515 pub reference_gas_price: u64,
5516 pub protocol_version: u64,
5517 pub epoch_start_timestamp_ms: u64,
5518 pub computed_effects: TransactionEffects,
5519 pub expected_effects_digest: TransactionEffectsDigest,
5520 pub relevant_system_packages: Vec<ObjDumpFormat>,
5521 pub shared_objects: Vec<ObjDumpFormat>,
5522 pub loaded_child_objects: Vec<ObjDumpFormat>,
5523 pub modified_at_versions: Vec<ObjDumpFormat>,
5524 pub runtime_reads: Vec<ObjDumpFormat>,
5525 pub input_objects: Vec<ObjDumpFormat>,
5526}
5527
5528impl NodeStateDump {
5529 pub fn new(
5530 tx_digest: &TransactionDigest,
5531 effects: &TransactionEffects,
5532 expected_effects_digest: TransactionEffectsDigest,
5533 object_store: &dyn ObjectStore,
5534 epoch_store: &Arc<AuthorityPerEpochStore>,
5535 inner_temporary_store: &InnerTemporaryStore,
5536 certificate: &VerifiedExecutableTransaction,
5537 ) -> IotaResult<Self> {
5538 let executed_epoch = epoch_store.epoch();
5540 let reference_gas_price = epoch_store.reference_gas_price();
5541 let epoch_start_config = epoch_store.epoch_start_config();
5542 let protocol_version = epoch_store.protocol_version().as_u64();
5543 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5544
5545 let mut relevant_system_packages = Vec::new();
5547 for sys_package_id in BuiltInFramework::all_package_ids() {
5548 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5549 relevant_system_packages.push(ObjDumpFormat::new(w))
5550 }
5551 }
5552
5553 let mut shared_objects = Vec::new();
5555 for kind in effects.input_shared_objects() {
5556 match kind {
5557 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5558 if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5559 shared_objects.push(ObjDumpFormat::new(w))
5560 }
5561 }
5562 InputSharedObject::ReadDeleted(..)
5563 | InputSharedObject::MutateDeleted(..)
5564 | InputSharedObject::Cancelled(..) => (), }
5567 }
5568
5569 let mut loaded_child_objects = Vec::new();
5572 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5573 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5574 loaded_child_objects.push(ObjDumpFormat::new(w))
5575 }
5576 }
5577
5578 let mut modified_at_versions = Vec::new();
5580 for (id, ver) in effects.modified_at_versions() {
5581 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5582 modified_at_versions.push(ObjDumpFormat::new(w))
5583 }
5584 }
5585
5586 let mut runtime_reads = Vec::new();
5590 for obj in inner_temporary_store
5591 .runtime_packages_loaded_from_db
5592 .values()
5593 {
5594 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
5595 }
5596
5597 Ok(Self {
5600 tx_digest: *tx_digest,
5601 executed_epoch,
5602 reference_gas_price,
5603 epoch_start_timestamp_ms,
5604 protocol_version,
5605 relevant_system_packages,
5606 shared_objects,
5607 loaded_child_objects,
5608 modified_at_versions,
5609 runtime_reads,
5610 sender_signed_data: certificate.clone().into_message(),
5611 input_objects: inner_temporary_store
5612 .input_objects
5613 .values()
5614 .map(|o| ObjDumpFormat::new(o.clone()))
5615 .collect(),
5616 computed_effects: effects.clone(),
5617 expected_effects_digest,
5618 })
5619 }
5620
5621 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
5622 let mut objects = Vec::new();
5623 objects.extend(self.relevant_system_packages.clone());
5624 objects.extend(self.shared_objects.clone());
5625 objects.extend(self.loaded_child_objects.clone());
5626 objects.extend(self.modified_at_versions.clone());
5627 objects.extend(self.runtime_reads.clone());
5628 objects.extend(self.input_objects.clone());
5629 objects
5630 }
5631
5632 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
5633 let file_name = format!(
5634 "{}_{}_NODE_DUMP.json",
5635 self.tx_digest,
5636 AuthorityState::unixtime_now_ms()
5637 );
5638 let mut path = path.to_path_buf();
5639 path.push(&file_name);
5640 let mut file = File::create(path.clone())?;
5641 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
5642 Ok(path)
5643 }
5644
5645 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
5646 let file = File::open(path)?;
5647 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
5648 }
5649}