1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49 key_value_store::{
50 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51 },
52 key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57 IOTA_SYSTEM_ADDRESS, TypeTag,
58 authenticator_state::get_authenticator_state,
59 base_types::*,
60 committee::{Committee, EpochId, ProtocolVersion},
61 crypto::{
62 AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
63 default_hash,
64 },
65 deny_list_v1::check_coin_deny_list_v1_during_signing,
66 digests::{ChainIdentifier, Digest, TransactionEventsDigest},
67 dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
68 effects::{
69 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
70 TransactionEvents, VerifiedSignedTransactionEffects,
71 },
72 error::{ExecutionError, IotaError, IotaResult, UserInputError},
73 event::{Event, EventID, SystemEpochInfoEvent},
74 executable_transaction::VerifiedExecutableTransaction,
75 execution_config_utils::to_binary_config,
76 execution_status::ExecutionStatus,
77 gas::{GasCostSummary, IotaGasStatus},
78 gas_coin::NANOS_PER_IOTA,
79 inner_temporary_store::{
80 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
81 WrittenObjects,
82 },
83 iota_system_state::{
84 IotaSystemState, IotaSystemStateTrait,
85 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
86 },
87 is_system_package,
88 layout_resolver::{LayoutResolver, into_struct_layout},
89 message_envelope::Message,
90 messages_checkpoint::{
91 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
92 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
93 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
94 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
95 },
96 messages_consensus::AuthorityCapabilitiesV1,
97 messages_grpc::{
98 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
99 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
100 TransactionStatus,
101 },
102 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
103 object::{
104 MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
105 bounded_visitor::BoundedVisitor,
106 },
107 storage::{
108 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
109 },
110 supported_protocol_versions::{
111 ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
112 },
113 transaction::*,
114 transaction_executor::{SimulateTransactionResult, VmChecks},
115};
116use itertools::Itertools;
117use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
118use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
119use parking_lot::Mutex;
120use prometheus::{
121 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
122 register_histogram_vec_with_registry, register_histogram_with_registry,
123 register_int_counter_vec_with_registry, register_int_counter_with_registry,
124 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
125};
126use serde::{Deserialize, Serialize, de::DeserializeOwned};
127use tap::TapFallible;
128use tokio::{
129 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
130 task::JoinHandle,
131};
132use tracing::{debug, error, info, instrument, trace, warn};
133use typed_store::TypedStoreError;
134
135use self::{
136 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
137};
138#[cfg(msim)]
139pub use crate::checkpoints::checkpoint_executor::utils::{
140 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
141};
142use crate::{
143 authority::{
144 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
145 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
146 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
147 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
148 authority_store_tables::AuthorityPrunerTables,
149 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
150 },
151 authority_client::NetworkAuthorityClient,
152 checkpoints::CheckpointStore,
153 congestion_tracker::CongestionTracker,
154 consensus_adapter::ConsensusAdapter,
155 epoch::committee_store::CommitteeStore,
156 execution_cache::{
157 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
158 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
159 TransactionCacheRead,
160 },
161 execution_driver::execution_process,
162 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
163 metrics::{LatencyObserver, RateTracker},
164 module_cache_metrics::ResolverMetrics,
165 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
166 rest_index::RestIndexStore,
167 stake_aggregator::StakeAggregator,
168 state_accumulator::{AccumulatorStore, StateAccumulator},
169 subscription_handler::SubscriptionHandler,
170 transaction_input_loader::TransactionInputLoader,
171 transaction_manager::TransactionManager,
172 transaction_outputs::TransactionOutputs,
173 validator_tx_finalizer::ValidatorTxFinalizer,
174 verify_indexes::verify_indexes,
175};
176
// Unit-test modules. Each one is compiled only under `cfg(test)` and is loaded
// from the `unit_tests/` directory through an explicit `#[path]`.
#[cfg(test)]
#[path = "unit_tests/authority_tests.rs"]
pub mod authority_tests;

#[cfg(test)]
#[path = "unit_tests/transaction_tests.rs"]
pub mod transaction_tests;

#[cfg(test)]
#[path = "unit_tests/batch_transaction_tests.rs"]
mod batch_transaction_tests;

#[cfg(test)]
#[path = "unit_tests/move_integration_tests.rs"]
pub mod move_integration_tests;

#[cfg(test)]
#[path = "unit_tests/gas_tests.rs"]
mod gas_tests;

#[cfg(test)]
#[path = "unit_tests/batch_verification_tests.rs"]
mod batch_verification_tests;

#[cfg(test)]
#[path = "unit_tests/coin_deny_list_tests.rs"]
mod coin_deny_list_tests;

#[cfg(test)]
#[path = "unit_tests/auth_unit_test_utils.rs"]
pub mod auth_unit_test_utils;

// Unlike the modules above, this one is not gated on `cfg(test)`, so it is
// part of every build.
pub mod authority_test_utils;

// Public submodules of the authority.
pub mod authority_per_epoch_store;
pub mod authority_per_epoch_store_pruner;

pub mod authority_store_pruner;
pub mod authority_store_tables;
pub mod authority_store_types;
pub mod epoch_start_configuration;
pub mod shared_object_congestion_tracker;
pub mod shared_object_version_manager;
pub mod suggested_gas_price_calculator;
pub mod test_authority_builder;
pub mod transaction_deferral;

// The store module itself is crate-private; the items meant for external use
// (`AuthorityStore`, `ResolverWrapper`, `UpdateType`) are re-exported with
// `pub use` at the top of this file.
pub(crate) mod authority_store;
pub mod backpressure;
226
/// Prometheus metrics owned by the authority.
///
/// All Prometheus-backed fields are registered in [`AuthorityMetrics::new`];
/// the per-field descriptions below mirror the help text used there.
pub struct AuthorityMetrics {
    /// Total number of transaction orders.
    tx_orders: IntCounter,
    /// Total number of transaction certificates handled.
    total_certs: IntCounter,
    /// Number of calls to `handle_certificate`.
    total_cert_attempts: IntCounter,
    /// Total number of transaction effects produced.
    total_effects: IntCounter,
    /// Number of transactions involving shared objects.
    pub shared_obj_tx: IntCounter,
    /// Number of sponsored transactions.
    sponsored_tx: IntCounter,
    /// Number of transaction orders already processed previously.
    tx_already_processed: IntCounter,
    /// Distribution of the number of input objects per transaction.
    num_input_objs: Histogram,
    /// Number of shared input objects per transaction.
    num_shared_objects: Histogram,
    /// Distribution of the size of transaction batches.
    batch_size: Histogram,

    /// Latency of handling transactions.
    authority_state_handle_transaction_latency: Histogram,

    // Both histograms below are children of the same
    // `authority_state_execute_certificate_latency` vector, selected by the
    // `tx_type` label.
    /// Certificate execution latency (including waiting for inputs) for
    /// single-writer transactions.
    execute_certificate_latency_single_writer: Histogram,
    /// Certificate execution latency (including waiting for inputs) for
    /// shared-object transactions.
    execute_certificate_latency_shared_object: Histogram,

    /// Latency of actual certificate executions.
    internal_execution_latency: Histogram,
    /// Latency of loading input objects for execution.
    execution_load_input_objects_latency: Histogram,
    /// Latency of executing certificates, before committing the results.
    prepare_certificate_latency: Histogram,
    /// Latency of committing certificate execution results.
    commit_certificate_latency: Histogram,
    /// Latency of checkpointing dbs.
    db_checkpoint_latency: Histogram,

    // TransactionManager metrics.
    /// Certificates enqueued to the TransactionManager, labelled by `result`.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    /// Current number of missing objects in the TransactionManager.
    pub(crate) transaction_manager_num_missing_objects: IntGauge,
    /// Certificates pending with at least one missing input object.
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    /// Executing certificates, including queued and actually running ones.
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    /// Number of ready transactions in the TransactionManager.
    pub(crate) transaction_manager_num_ready: IntGauge,
    /// Current size of the object-availability cache.
    pub(crate) transaction_manager_object_cache_size: IntGauge,
    /// Object-availability cache hits.
    pub(crate) transaction_manager_object_cache_hits: IntCounter,
    /// Object-availability cache misses.
    pub(crate) transaction_manager_object_cache_misses: IntCounter,
    /// Object-availability cache evictions.
    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
    /// Current size of the package-availability cache.
    pub(crate) transaction_manager_package_cache_size: IntGauge,
    /// Package-availability cache hits.
    pub(crate) transaction_manager_package_cache_hits: IntCounter,
    /// Package-availability cache misses.
    pub(crate) transaction_manager_package_cache_misses: IntCounter,
    /// Package-availability cache evictions.
    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
    /// Time transactions spend waiting in the queue.
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    // Execution driver metrics.
    /// Cumulative number of transactions executed by the execution driver.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    /// Number of transactions pending in the execution driver dispatch queue.
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    /// Delay between a transaction becoming ready and it starting to execute.
    pub(crate) execution_queueing_delay_s: Histogram,
    /// Ratio of computation gas to VM execution latency.
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    /// Ratio of computation gas to certificate execution latency, including
    /// committing the certificate.
    pub(crate) execution_gas_latency_ratio: Histogram,

    /// Total number of consensus transactions skipped.
    pub(crate) skipped_consensus_txns: IntCounter,
    /// Consensus transactions skipped because of a local cache hit.
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,

    /// Whether the authority is experiencing overload and is in load shedding
    /// mode.
    pub(crate) authority_overload_status: IntGauge,
    /// Percentage of transactions shed while in load shedding mode.
    pub(crate) authority_load_shedding_percentage: IntGauge,

    /// Number of times each `source` indicated transaction overload.
    pub(crate) transaction_overload_sources: IntCounterVec,

    // Post-processing metrics.
    /// Total number of events emitted in post processing.
    post_processing_total_events_emitted: IntCounter,
    /// Total number of transactions indexed in post processing.
    post_processing_total_tx_indexed: IntCounter,
    /// Total number of transactions that finished event processing.
    post_processing_total_tx_had_event_processed: IntCounter,
    /// Total number of failures in post processing.
    post_processing_total_failures: IntCounter,

    // Consensus handler / consensus commit metrics.
    /// Transactions processed by the consensus handler, labelled by `class`.
    pub consensus_handler_processed: IntCounterVec,
    /// Sizes of each `class` of transaction processed by the consensus handler.
    pub consensus_handler_transaction_sizes: HistogramVec,
    /// Number of low scoring authorities based on consensus reputation scores.
    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    /// Scores from consensus, labelled by `authority`.
    pub consensus_handler_scores: IntGaugeVec,
    /// Transactions deferred by the consensus handler.
    pub consensus_handler_deferred_transactions: IntCounter,
    /// Transactions deferred by the consensus handler due to congestion.
    pub consensus_handler_congested_transactions: IntCounter,
    /// Transactions cancelled by the consensus handler.
    pub consensus_handler_cancelled_transactions: IntCounter,
    /// Max object costs for congestion control in the current consensus
    /// commit, labelled by `commit_type`.
    pub consensus_handler_max_object_costs: IntGaugeVec,
    /// Committed subdags, labelled by leader `authority`.
    pub consensus_committed_subdags: IntCounterVec,
    /// Committed consensus messages, labelled by author `authority`.
    pub consensus_committed_messages: IntGaugeVec,
    /// Committed user transactions, labelled by submitter `authority`.
    pub consensus_committed_user_transactions: IntGaugeVec,
    /// Leader round of the consensus output currently being processed.
    pub consensus_handler_leader_round: IntGauge,
    /// Throughput calculated from consensus output, based on unique
    /// transactions.
    pub consensus_calculated_throughput: IntGauge,
    /// The currently active calculated throughput profile.
    pub consensus_calculated_throughput_profile: IntGauge,

    /// Shared limits metrics, registered on the same registry.
    pub limits_metrics: Arc<LimitsMetrics>,

    /// Shared bytecode verifier metrics, registered on the same registry.
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zkLogin signatures.
    pub zklogin_sig_count: IntCounter,
    /// Counter registered under the name `multisig_sig_count`.
    pub multisig_sig_count: IntCounter,

    /// In-process latency observer; it is constructed without the registry.
    pub execution_queueing_latency: LatencyObserver,

    /// Rate tracker constructed with a 10 second `Duration`.
    /// NOTE(review): presumably tracks the rate at which transactions become
    /// ready — confirm against the call sites.
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    /// Rate tracker constructed with a 10 second `Duration`.
    /// NOTE(review): presumably tracks the transaction execution rate —
    /// confirm against the call sites.
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
328
/// Histogram buckets for positive integer counts, spanning 1 to 10,000,000 in
/// a 1-2-5-7 progression per decade. Used for the object-count, batch-size and
/// consensus transaction-size histograms.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
    7000000., 10000000.,
];

/// Histogram buckets for latencies in seconds, from 0.5 ms up to 90 s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];

/// Finer-grained latency buckets in seconds, starting at 10 µs and going up to
/// 100 s. Used for the input-object loading latency histogram.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];

/// Histogram buckets for the gas-to-latency ratio histograms, from 10 up to
/// 1,000,000.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];
351
352pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
356 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
357 let execute_certificate_latency = register_histogram_vec_with_registry!(
358 "authority_state_execute_certificate_latency",
359 "Latency of executing certificates, including waiting for inputs",
360 &["tx_type"],
361 LATENCY_SEC_BUCKETS.to_vec(),
362 registry,
363 )
364 .unwrap();
365
366 let execute_certificate_latency_single_writer =
367 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
368 let execute_certificate_latency_shared_object =
369 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
370
371 Self {
372 tx_orders: register_int_counter_with_registry!(
373 "total_transaction_orders",
374 "Total number of transaction orders",
375 registry,
376 )
377 .unwrap(),
378 total_certs: register_int_counter_with_registry!(
379 "total_transaction_certificates",
380 "Total number of transaction certificates handled",
381 registry,
382 )
383 .unwrap(),
384 total_cert_attempts: register_int_counter_with_registry!(
385 "total_handle_certificate_attempts",
386 "Number of calls to handle_certificate",
387 registry,
388 )
389 .unwrap(),
390 total_effects: register_int_counter_with_registry!(
392 "total_transaction_effects",
393 "Total number of transaction effects produced",
394 registry,
395 )
396 .unwrap(),
397
398 shared_obj_tx: register_int_counter_with_registry!(
399 "num_shared_obj_tx",
400 "Number of transactions involving shared objects",
401 registry,
402 )
403 .unwrap(),
404
405 sponsored_tx: register_int_counter_with_registry!(
406 "num_sponsored_tx",
407 "Number of sponsored transactions",
408 registry,
409 )
410 .unwrap(),
411
412 tx_already_processed: register_int_counter_with_registry!(
413 "num_tx_already_processed",
414 "Number of transaction orders already processed previously",
415 registry,
416 )
417 .unwrap(),
418 num_input_objs: register_histogram_with_registry!(
419 "num_input_objects",
420 "Distribution of number of input TX objects per TX",
421 POSITIVE_INT_BUCKETS.to_vec(),
422 registry,
423 )
424 .unwrap(),
425 num_shared_objects: register_histogram_with_registry!(
426 "num_shared_objects",
427 "Number of shared input objects per TX",
428 POSITIVE_INT_BUCKETS.to_vec(),
429 registry,
430 )
431 .unwrap(),
432 batch_size: register_histogram_with_registry!(
433 "batch_size",
434 "Distribution of size of transaction batch",
435 POSITIVE_INT_BUCKETS.to_vec(),
436 registry,
437 )
438 .unwrap(),
439 authority_state_handle_transaction_latency: register_histogram_with_registry!(
440 "authority_state_handle_transaction_latency",
441 "Latency of handling transactions",
442 LATENCY_SEC_BUCKETS.to_vec(),
443 registry,
444 )
445 .unwrap(),
446 execute_certificate_latency_single_writer,
447 execute_certificate_latency_shared_object,
448 internal_execution_latency: register_histogram_with_registry!(
449 "authority_state_internal_execution_latency",
450 "Latency of actual certificate executions",
451 LATENCY_SEC_BUCKETS.to_vec(),
452 registry,
453 )
454 .unwrap(),
455 execution_load_input_objects_latency: register_histogram_with_registry!(
456 "authority_state_execution_load_input_objects_latency",
457 "Latency of loading input objects for execution",
458 LOW_LATENCY_SEC_BUCKETS.to_vec(),
459 registry,
460 )
461 .unwrap(),
462 prepare_certificate_latency: register_histogram_with_registry!(
463 "authority_state_prepare_certificate_latency",
464 "Latency of executing certificates, before committing the results",
465 LATENCY_SEC_BUCKETS.to_vec(),
466 registry,
467 )
468 .unwrap(),
469 commit_certificate_latency: register_histogram_with_registry!(
470 "authority_state_commit_certificate_latency",
471 "Latency of committing certificate execution results",
472 LATENCY_SEC_BUCKETS.to_vec(),
473 registry,
474 )
475 .unwrap(),
476 db_checkpoint_latency: register_histogram_with_registry!(
477 "db_checkpoint_latency",
478 "Latency of checkpointing dbs",
479 LATENCY_SEC_BUCKETS.to_vec(),
480 registry,
481 ).unwrap(),
482 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
483 "transaction_manager_num_enqueued_certificates",
484 "Current number of certificates enqueued to TransactionManager",
485 &["result"],
486 registry,
487 )
488 .unwrap(),
489 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
490 "transaction_manager_num_missing_objects",
491 "Current number of missing objects in TransactionManager",
492 registry,
493 )
494 .unwrap(),
495 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
496 "transaction_manager_num_pending_certificates",
497 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
498 registry,
499 )
500 .unwrap(),
501 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
502 "transaction_manager_num_executing_certificates",
503 "Number of executing certificates, including queued and actually running certificates",
504 registry,
505 )
506 .unwrap(),
507 transaction_manager_num_ready: register_int_gauge_with_registry!(
508 "transaction_manager_num_ready",
509 "Number of ready transactions in TransactionManager",
510 registry,
511 )
512 .unwrap(),
513 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
514 "transaction_manager_object_cache_size",
515 "Current size of object-availability cache in TransactionManager",
516 registry,
517 )
518 .unwrap(),
519 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
520 "transaction_manager_object_cache_hits",
521 "Number of object-availability cache hits in TransactionManager",
522 registry,
523 )
524 .unwrap(),
525 authority_overload_status: register_int_gauge_with_registry!(
526 "authority_overload_status",
527 "Whether authority is current experiencing overload and enters load shedding mode.",
528 registry)
529 .unwrap(),
530 authority_load_shedding_percentage: register_int_gauge_with_registry!(
531 "authority_load_shedding_percentage",
532 "The percentage of transactions is shed when the authority is in load shedding mode.",
533 registry)
534 .unwrap(),
535 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
536 "transaction_manager_object_cache_misses",
537 "Number of object-availability cache misses in TransactionManager",
538 registry,
539 )
540 .unwrap(),
541 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
542 "transaction_manager_object_cache_evictions",
543 "Number of object-availability cache evictions in TransactionManager",
544 registry,
545 )
546 .unwrap(),
547 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
548 "transaction_manager_package_cache_size",
549 "Current size of package-availability cache in TransactionManager",
550 registry,
551 )
552 .unwrap(),
553 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
554 "transaction_manager_package_cache_hits",
555 "Number of package-availability cache hits in TransactionManager",
556 registry,
557 )
558 .unwrap(),
559 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
560 "transaction_manager_package_cache_misses",
561 "Number of package-availability cache misses in TransactionManager",
562 registry,
563 )
564 .unwrap(),
565 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
566 "transaction_manager_package_cache_evictions",
567 "Number of package-availability cache evictions in TransactionManager",
568 registry,
569 )
570 .unwrap(),
571 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
572 "transaction_manager_transaction_queue_age_s",
573 "Time spent in waiting for transaction in the queue",
574 LATENCY_SEC_BUCKETS.to_vec(),
575 registry,
576 )
577 .unwrap(),
578 transaction_overload_sources: register_int_counter_vec_with_registry!(
579 "transaction_overload_sources",
580 "Number of times each source indicates transaction overload.",
581 &["source"],
582 registry)
583 .unwrap(),
584 execution_driver_executed_transactions: register_int_counter_with_registry!(
585 "execution_driver_executed_transactions",
586 "Cumulative number of transaction executed by execution driver",
587 registry,
588 )
589 .unwrap(),
590 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
591 "execution_driver_dispatch_queue",
592 "Number of transaction pending in execution driver dispatch queue",
593 registry,
594 )
595 .unwrap(),
596 execution_queueing_delay_s: register_histogram_with_registry!(
597 "execution_queueing_delay_s",
598 "Queueing delay between a transaction is ready for execution until it starts executing.",
599 LATENCY_SEC_BUCKETS.to_vec(),
600 registry
601 )
602 .unwrap(),
603 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
604 "prepare_cert_gas_latency_ratio",
605 "The ratio of computation gas divided by VM execution latency.",
606 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
607 registry
608 )
609 .unwrap(),
610 execution_gas_latency_ratio: register_histogram_with_registry!(
611 "execution_gas_latency_ratio",
612 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
613 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
614 registry
615 )
616 .unwrap(),
617 skipped_consensus_txns: register_int_counter_with_registry!(
618 "skipped_consensus_txns",
619 "Total number of consensus transactions skipped",
620 registry,
621 )
622 .unwrap(),
623 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
624 "skipped_consensus_txns_cache_hit",
625 "Total number of consensus transactions skipped because of local cache hit",
626 registry,
627 )
628 .unwrap(),
629 post_processing_total_events_emitted: register_int_counter_with_registry!(
630 "post_processing_total_events_emitted",
631 "Total number of events emitted in post processing",
632 registry,
633 )
634 .unwrap(),
635 post_processing_total_tx_indexed: register_int_counter_with_registry!(
636 "post_processing_total_tx_indexed",
637 "Total number of txes indexed in post processing",
638 registry,
639 )
640 .unwrap(),
641 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
642 "post_processing_total_tx_had_event_processed",
643 "Total number of txes finished event processing in post processing",
644 registry,
645 )
646 .unwrap(),
647 post_processing_total_failures: register_int_counter_with_registry!(
648 "post_processing_total_failures",
649 "Total number of failure in post processing",
650 registry,
651 )
652 .unwrap(),
653 consensus_handler_processed: register_int_counter_vec_with_registry!(
654 "consensus_handler_processed",
655 "Number of transactions processed by consensus handler",
656 &["class"],
657 registry
658 ).unwrap(),
659 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
660 "consensus_handler_transaction_sizes",
661 "Sizes of each type of transactions processed by consensus handler",
662 &["class"],
663 POSITIVE_INT_BUCKETS.to_vec(),
664 registry
665 ).unwrap(),
666 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
667 "consensus_handler_num_low_scoring_authorities",
668 "Number of low scoring authorities based on reputation scores from consensus",
669 registry
670 ).unwrap(),
671 consensus_handler_scores: register_int_gauge_vec_with_registry!(
672 "consensus_handler_scores",
673 "scores from consensus for each authority",
674 &["authority"],
675 registry,
676 ).unwrap(),
677 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
678 "consensus_handler_deferred_transactions",
679 "Number of transactions deferred by consensus handler",
680 registry,
681 ).unwrap(),
682 consensus_handler_congested_transactions: register_int_counter_with_registry!(
683 "consensus_handler_congested_transactions",
684 "Number of transactions deferred by consensus handler due to congestion",
685 registry,
686 ).unwrap(),
687 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
688 "consensus_handler_cancelled_transactions",
689 "Number of transactions cancelled by consensus handler",
690 registry,
691 ).unwrap(),
692 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
693 "consensus_handler_max_congestion_control_object_costs",
694 "Max object costs for congestion control in the current consensus commit",
695 &["commit_type"],
696 registry,
697 ).unwrap(),
698 consensus_committed_subdags: register_int_counter_vec_with_registry!(
699 "consensus_committed_subdags",
700 "Number of committed subdags, sliced by leader",
701 &["authority"],
702 registry,
703 ).unwrap(),
704 consensus_committed_messages: register_int_gauge_vec_with_registry!(
705 "consensus_committed_messages",
706 "Total number of committed consensus messages, sliced by author",
707 &["authority"],
708 registry,
709 ).unwrap(),
710 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
711 "consensus_committed_user_transactions",
712 "Number of committed user transactions, sliced by submitter",
713 &["authority"],
714 registry,
715 ).unwrap(),
716 consensus_handler_leader_round: register_int_gauge_with_registry!(
717 "consensus_handler_leader_round",
718 "The leader round of the current consensus output being processed in the consensus handler",
719 registry,
720 ).unwrap(),
721 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
722 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
723 zklogin_sig_count: register_int_counter_with_registry!(
724 "zklogin_sig_count",
725 "Count of zkLogin signatures",
726 registry,
727 )
728 .unwrap(),
729 multisig_sig_count: register_int_counter_with_registry!(
730 "multisig_sig_count",
731 "Count of zkLogin signatures",
732 registry,
733 )
734 .unwrap(),
735 consensus_calculated_throughput: register_int_gauge_with_registry!(
736 "consensus_calculated_throughput",
737 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
738 registry,
739 ).unwrap(),
740 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
741 "consensus_calculated_throughput_profile",
742 "The current active calculated throughput profile",
743 registry
744 ).unwrap(),
745 execution_queueing_latency: LatencyObserver::new(),
746 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
747 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
748 }
749 }
750
751 pub fn reset_on_reconfigure(&self) {
755 self.consensus_committed_messages.reset();
756 self.consensus_handler_scores.reset();
757 self.consensus_committed_user_transactions.reset();
758 }
759}
760
/// A pinned, reference-counted signer trait object that produces
/// [`AuthoritySignature`]s and can be shared across threads (`Send + Sync`).
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
768
/// State held by an authority node: its identity and signing key, handles to
/// the stores and caches it works with, per-epoch state, metrics and node
/// configuration.
pub struct AuthorityState {
    /// The name (public identity) of this authority.
    pub name: AuthorityName,
    /// Signer used to produce this authority's signatures.
    pub secret: StableSyncAuthoritySigner,

    /// Loads the input and receiving objects of a transaction (used by the
    /// signing path in `handle_transaction_impl`).
    input_loader: TransactionInputLoader,
    /// Bundle of execution-cache trait objects.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// The current per-epoch store; held in an `ArcSwap` so it can be replaced
    /// atomically.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// Async read/write lock guarding an `EpochId`.
    /// NOTE(review): presumably read-locked while signing/executing and
    /// write-locked during reconfiguration — confirm against the lock helpers.
    execution_lock: RwLock<EpochId>,

    /// Optional JSON-RPC index store.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional REST index store.
    pub rest_index: Option<Arc<RestIndexStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Committee store; exposed through `committee_store()` and
    /// `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    transaction_manager: Arc<TransactionManager>,

    /// One-shot sender kept behind a mutex. Outside of test builds the field
    /// is expected to be unused, hence the `expect(unused)` attribute.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Authority metrics.
    pub metrics: Arc<AuthorityMetrics>,
    // The two pruner handles are never read through these fields (leading
    // underscore); they are only stored on the state.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Configuration for database checkpoints.
    db_checkpoint_config: DBCheckpointConfig,

    /// The node configuration.
    pub config: NodeConfig,

    /// Overload information for this authority.
    pub overload_info: AuthorityOverloadInfo,

    /// Optional finalizer; when present, `handle_transaction` hands every
    /// transaction signed by a committee validator to it for tracking.
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// Identifier of the chain this authority belongs to.
    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,
}
824
825impl AuthorityState {
834 pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
835 epoch_store.committee().authority_exists(&self.name)
836 }
837
838 pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
839 epoch_store
840 .active_validators()
841 .iter()
842 .any(|a| AuthorityName::from(a) == self.name)
843 }
844
    /// Returns `true` when this node is not a committee validator for the
    /// given epoch store, i.e. the exact negation of
    /// [`Self::is_committee_validator`].
    pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
        !self.is_committee_validator(epoch_store)
    }
848
    /// Borrows the shared committee store.
    pub fn committee_store(&self) -> &Arc<CommitteeStore> {
        &self.committee_store
    }
852
853 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
854 self.committee_store.clone()
855 }
856
    /// Borrows the authority overload section of the node configuration.
    pub fn overload_config(&self) -> &AuthorityOverloadConfig {
        &self.config.authority_overload_config
    }
860
    /// Looks up the state commitments recorded for `epoch` by delegating to
    /// the checkpoint store.
    ///
    /// Returns whatever the checkpoint store returns: an error, `Ok(None)`, or
    /// `Ok(Some(commitments))`.
    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
        self.checkpoint_store.get_epoch_state_commitments(epoch)
    }
867
    /// Validates `transaction`, signs it with this authority's key and
    /// acquires the transaction locks on its owned input objects.
    ///
    /// Steps, in order: deny-config check, loading of input/receiving objects,
    /// transaction input checks, coin deny-list check, signing, lock
    /// acquisition. Any failing step returns its error through `?`.
    ///
    /// Private helper of [`Self::handle_transaction`].
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
    async fn handle_transaction_impl(
        &self,
        transaction: VerifiedTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<VerifiedSignedTransaction> {
        // Bound to a named variable so that it lives until the end of the
        // function.
        // NOTE(review): the return value is stored as-is, with neither `?` nor
        // `.await`. If `execution_lock_for_signing` returns a `Result` or a
        // future rather than the guard itself, a failure to take the lock is
        // silently ignored (or the lock is never taken at all) — confirm
        // against the helper's signature and add the missing operator.
        let _execution_lock = self.execution_lock_for_signing();

        let tx_digest = transaction.digest();
        let tx_data = transaction.data().transaction_data();

        let input_object_kinds = tx_data.input_objects()?;
        let receiving_objects_refs = tx_data.receiving_objects();

        // Reject transactions that the node's deny configuration forbids
        // before loading any objects.
        iota_transaction_checks::deny::check_transaction_for_signing(
            tx_data,
            transaction.tx_signatures(),
            &input_object_kinds,
            &receiving_objects_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            Some(tx_digest),
            &input_object_kinds,
            &receiving_objects_refs,
            epoch_store.epoch(),
        )?;

        // The gas status is not needed on the signing path; only the checked
        // input objects are used below.
        let (_gas_status, checked_input_objects) =
            iota_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                epoch_store.reference_gas_price(),
                tx_data,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?;

        check_coin_deny_list_v1_during_signing(
            tx_data.sender(),
            &checked_input_objects,
            &receiving_objects,
            &self.get_object_store(),
        )?;

        let owned_objects = checked_input_objects.inner().filter_owned_objects();

        // The transaction is signed before the locks are taken; a clone of the
        // signed transaction is handed to the lock acquisition call.
        let signed_transaction = VerifiedSignedTransaction::new(
            epoch_store.epoch(),
            transaction,
            self.name,
            &*self.secret,
        );

        self.get_cache_writer().try_acquire_transaction_locks(
            epoch_store,
            &owned_objects,
            signed_transaction.clone(),
        )?;

        Ok(signed_transaction)
    }
944
945 #[instrument(name = "handle_transaction", level = "trace", skip_all)]
947 pub async fn handle_transaction(
948 &self,
949 epoch_store: &Arc<AuthorityPerEpochStore>,
950 transaction: VerifiedTransaction,
951 ) -> IotaResult<HandleTransactionResponse> {
952 let tx_digest = *transaction.digest();
953 debug!("handle_transaction");
954
955 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
957 return Ok(HandleTransactionResponse { status });
958 }
959
960 let _metrics_guard = self
961 .metrics
962 .authority_state_handle_transaction_latency
963 .start_timer();
964 self.metrics.tx_orders.inc();
965
966 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
967 match signed {
968 Ok(s) => {
969 if self.is_committee_validator(epoch_store) {
970 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
971 let tx = s.clone();
972 let validator_tx_finalizer = validator_tx_finalizer.clone();
973 let cache_reader = self.get_transaction_cache_reader().clone();
974 let epoch_store = epoch_store.clone();
975 spawn_monitored_task!(epoch_store.within_alive_epoch(
976 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
977 ));
978 }
979 }
980 Ok(HandleTransactionResponse {
981 status: TransactionStatus::Signed(s.into_inner().into_sig()),
982 })
983 }
984 Err(err) => Ok(HandleTransactionResponse {
988 status: self
989 .get_transaction_status(&tx_digest, epoch_store)?
990 .ok_or(err)?
991 .1,
992 }),
993 }
994 }
995
996 pub fn check_system_overload_at_signing(&self) -> bool {
997 self.config
998 .authority_overload_config
999 .check_system_overload_at_signing
1000 }
1001
1002 pub fn check_system_overload_at_execution(&self) -> bool {
1003 self.config
1004 .authority_overload_config
1005 .check_system_overload_at_execution
1006 }
1007
1008 pub(crate) fn check_system_overload(
1009 &self,
1010 consensus_adapter: &Arc<ConsensusAdapter>,
1011 tx_data: &SenderSignedData,
1012 do_authority_overload_check: bool,
1013 ) -> IotaResult {
1014 if do_authority_overload_check {
1015 self.check_authority_overload(tx_data).tap_err(|_| {
1016 self.update_overload_metrics("execution_queue");
1017 })?;
1018 }
1019 self.transaction_manager
1020 .check_execution_overload(self.overload_config(), tx_data)
1021 .tap_err(|_| {
1022 self.update_overload_metrics("execution_pending");
1023 })?;
1024 consensus_adapter.check_consensus_overload().tap_err(|_| {
1025 self.update_overload_metrics("consensus");
1026 })?;
1027
1028 let pending_tx_count = self
1029 .get_cache_commit()
1030 .approximate_pending_transaction_count();
1031 if pending_tx_count
1032 > self
1033 .config
1034 .execution_cache_config
1035 .writeback_cache
1036 .backpressure_threshold_for_rpc()
1037 {
1038 return Err(IotaError::ValidatorOverloadedRetryAfter {
1039 retry_after_secs: 10,
1040 });
1041 }
1042
1043 Ok(())
1044 }
1045
1046 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1047 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1048 return Ok(());
1049 }
1050
1051 let load_shedding_percentage = self
1052 .overload_info
1053 .load_shedding_percentage
1054 .load(Ordering::Relaxed);
1055 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1056 }
1057
1058 fn update_overload_metrics(&self, source: &str) {
1059 self.metrics
1060 .transaction_overload_sources
1061 .with_label_values(&[source])
1062 .inc();
1063 }
1064
1065 #[instrument(level = "trace", skip_all)]
1067 pub async fn execute_certificate(
1068 &self,
1069 certificate: &VerifiedCertificate,
1070 epoch_store: &Arc<AuthorityPerEpochStore>,
1071 ) -> IotaResult<TransactionEffects> {
1072 let _metrics_guard = if certificate.contains_shared_object() {
1073 self.metrics
1074 .execute_certificate_latency_shared_object
1075 .start_timer()
1076 } else {
1077 self.metrics
1078 .execute_certificate_latency_single_writer
1079 .start_timer()
1080 };
1081 trace!("execute_certificate");
1082
1083 self.metrics.total_cert_attempts.inc();
1084
1085 if !certificate.contains_shared_object() {
1086 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1091 }
1092
1093 epoch_store
1096 .within_alive_epoch(self.notify_read_effects(certificate))
1097 .await
1098 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1099 .and_then(|r| r)
1100 }
1101
1102 #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1117 pub fn try_execute_immediately(
1118 &self,
1119 certificate: &VerifiedExecutableTransaction,
1120 expected_effects_digest: Option<TransactionEffectsDigest>,
1121 epoch_store: &Arc<AuthorityPerEpochStore>,
1122 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1123 let _scope = monitored_scope("Execution::try_execute_immediately");
1124 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1125
1126 let tx_digest = certificate.digest();
1127
1128 let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1130
1131 if let Some(effects) = self
1134 .get_transaction_cache_reader()
1135 .try_get_executed_effects(tx_digest)?
1136 {
1137 if let Some(expected_effects_digest_inner) = expected_effects_digest {
1138 assert_eq!(
1139 effects.digest(),
1140 expected_effects_digest_inner,
1141 "Unexpected effects digest for transaction {tx_digest:?}"
1142 );
1143 }
1144 tx_guard.release();
1145 return Ok((effects, None));
1146 }
1147 let input_objects =
1148 self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1149
1150 let expected_effects_digest =
1156 expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1157
1158 self.process_certificate(
1159 tx_guard,
1160 certificate,
1161 input_objects,
1162 expected_effects_digest,
1163 epoch_store,
1164 )
1165 .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1166 .tap_ok(
1167 |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1168 )
1169 }
1170
1171 pub fn read_objects_for_execution(
1172 &self,
1173 tx_lock: &CertLockGuard,
1174 certificate: &VerifiedExecutableTransaction,
1175 epoch_store: &Arc<AuthorityPerEpochStore>,
1176 ) -> IotaResult<InputObjects> {
1177 let _scope = monitored_scope("Execution::load_input_objects");
1178 let _metrics_guard = self
1179 .metrics
1180 .execution_load_input_objects_latency
1181 .start_timer();
1182 let input_objects = &certificate.data().transaction_data().input_objects()?;
1183 self.input_loader.read_objects_for_execution(
1184 epoch_store,
1185 &certificate.key(),
1186 tx_lock,
1187 input_objects,
1188 epoch_store.epoch(),
1189 )
1190 }
1191
1192 pub fn try_execute_for_test(
1196 &self,
1197 certificate: &VerifiedCertificate,
1198 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1199 let epoch_store = self.epoch_store_for_testing();
1200 let (effects, execution_error_opt) = self.try_execute_immediately(
1201 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1202 None,
1203 &epoch_store,
1204 )?;
1205 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1206 Ok((signed_effects, execution_error_opt))
1207 }
1208
1209 pub fn execute_for_test(
1211 &self,
1212 certificate: &VerifiedCertificate,
1213 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1214 self.try_execute_for_test(certificate)
1215 .expect("try_execute_for_test should not fail")
1216 }
1217
1218 pub async fn notify_read_effects(
1219 &self,
1220 certificate: &VerifiedCertificate,
1221 ) -> IotaResult<TransactionEffects> {
1222 self.get_transaction_cache_reader()
1223 .try_notify_read_executed_effects(&[*certificate.digest()])
1224 .await
1225 .map(|mut r| r.pop().expect("must return correct number of effects"))
1226 }
1227
1228 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1229 self.get_object_cache_reader()
1230 .try_check_owned_objects_are_live(owned_object_refs)
1231 }
1232
1233 pub(crate) fn debug_dump_transaction_state(
1238 &self,
1239 tx_digest: &TransactionDigest,
1240 effects: &TransactionEffects,
1241 expected_effects_digest: TransactionEffectsDigest,
1242 inner_temporary_store: &InnerTemporaryStore,
1243 certificate: &VerifiedExecutableTransaction,
1244 debug_dump_config: &StateDebugDumpConfig,
1245 ) -> IotaResult<PathBuf> {
1246 let dump_dir = debug_dump_config
1247 .dump_file_directory
1248 .as_ref()
1249 .cloned()
1250 .unwrap_or(std::env::temp_dir());
1251 let epoch_store = self.load_epoch_store_one_call_per_task();
1252
1253 NodeStateDump::new(
1254 tx_digest,
1255 effects,
1256 expected_effects_digest,
1257 self.get_object_store().as_ref(),
1258 &epoch_store,
1259 inner_temporary_store,
1260 certificate,
1261 )?
1262 .write_to_file(&dump_dir)
1263 .map_err(|e| IotaError::FileIO(e.to_string()))
1264 }
1265
1266 #[instrument(level = "trace", skip_all)]
1267 pub(crate) fn process_certificate(
1268 &self,
1269 tx_guard: CertTxGuard,
1270 certificate: &VerifiedExecutableTransaction,
1271 input_objects: InputObjects,
1272 expected_effects_digest: Option<TransactionEffectsDigest>,
1273 epoch_store: &Arc<AuthorityPerEpochStore>,
1274 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1275 let process_certificate_start_time = tokio::time::Instant::now();
1276 let digest = *certificate.digest();
1277
1278 fail_point_if!("correlated-crash-process-certificate", || {
1279 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1280 iota_simulator::task::kill_current_node(None);
1281 }
1282 });
1283
1284 let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1285 let execution_guard = match execution_guard {
1291 Ok(execution_guard) => execution_guard,
1292 Err(err) => {
1293 tx_guard.release();
1294 return Err(err);
1295 }
1296 };
1297 if *execution_guard != epoch_store.epoch() {
1301 tx_guard.release();
1302 info!("The epoch of the execution_guard doesn't match the epoch store");
1303 return Err(IotaError::WrongEpoch {
1304 expected_epoch: epoch_store.epoch(),
1305 actual_epoch: *execution_guard,
1306 });
1307 }
1308
1309 let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1315 &execution_guard,
1316 certificate,
1317 input_objects,
1318 epoch_store,
1319 ) {
1320 Err(e) => {
1321 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1322 tx_guard.release();
1323 return Err(e);
1324 }
1325 Ok(res) => res,
1326 };
1327
1328 if let Some(expected_effects_digest) = expected_effects_digest {
1329 if effects.digest() != expected_effects_digest {
1330 match self.debug_dump_transaction_state(
1332 &digest,
1333 &effects,
1334 expected_effects_digest,
1335 &inner_temporary_store,
1336 certificate,
1337 &self.config.state_debug_dump_config,
1338 ) {
1339 Ok(out_path) => {
1340 info!(
1341 "Dumped node state for transaction {} to {}",
1342 digest,
1343 out_path.as_path().display().to_string()
1344 );
1345 }
1346 Err(e) => {
1347 error!("Error dumping state for transaction {}: {e}", digest);
1348 }
1349 }
1350 error!(
1351 tx_digest = ?digest,
1352 ?expected_effects_digest,
1353 actual_effects = ?effects,
1354 "fork detected!"
1355 );
1356 panic!(
1357 "Transaction {} is expected to have effects digest {}, but got {}!",
1358 digest,
1359 expected_effects_digest,
1360 effects.digest(),
1361 );
1362 }
1363 }
1364
1365 fail_point!("crash");
1366
1367 self.commit_certificate(
1368 certificate,
1369 inner_temporary_store,
1370 &effects,
1371 tx_guard,
1372 execution_guard,
1373 epoch_store,
1374 )?;
1375
1376 if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1377 certificate.data().transaction_data().kind()
1378 {
1379 if let Some(err) = &execution_error_opt {
1380 debug_fatal!("Authenticator state update failed: {:?}", err);
1381 }
1382 epoch_store.update_authenticator_state(auth_state);
1383
1384 if cfg!(debug_assertions) {
1387 let authenticator_state = get_authenticator_state(self.get_object_store())
1388 .expect("Read cannot fail")
1389 .expect("Authenticator state must exist");
1390
1391 let mut sys_jwks: Vec<_> = authenticator_state
1392 .active_jwks
1393 .into_iter()
1394 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1395 .collect();
1396 let mut active_jwks: Vec<_> = epoch_store
1397 .signature_verifier
1398 .get_jwks()
1399 .into_iter()
1400 .collect();
1401 sys_jwks.sort();
1402 active_jwks.sort();
1403
1404 assert_eq!(sys_jwks, active_jwks);
1405 }
1406 }
1407
1408 let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1409 if elapsed > 0.0 {
1410 self.metrics
1411 .execution_gas_latency_ratio
1412 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1413 };
1414 Ok((effects, execution_error_opt))
1415 }
1416
/// Commits the outputs of an executed certificate.
///
/// In order: runs post-processing (failures are counted and logged but do not abort
/// the commit), records the transaction key/digest mapping in the epoch store, writes
/// the transaction outputs through the cache writer, reloads system packages for
/// end-of-epoch transactions, marks the transaction guard as committed, notifies the
/// transaction manager, and updates metrics.
///
/// `_execution_guard` is not read; taking it by value keeps it alive for the duration
/// of this call.
///
/// # Errors
///
/// Returns errors from `insert_tx_key_and_digest` and `try_write_transaction_outputs`.
#[instrument(level = "trace", skip_all)]
fn commit_certificate(
    &self,
    certificate: &VerifiedExecutableTransaction,
    inner_temporary_store: InnerTemporaryStore,
    effects: &TransactionEffects,
    tx_guard: CertTxGuard,
    _execution_guard: ExecutionLockReadGuard<'_>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> IotaResult {
    let _scope: Option<iota_metrics::MonitoredScopeGuard> =
        monitored_scope("Execution::commit_certificate");
    let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

    let tx_key = certificate.key();
    let tx_digest = certificate.digest();
    // Captured up front because `inner_temporary_store` is consumed further down.
    let input_object_count = inner_temporary_store.input_objects.len();
    let shared_object_count = effects.input_shared_objects().len();

    let output_keys = inner_temporary_store.get_output_keys(effects);

    // Post-processing is best effort: a failure is counted and logged, then ignored.
    let _ = self
        .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
        .tap_err(|e| {
            self.metrics.post_processing_total_failures.inc();
            error!(?tx_digest, "tx post processing failed: {e}");
        });

    epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;

    fail_point!("crash");

    let transaction_outputs = TransactionOutputs::build_transaction_outputs(
        certificate.clone().into_unsigned(),
        effects.clone(),
        inner_temporary_store,
    );
    self.get_cache_writer()
        .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;

    // NOTE(review): presumably an end-of-epoch transaction may have upgraded the
    // built-in framework packages, hence the forced reload — confirm.
    if certificate.transaction_data().is_end_of_epoch_tx() {
        self.get_object_cache_reader()
            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
    }

    tx_guard.commit_tx();

    // Notify the transaction manager only after the outputs have been written.
    self.transaction_manager
        .notify_commit(tx_digest, output_keys, epoch_store);

    self.update_metrics(certificate, input_object_count, shared_object_count);

    Ok(())
}
1482
1483 fn update_metrics(
1484 &self,
1485 certificate: &VerifiedExecutableTransaction,
1486 input_object_count: usize,
1487 shared_object_count: usize,
1488 ) {
1489 if certificate.has_zklogin_sig() {
1491 self.metrics.zklogin_sig_count.inc();
1492 } else if certificate.has_upgraded_multisig() {
1493 self.metrics.multisig_sig_count.inc();
1494 }
1495
1496 self.metrics.total_effects.inc();
1497 self.metrics.total_certs.inc();
1498
1499 if shared_object_count > 0 {
1500 self.metrics.shared_obj_tx.inc();
1501 }
1502
1503 if certificate.is_sponsored_tx() {
1504 self.metrics.sponsored_tx.inc();
1505 }
1506
1507 self.metrics
1508 .num_input_objs
1509 .observe(input_object_count as f64);
1510 self.metrics
1511 .num_shared_objects
1512 .observe(shared_object_count as f64);
1513 self.metrics.batch_size.observe(
1514 certificate
1515 .data()
1516 .intent_message()
1517 .value
1518 .kind()
1519 .num_commands() as f64,
1520 );
1521 }
1522
/// Executes `certificate` against `input_objects` without committing anything.
///
/// Runs the transaction validity check and the certificate input check, verifies that
/// the owned input objects are live, and then executes the transaction through the
/// epoch store's executor. Returns the resulting temporary store, the transaction
/// effects, and the execution error if execution failed.
///
/// `_execution_guard` is not read inside the function; requiring it as an argument
/// forces callers to hold an execution lock guard while calling.
///
/// # Errors
///
/// Returns errors from the validity check, the certificate input check, and the
/// owned-object liveness check. A failure of the execution itself is not an `Err`: it
/// is reported through the third tuple element.
#[instrument(level = "trace", skip_all)]
fn prepare_certificate(
    &self,
    _execution_guard: &ExecutionLockReadGuard<'_>,
    certificate: &VerifiedExecutableTransaction,
    input_objects: InputObjects,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> IotaResult<(
    InnerTemporaryStore,
    TransactionEffects,
    Option<ExecutionError>,
)> {
    let _scope = monitored_scope("Execution::prepare_certificate");
    let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
    let prepare_certificate_start_time = tokio::time::Instant::now();

    let tx_data = certificate.data().transaction_data();
    tx_data.validity_check(epoch_store.protocol_config())?;

    // Shadows the raw `input_objects` with the checked version returned by the check.
    let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
        certificate,
        input_objects,
        epoch_store.protocol_config(),
        epoch_store.reference_gas_price(),
    )?;

    let owned_object_refs = input_objects.inner().filter_owned_objects();
    self.check_owned_locks(&owned_object_refs)?;
    let tx_digest = *certificate.digest();
    let protocol_config = epoch_store.protocol_config();
    let transaction_data = &certificate.data().intent_message().value;
    let (kind, signer, gas) = transaction_data.execution_parts();

    // `effects` is only mutated by the fail-point hook below, so without `msim` or
    // `fail_points` the `mut` is unused and the lint is expected.
    #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
    let (inner_temp_store, _, mut effects, execution_error_opt) =
        epoch_store.executor().execute_transaction_to_effects(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            self.config
                .expensive_safety_check_config
                .enable_deep_per_tx_iota_conservation_check(),
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            input_objects,
            gas,
            gas_status,
            kind,
            signer,
            tx_digest,
            &mut None,
        );

    // Simulator-only hook that may alter `effects` (see `create_fail_state`).
    fail_point_if!("cp_execution_nondeterminism", || {
        #[cfg(msim)]
        self.create_fail_state(certificate, epoch_store, &mut effects);
    });

    // Skip the observation when no measurable time elapsed, to avoid dividing by zero.
    let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
    if elapsed > 0.0 {
        self.metrics
            .prepare_cert_gas_latency_ratio
            .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
    }

    Ok((inner_temp_store, effects, execution_error_opt.err()))
}
1611
1612 pub fn prepare_certificate_for_benchmark(
1613 &self,
1614 certificate: &VerifiedExecutableTransaction,
1615 input_objects: InputObjects,
1616 epoch_store: &Arc<AuthorityPerEpochStore>,
1617 ) -> IotaResult<(
1618 InnerTemporaryStore,
1619 TransactionEffects,
1620 Option<ExecutionError>,
1621 )> {
1622 let lock = RwLock::new(epoch_store.epoch());
1623 let execution_guard = lock.try_read().unwrap();
1624
1625 self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
1626 }
1627
1628 #[instrument("dry_exec_tx", level = "trace", skip_all)]
1631 #[allow(clippy::type_complexity)]
1632 pub fn dry_exec_transaction(
1633 &self,
1634 transaction: TransactionData,
1635 transaction_digest: TransactionDigest,
1636 ) -> IotaResult<(
1637 DryRunTransactionBlockResponse,
1638 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1639 TransactionEffects,
1640 Option<ObjectID>,
1641 )> {
1642 let epoch_store = self.load_epoch_store_one_call_per_task();
1643 if !self.is_fullnode(&epoch_store) {
1644 return Err(IotaError::UnsupportedFeature {
1645 error: "dry-exec is only supported on fullnodes".to_string(),
1646 });
1647 }
1648
1649 if transaction.kind().is_system_tx() {
1650 return Err(IotaError::UnsupportedFeature {
1651 error: "dry-exec does not support system transactions".to_string(),
1652 });
1653 }
1654
1655 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1656 }
1657
1658 #[allow(clippy::type_complexity)]
1659 pub fn dry_exec_transaction_for_benchmark(
1660 &self,
1661 transaction: TransactionData,
1662 transaction_digest: TransactionDigest,
1663 ) -> IotaResult<(
1664 DryRunTransactionBlockResponse,
1665 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1666 TransactionEffects,
1667 Option<ObjectID>,
1668 )> {
1669 let epoch_store = self.load_epoch_store_one_call_per_task();
1670 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1671 }
1672
1673 #[instrument(level = "trace", skip_all)]
1674 #[allow(clippy::type_complexity)]
1675 fn dry_exec_transaction_impl(
1676 &self,
1677 epoch_store: &AuthorityPerEpochStore,
1678 transaction: TransactionData,
1679 transaction_digest: TransactionDigest,
1680 ) -> IotaResult<(
1681 DryRunTransactionBlockResponse,
1682 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1683 TransactionEffects,
1684 Option<ObjectID>,
1685 )> {
1686 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1688
1689 let input_object_kinds = transaction.input_objects()?;
1690 let receiving_object_refs = transaction.receiving_objects();
1691
1692 iota_transaction_checks::deny::check_transaction_for_signing(
1693 &transaction,
1694 &[],
1695 &input_object_kinds,
1696 &receiving_object_refs,
1697 &self.config.transaction_deny_config,
1698 self.get_backing_package_store().as_ref(),
1699 )?;
1700
1701 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1702 None,
1704 &input_object_kinds,
1705 &receiving_object_refs,
1706 epoch_store.epoch(),
1707 )?;
1708
1709 let mut transaction = transaction;
1711 let mut gas_object_refs = transaction.gas().to_vec();
1712 let reference_gas_price = epoch_store.reference_gas_price();
1713 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1714 let sender = transaction.gas_owner();
1715 let gas_object_id = ObjectID::random();
1716 let gas_object = Object::new_move(
1717 MoveObject::new_gas_coin(
1718 OBJECT_START_VERSION,
1719 gas_object_id,
1720 SIMULATION_GAS_COIN_VALUE,
1721 ),
1722 Owner::AddressOwner(sender),
1723 TransactionDigest::genesis_marker(),
1724 );
1725 let gas_object_ref = gas_object.compute_object_reference();
1726 gas_object_refs = vec![gas_object_ref];
1727 transaction.gas_data_mut().payment = gas_object_refs.clone();
1729 (
1730 iota_transaction_checks::check_transaction_input_with_given_gas(
1731 epoch_store.protocol_config(),
1732 reference_gas_price,
1733 &transaction,
1734 input_objects,
1735 receiving_objects,
1736 gas_object,
1737 &self.metrics.bytecode_verifier_metrics,
1738 &self.config.verifier_signing_config,
1739 )?,
1740 Some(gas_object_id),
1741 )
1742 } else {
1743 (
1744 iota_transaction_checks::check_transaction_input(
1745 epoch_store.protocol_config(),
1746 reference_gas_price,
1747 &transaction,
1748 input_objects,
1749 &receiving_objects,
1750 &self.metrics.bytecode_verifier_metrics,
1751 &self.config.verifier_signing_config,
1752 )?,
1753 None,
1754 )
1755 };
1756
1757 let protocol_config = epoch_store.protocol_config();
1758 let (kind, signer, _) = transaction.execution_parts();
1759
1760 let silent = true;
1761 let executor = iota_execution::executor(protocol_config, silent, None)
1762 .expect("Creating an executor should not fail here");
1763
1764 let expensive_checks = false;
1765 let (inner_temp_store, _, effects, _execution_error) = executor
1766 .execute_transaction_to_effects(
1767 self.get_backing_store().as_ref(),
1768 protocol_config,
1769 self.metrics.limits_metrics.clone(),
1770 expensive_checks,
1771 self.config.certificate_deny_config.certificate_deny_set(),
1772 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1773 epoch_store
1774 .epoch_start_config()
1775 .epoch_data()
1776 .epoch_start_timestamp(),
1777 checked_input_objects,
1778 gas_object_refs,
1779 gas_status,
1780 kind,
1781 signer,
1782 transaction_digest,
1783 &mut None,
1784 );
1785 let tx_digest = *effects.transaction_digest();
1786
1787 let module_cache =
1788 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1789
1790 let mut layout_resolver =
1791 epoch_store
1792 .executor()
1793 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1794 &inner_temp_store,
1795 self.get_backing_package_store(),
1796 )));
1797 let object_changes = Vec::new();
1799
1800 let balance_changes = Vec::new();
1802
1803 let written_with_kind = effects
1804 .created()
1805 .into_iter()
1806 .map(|(oref, _)| (oref, WriteKind::Create))
1807 .chain(
1808 effects
1809 .unwrapped()
1810 .into_iter()
1811 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1812 )
1813 .chain(
1814 effects
1815 .mutated()
1816 .into_iter()
1817 .map(|(oref, _)| (oref, WriteKind::Mutate)),
1818 )
1819 .map(|(oref, kind)| {
1820 let obj = inner_temp_store.written.get(&oref.0).unwrap();
1821 (oref.0, (oref, obj.clone(), kind))
1823 })
1824 .collect();
1825
1826 Ok((
1827 DryRunTransactionBlockResponse {
1828 suggested_gas_price: self
1830 .congestion_tracker
1831 .get_prediction_suggested_gas_price(&transaction),
1832 input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
1833 .map_err(|e| IotaError::TransactionSerialization {
1834 error: format!(
1835 "Failed to convert transaction to IotaTransactionBlockData: {e}",
1836 ),
1837 })?, effects: effects.clone().try_into()?,
1839 events: IotaTransactionBlockEvents::try_from(
1840 inner_temp_store.events.clone(),
1841 tx_digest,
1842 None,
1843 layout_resolver.as_mut(),
1844 )?,
1845 object_changes,
1846 balance_changes,
1847 },
1848 written_with_kind,
1849 effects,
1850 mock_gas,
1851 ))
1852 }
1853
1854 pub fn simulate_transaction(
1855 &self,
1856 mut transaction: TransactionData,
1857 checks: VmChecks,
1858 ) -> IotaResult<SimulateTransactionResult> {
1859 if transaction.kind().is_system_tx() {
1860 return Err(IotaError::UnsupportedFeature {
1861 error: "simulate does not support system transactions".to_string(),
1862 });
1863 }
1864
1865 let epoch_store = self.load_epoch_store_one_call_per_task();
1866 if !self.is_fullnode(&epoch_store) {
1867 return Err(IotaError::UnsupportedFeature {
1868 error: "simulate is only supported on fullnodes".to_string(),
1869 });
1870 }
1871
1872 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1876
1877 let input_object_kinds = transaction.input_objects()?;
1878 let receiving_object_refs = transaction.receiving_objects();
1879
1880 iota_transaction_checks::deny::check_transaction_for_signing(
1883 &transaction,
1884 &[],
1885 &input_object_kinds,
1886 &receiving_object_refs,
1887 &self.config.transaction_deny_config,
1888 self.get_backing_package_store().as_ref(),
1889 )?;
1890
1891 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1893 None,
1895 &input_object_kinds,
1896 &receiving_object_refs,
1897 epoch_store.epoch(),
1898 )?;
1899
1900 let mock_gas_id = if transaction.gas().is_empty() {
1902 let mock_gas_object = Object::new_move(
1903 MoveObject::new_gas_coin(
1904 OBJECT_START_VERSION,
1905 ObjectID::MAX,
1906 SIMULATION_GAS_COIN_VALUE,
1907 ),
1908 Owner::AddressOwner(transaction.gas_data().owner),
1909 TransactionDigest::genesis_marker(),
1910 );
1911 let mock_gas_object_ref = mock_gas_object.compute_object_reference();
1912 transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
1913 input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
1914 Some(mock_gas_object.id())
1915 } else {
1916 None
1917 };
1918
1919 let protocol_config = epoch_store.protocol_config();
1920
1921 let (gas_status, checked_input_objects) = if checks.enabled() {
1924 iota_transaction_checks::check_transaction_input(
1925 protocol_config,
1926 epoch_store.reference_gas_price(),
1927 &transaction,
1928 input_objects,
1929 &receiving_objects,
1930 &self.metrics.bytecode_verifier_metrics,
1931 &self.config.verifier_signing_config,
1932 )?
1933 } else {
1934 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
1935 protocol_config,
1936 transaction.kind(),
1937 input_objects,
1938 receiving_objects,
1939 )?;
1940 let gas_status = IotaGasStatus::new(
1941 transaction.gas_budget(),
1942 transaction.gas_price(),
1943 epoch_store.reference_gas_price(),
1944 protocol_config,
1945 )?;
1946
1947 (gas_status, checked_input_objects)
1948 };
1949
1950 let executor = iota_execution::executor(
1952 protocol_config,
1953 true, None,
1955 )
1956 .expect("Creating an executor should not fail here");
1957
1958 let (kind, signer, gas_coins) = transaction.execution_parts();
1960 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
1961 self.get_backing_store().as_ref(),
1962 protocol_config,
1963 self.metrics.limits_metrics.clone(),
1964 false, self.config.certificate_deny_config.certificate_deny_set(),
1966 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1967 epoch_store
1968 .epoch_start_config()
1969 .epoch_data()
1970 .epoch_start_timestamp(),
1971 checked_input_objects,
1972 gas_coins,
1973 gas_status,
1974 kind,
1975 signer,
1976 transaction.digest(),
1977 checks.disabled(),
1978 );
1979
1980 Ok(SimulateTransactionResult {
1983 input_objects: inner_temp_store.input_objects,
1984 output_objects: inner_temp_store.written,
1985 events: effects.events_digest().map(|_| inner_temp_store.events),
1986 effects,
1987 execution_result,
1988 mock_gas_id,
1989 })
1990 }
1991
1992 #[instrument("dev_inspect_tx", level = "trace", skip_all)]
1997 pub async fn dev_inspect_transaction_block(
1998 &self,
1999 sender: IotaAddress,
2000 transaction_kind: TransactionKind,
2001 gas_price: Option<u64>,
2002 gas_budget: Option<u64>,
2003 gas_sponsor: Option<IotaAddress>,
2004 gas_objects: Option<Vec<ObjectRef>>,
2005 show_raw_txn_data_and_effects: Option<bool>,
2006 skip_checks: Option<bool>,
2007 ) -> IotaResult<DevInspectResults> {
2008 let epoch_store = self.load_epoch_store_one_call_per_task();
2009
2010 if !self.is_fullnode(&epoch_store) {
2011 return Err(IotaError::UnsupportedFeature {
2012 error: "dev-inspect is only supported on fullnodes".to_string(),
2013 });
2014 }
2015
2016 if transaction_kind.is_system_tx() {
2017 return Err(IotaError::UnsupportedFeature {
2018 error: "system transactions are not supported".to_string(),
2019 });
2020 }
2021
2022 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2023 let skip_checks = skip_checks.unwrap_or(true);
2024 let reference_gas_price = epoch_store.reference_gas_price();
2025 let protocol_config = epoch_store.protocol_config();
2026 let max_tx_gas = protocol_config.max_tx_gas();
2027
2028 let price = gas_price.unwrap_or(reference_gas_price);
2029 let budget = gas_budget.unwrap_or(max_tx_gas);
2030 let owner = gas_sponsor.unwrap_or(sender);
2031 let payment = gas_objects.unwrap_or_default();
2034 let transaction = TransactionData::V1(TransactionDataV1 {
2035 kind: transaction_kind.clone(),
2036 sender,
2037 gas_data: GasData {
2038 payment,
2039 owner,
2040 price,
2041 budget,
2042 },
2043 expiration: TransactionExpiration::None,
2044 });
2045
2046 let raw_txn_data = if show_raw_txn_data_and_effects {
2047 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2048 error: "Failed to serialize transaction during dev inspect".to_string(),
2049 })?
2050 } else {
2051 vec![]
2052 };
2053
2054 transaction.validity_check_no_gas_check(protocol_config)?;
2055
2056 let input_object_kinds = transaction.input_objects()?;
2057 let receiving_object_refs = transaction.receiving_objects();
2058
2059 iota_transaction_checks::deny::check_transaction_for_signing(
2060 &transaction,
2061 &[],
2062 &input_object_kinds,
2063 &receiving_object_refs,
2064 &self.config.transaction_deny_config,
2065 self.get_backing_package_store().as_ref(),
2066 )?;
2067
2068 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2069 None,
2071 &input_object_kinds,
2072 &receiving_object_refs,
2073 epoch_store.epoch(),
2074 )?;
2075
2076 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2078 SIMULATION_GAS_COIN_VALUE,
2079 transaction.gas_owner(),
2080 );
2081
2082 let gas_objects = if transaction.gas().is_empty() {
2083 let gas_object_ref = dummy_gas_object.compute_object_reference();
2084 vec![gas_object_ref]
2085 } else {
2086 transaction.gas().to_vec()
2087 };
2088
2089 let (gas_status, checked_input_objects) = if skip_checks {
2090 if transaction.gas().is_empty() {
2095 input_objects.push(ObjectReadResult::new(
2096 InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
2097 dummy_gas_object.into(),
2098 ));
2099 }
2100 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2101 protocol_config,
2102 &transaction_kind,
2103 input_objects,
2104 receiving_objects,
2105 )?;
2106 let gas_status = IotaGasStatus::new(
2107 max_tx_gas,
2108 transaction.gas_price(),
2109 reference_gas_price,
2110 protocol_config,
2111 )?;
2112
2113 (gas_status, checked_input_objects)
2114 } else {
2115 if transaction.gas().is_empty() {
2119 iota_transaction_checks::check_transaction_input_with_given_gas(
2120 epoch_store.protocol_config(),
2121 reference_gas_price,
2122 &transaction,
2123 input_objects,
2124 receiving_objects,
2125 dummy_gas_object,
2126 &self.metrics.bytecode_verifier_metrics,
2127 &self.config.verifier_signing_config,
2128 )?
2129 } else {
2130 iota_transaction_checks::check_transaction_input(
2131 epoch_store.protocol_config(),
2132 reference_gas_price,
2133 &transaction,
2134 input_objects,
2135 &receiving_objects,
2136 &self.metrics.bytecode_verifier_metrics,
2137 &self.config.verifier_signing_config,
2138 )?
2139 }
2140 };
2141
2142 let executor = iota_execution::executor(protocol_config, true, None)
2143 .expect("Creating an executor should not fail here");
2144 let intent_msg = IntentMessage::new(
2145 Intent {
2146 version: IntentVersion::V0,
2147 scope: IntentScope::TransactionData,
2148 app_id: IntentAppId::Iota,
2149 },
2150 transaction,
2151 );
2152 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2153 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2154 self.get_backing_store().as_ref(),
2155 protocol_config,
2156 self.metrics.limits_metrics.clone(),
2157 false,
2159 self.config.certificate_deny_config.certificate_deny_set(),
2160 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2161 epoch_store
2162 .epoch_start_config()
2163 .epoch_data()
2164 .epoch_start_timestamp(),
2165 checked_input_objects,
2166 gas_objects,
2167 gas_status,
2168 transaction_kind,
2169 sender,
2170 transaction_digest,
2171 skip_checks,
2172 );
2173
2174 let raw_effects = if show_raw_txn_data_and_effects {
2175 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2176 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2177 })?
2178 } else {
2179 vec![]
2180 };
2181
2182 let mut layout_resolver =
2183 epoch_store
2184 .executor()
2185 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2186 &inner_temp_store,
2187 self.get_backing_package_store(),
2188 )));
2189
2190 DevInspectResults::new(
2191 effects,
2192 inner_temp_store.events.clone(),
2193 execution_result,
2194 raw_txn_data,
2195 raw_effects,
2196 layout_resolver.as_mut(),
2197 )
2198 }
2199
2200 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2202 let epoch_store = self.epoch_store_for_testing();
2203 Ok(epoch_store.reference_gas_price())
2204 }
2205
2206 #[instrument(level = "trace", skip_all)]
2207 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2208 self.get_transaction_cache_reader()
2209 .try_is_tx_already_executed(digest)
2210 }
2211
2212 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2214 self.try_is_tx_already_executed(digest)
2215 .expect("storage access failed")
2216 }
2217
2218 #[instrument(level = "debug", skip_all, err)]
2220 fn index_tx(
2221 &self,
2222 indexes: &IndexStore,
2223 digest: &TransactionDigest,
2224 cert: &VerifiedExecutableTransaction,
2226 effects: &TransactionEffects,
2227 events: &TransactionEvents,
2228 timestamp_ms: u64,
2229 tx_coins: Option<TxCoins>,
2230 written: &WrittenObjects,
2231 inner_temporary_store: &InnerTemporaryStore,
2232 ) -> IotaResult<u64> {
2233 let changes = self
2234 .process_object_index(effects, written, inner_temporary_store)
2235 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2236
2237 indexes.index_tx(
2238 cert.data().intent_message().value.sender(),
2239 cert.data()
2240 .intent_message()
2241 .value
2242 .input_objects()?
2243 .iter()
2244 .map(|o| o.object_id()),
2245 effects
2246 .all_changed_objects()
2247 .into_iter()
2248 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2249 cert.data()
2250 .intent_message()
2251 .value
2252 .move_calls()
2253 .into_iter()
2254 .map(|(package, module, function)| {
2255 (*package, module.to_owned(), function.to_owned())
2256 }),
2257 events,
2258 changes,
2259 digest,
2260 timestamp_ms,
2261 tx_coins,
2262 )
2263 }
2264
/// Simulator-only (`cfg(msim)`) hook that deliberately perturbs `effects`.
///
/// System transactions and authorities with zero weight in the current
/// committee are left untouched. Otherwise a thread-local pair of
/// (accumulated stake, set of authority names) is consulted: while the
/// accumulated stake is below `committee.validity_threshold()`, this
/// authority's stake is added and its name recorded. Any authority whose
/// name is in the set gets `computation_cost` of the gas summary bumped by
/// one.
#[cfg(msim)]
fn create_fail_state(
    &self,
    certificate: &VerifiedExecutableTransaction,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    effects: &mut TransactionEffects,
) {
    use std::cell::RefCell;
    thread_local! {
        static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
    }

    if certificate.data().intent_message().value.is_system_tx() {
        return;
    }

    let committee = epoch_store.committee();
    let cur_stake = (**committee).weight(&self.name);
    if cur_stake == 0 {
        return;
    }

    FAIL_STATE.with_borrow_mut(|fail_state| {
        let (failing_stake, failing_validators) = &mut *fail_state;

        if *failing_stake < committee.validity_threshold() {
            *failing_stake += cur_stake;
            failing_validators.insert(self.name);
        }

        if failing_validators.contains(&self.name) {
            info!("cp_exec failing tx");
            effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
        }
    });
}
2295
2296 fn process_object_index(
2297 &self,
2298 effects: &TransactionEffects,
2299 written: &WrittenObjects,
2300 inner_temporary_store: &InnerTemporaryStore,
2301 ) -> IotaResult<ObjectIndexChanges> {
2302 let epoch_store = self.load_epoch_store_one_call_per_task();
2303 let mut layout_resolver =
2304 epoch_store
2305 .executor()
2306 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2307 inner_temporary_store,
2308 self.get_backing_package_store(),
2309 )));
2310
2311 let modified_at_version = effects
2312 .modified_at_versions()
2313 .into_iter()
2314 .collect::<HashMap<_, _>>();
2315
2316 let tx_digest = effects.transaction_digest();
2317 let mut deleted_owners = vec![];
2318 let mut deleted_dynamic_fields = vec![];
2319 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2320 let old_version = modified_at_version.get(&id).unwrap();
2321 match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2324 |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2325 ) {
2326 Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2327 Owner::ObjectOwner(object_id) => {
2328 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2329 }
2330 _ => {}
2331 }
2332 }
2333
2334 let mut new_owners = vec![];
2335 let mut new_dynamic_fields = vec![];
2336
2337 for (oref, owner, kind) in effects.all_changed_objects() {
2338 let id = &oref.0;
2339 if let WriteKind::Mutate = kind {
2342 let Some(old_version) = modified_at_version.get(id) else {
2343 panic!(
2344 "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2345 );
2346 };
2347 let Some(old_object) = self
2350 .get_object_store()
2351 .try_get_object_by_key(id, *old_version)?
2352 else {
2353 panic!(
2354 "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2355 );
2356 };
2357 if old_object.owner != owner {
2358 match old_object.owner {
2359 Owner::AddressOwner(addr) => {
2360 deleted_owners.push((addr, *id));
2361 }
2362 Owner::ObjectOwner(object_id) => {
2363 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2364 }
2365 _ => {}
2366 }
2367 }
2368 }
2369
2370 match owner {
2371 Owner::AddressOwner(addr) => {
2372 let new_object = written.get(id).unwrap_or_else(
2375 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2376 );
2377 assert_eq!(
2378 new_object.version(),
2379 oref.1,
2380 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2381 tx_digest,
2382 id,
2383 new_object.version(),
2384 oref.1
2385 );
2386
2387 let type_ = new_object
2388 .type_()
2389 .map(|type_| ObjectType::Struct(type_.clone()))
2390 .unwrap_or(ObjectType::Package);
2391
2392 new_owners.push((
2393 (addr, *id),
2394 ObjectInfo {
2395 object_id: *id,
2396 version: oref.1,
2397 digest: oref.2,
2398 type_,
2399 owner,
2400 previous_transaction: *effects.transaction_digest(),
2401 },
2402 ));
2403 }
2404 Owner::ObjectOwner(owner) => {
2405 let new_object = written.get(id).unwrap_or_else(
2406 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2407 );
2408 assert_eq!(
2409 new_object.version(),
2410 oref.1,
2411 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2412 tx_digest,
2413 id,
2414 new_object.version(),
2415 oref.1
2416 );
2417
2418 let Some(df_info) = self
2419 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut(), false)
2420 .unwrap_or_else(|e| {
2421 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2422 None
2423 }
2424 )
2425 else {
2426 continue;
2428 };
2429 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2430 }
2431 _ => {}
2432 }
2433 }
2434
2435 Ok(ObjectIndexChanges {
2436 deleted_owners,
2437 deleted_dynamic_fields,
2438 new_owners,
2439 new_dynamic_fields,
2440 })
2441 }
2442
2443 fn try_create_dynamic_field_info(
2444 &self,
2445 o: &Object,
2446 written: &WrittenObjects,
2447 resolver: &mut dyn LayoutResolver,
2448 get_latest_object_version: bool,
2449 ) -> IotaResult<Option<DynamicFieldInfo>> {
2450 let Some(move_object) = o.data.try_as_move().cloned() else {
2452 return Ok(None);
2453 };
2454
2455 if !move_object.type_().is_dynamic_field() {
2457 return Ok(None);
2458 }
2459
2460 let layout = resolver
2461 .get_annotated_layout(&move_object.type_().clone().into())?
2462 .into_layout();
2463
2464 let field =
2465 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2466 IotaError::ObjectDeserialization {
2467 error: e.to_string(),
2468 }
2469 })?;
2470
2471 let type_ = field.kind;
2472 let name_type: TypeTag = field.name_layout.into();
2473 let bcs_name = field.name_bytes.to_owned();
2474
2475 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2476 .map_err(|e| {
2477 warn!("{e}");
2478 IotaError::ObjectDeserialization {
2479 error: e.to_string(),
2480 }
2481 })?;
2482
2483 let name = DynamicFieldName {
2484 type_: name_type,
2485 value: IotaMoveValue::from(name_value).to_json_value(),
2486 };
2487
2488 let value_metadata = field.value_metadata().map_err(|e| {
2489 warn!("{e}");
2490 IotaError::ObjectDeserialization {
2491 error: e.to_string(),
2492 }
2493 })?;
2494
2495 Ok(Some(match value_metadata {
2496 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2497 name,
2498 bcs_name,
2499 type_,
2500 object_type: object_type.to_canonical_string(true),
2501 object_id: o.id(),
2502 version: o.version(),
2503 digest: o.digest(),
2504 },
2505
2506 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2507 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2512 (
2513 object.version(),
2514 object.digest(),
2515 object.data.type_().unwrap().clone(),
2516 )
2517 } else {
2518 let object = if get_latest_object_version {
2520 self.get_object_store()
2527 .try_get_object(&object_id)?
2528 .ok_or_else(|| UserInputError::ObjectNotFound {
2529 object_id,
2530 version: Some(o.version()),
2531 })?
2532 } else {
2533 self.get_object_store()
2535 .try_get_object_by_key(&object_id, o.version())?
2536 .ok_or_else(|| UserInputError::ObjectNotFound {
2537 object_id,
2538 version: Some(o.version()),
2539 })?
2540 };
2541
2542 (
2543 object.version(),
2544 object.digest(),
2545 object.data.type_().unwrap().clone(),
2546 )
2547 };
2548
2549 DynamicFieldInfo {
2550 name,
2551 bcs_name,
2552 type_,
2553 object_type: object_type.to_string(),
2554 object_id,
2555 version,
2556 digest,
2557 }
2558 }
2559 }))
2560 }
2561
2562 #[instrument(level = "trace", skip_all, err)]
2563 fn post_process_one_tx(
2564 &self,
2565 certificate: &VerifiedExecutableTransaction,
2566 effects: &TransactionEffects,
2567 inner_temporary_store: &InnerTemporaryStore,
2568 epoch_store: &Arc<AuthorityPerEpochStore>,
2569 ) -> IotaResult {
2570 if self.indexes.is_none() {
2571 return Ok(());
2572 }
2573
2574 let tx_digest = certificate.digest();
2575 let timestamp_ms = Self::unixtime_now_ms();
2576 let events = &inner_temporary_store.events;
2577 let written = &inner_temporary_store.written;
2578 let tx_coins =
2579 self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2580
2581 if let Some(indexes) = &self.indexes {
2583 let _ = self
2584 .index_tx(
2585 indexes.as_ref(),
2586 tx_digest,
2587 certificate,
2588 effects,
2589 events,
2590 timestamp_ms,
2591 tx_coins,
2592 written,
2593 inner_temporary_store,
2594 )
2595 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2596 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2597 .expect("Indexing tx should not fail");
2598
2599 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2600 let events = self.make_transaction_block_events(
2601 events.clone(),
2602 *tx_digest,
2603 timestamp_ms,
2604 epoch_store,
2605 inner_temporary_store,
2606 )?;
2607 self.subscription_handler
2609 .process_tx(certificate.data().transaction_data(), &effects, &events)
2610 .tap_ok(|_| {
2611 self.metrics
2612 .post_processing_total_tx_had_event_processed
2613 .inc()
2614 })
2615 .tap_err(|e| {
2616 warn!(
2617 ?tx_digest,
2618 "Post processing - Couldn't process events for tx: {}", e
2619 )
2620 })?;
2621
2622 self.metrics
2623 .post_processing_total_events_emitted
2624 .inc_by(events.data.len() as u64);
2625 };
2626 Ok(())
2627 }
2628
2629 fn make_transaction_block_events(
2630 &self,
2631 transaction_events: TransactionEvents,
2632 digest: TransactionDigest,
2633 timestamp_ms: u64,
2634 epoch_store: &Arc<AuthorityPerEpochStore>,
2635 inner_temporary_store: &InnerTemporaryStore,
2636 ) -> IotaResult<IotaTransactionBlockEvents> {
2637 let mut layout_resolver =
2638 epoch_store
2639 .executor()
2640 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2641 inner_temporary_store,
2642 self.get_backing_package_store(),
2643 )));
2644 IotaTransactionBlockEvents::try_from(
2645 transaction_events,
2646 digest,
2647 Some(timestamp_ms),
2648 layout_resolver.as_mut(),
2649 )
2650 }
2651
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch, or if the
/// millisecond count does not fit in a `u64`.
pub fn unixtime_now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let millis: u128 = since_epoch.as_millis();
    u64::try_from(millis).expect("Travelling in time machine")
}
2659
2660 #[instrument(level = "trace", skip_all)]
2661 pub async fn handle_transaction_info_request(
2662 &self,
2663 request: TransactionInfoRequest,
2664 ) -> IotaResult<TransactionInfoResponse> {
2665 let epoch_store = self.load_epoch_store_one_call_per_task();
2666 let (transaction, status) = self
2667 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2668 .ok_or(IotaError::TransactionNotFound {
2669 digest: request.transaction_digest,
2670 })?;
2671 Ok(TransactionInfoResponse {
2672 transaction,
2673 status,
2674 })
2675 }
2676
2677 #[instrument(level = "trace", skip_all)]
2678 pub async fn handle_object_info_request(
2679 &self,
2680 request: ObjectInfoRequest,
2681 ) -> IotaResult<ObjectInfoResponse> {
2682 let epoch_store = self.load_epoch_store_one_call_per_task();
2683
2684 let requested_object_seq = match request.request_kind {
2685 ObjectInfoRequestKind::LatestObjectInfo => {
2686 let (_, seq, _) = self
2687 .try_get_object_or_tombstone(request.object_id)
2688 .await?
2689 .ok_or_else(|| {
2690 IotaError::from(UserInputError::ObjectNotFound {
2691 object_id: request.object_id,
2692 version: None,
2693 })
2694 })?;
2695 seq
2696 }
2697 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2698 };
2699
2700 let object = self
2701 .get_object_store()
2702 .try_get_object_by_key(&request.object_id, requested_object_seq)?
2703 .ok_or_else(|| {
2704 IotaError::from(UserInputError::ObjectNotFound {
2705 object_id: request.object_id,
2706 version: Some(requested_object_seq),
2707 })
2708 })?;
2709
2710 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2711 (request.generate_layout, object.data.try_as_move())
2712 {
2713 Some(into_struct_layout(
2714 epoch_store
2715 .executor()
2716 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2717 .get_annotated_layout(&move_obj.type_().clone().into())?,
2718 )?)
2719 } else {
2720 None
2721 };
2722
2723 let lock = if !object.is_address_owned() {
2724 None
2726 } else {
2727 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2728 .await?
2729 .map(|s| s.into_inner())
2730 };
2731
2732 Ok(ObjectInfoResponse {
2733 object,
2734 layout,
2735 lock_for_debugging: lock,
2736 })
2737 }
2738
2739 #[instrument(level = "trace", skip_all)]
2740 pub fn handle_checkpoint_request(
2741 &self,
2742 request: &CheckpointRequest,
2743 ) -> IotaResult<CheckpointResponse> {
2744 let summary = if request.certified {
2745 let summary = match request.sequence_number {
2746 Some(seq) => self
2747 .checkpoint_store
2748 .get_checkpoint_by_sequence_number(seq)?,
2749 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2750 }
2751 .map(|v| v.into_inner());
2752 summary.map(CheckpointSummaryResponse::Certified)
2753 } else {
2754 let summary = match request.sequence_number {
2755 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2756 None => self
2757 .checkpoint_store
2758 .get_latest_locally_computed_checkpoint()?,
2759 };
2760 summary.map(CheckpointSummaryResponse::Pending)
2761 };
2762 let contents = match &summary {
2763 Some(s) => self
2764 .checkpoint_store
2765 .get_checkpoint_contents(&s.content_digest())?,
2766 None => None,
2767 };
2768 Ok(CheckpointResponse {
2769 checkpoint: summary,
2770 contents,
2771 })
2772 }
2773
2774 fn check_protocol_version(
2775 supported_protocol_versions: SupportedProtocolVersions,
2776 current_version: ProtocolVersion,
2777 ) {
2778 info!("current protocol version is now {:?}", current_version);
2779 info!("supported versions are: {:?}", supported_protocol_versions);
2780 if !supported_protocol_versions.is_version_supported(current_version) {
2781 let msg = format!(
2782 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2783 );
2784
2785 error!("{}", msg);
2786 eprintln!("{msg}");
2787
2788 #[cfg(not(msim))]
2789 std::process::exit(1);
2790
2791 #[cfg(msim)]
2792 iota_simulator::task::shutdown_current_node();
2793 }
2794 }
2795
2796 #[expect(clippy::disallowed_methods)] pub async fn new(
2798 name: AuthorityName,
2799 secret: StableSyncAuthoritySigner,
2800 supported_protocol_versions: SupportedProtocolVersions,
2801 store: Arc<AuthorityStore>,
2802 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
2803 epoch_store: Arc<AuthorityPerEpochStore>,
2804 committee_store: Arc<CommitteeStore>,
2805 indexes: Option<Arc<IndexStore>>,
2806 rest_index: Option<Arc<RestIndexStore>>,
2807 checkpoint_store: Arc<CheckpointStore>,
2808 prometheus_registry: &Registry,
2809 genesis_objects: &[Object],
2810 db_checkpoint_config: &DBCheckpointConfig,
2811 config: NodeConfig,
2812 archive_readers: ArchiveReaderBalancer,
2813 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
2814 chain_identifier: ChainIdentifier,
2815 pruner_db: Option<Arc<AuthorityPrunerTables>>,
2816 ) -> Arc<Self> {
2817 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
2818
2819 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
2820 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
2821 let transaction_manager = Arc::new(TransactionManager::new(
2822 execution_cache_trait_pointers.object_cache_reader.clone(),
2823 execution_cache_trait_pointers
2824 .transaction_cache_reader
2825 .clone(),
2826 &epoch_store,
2827 tx_ready_certificates,
2828 metrics.clone(),
2829 ));
2830 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
2831
2832 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
2833 epoch_store.get_parent_path(),
2834 &config.authority_store_pruning_config,
2835 );
2836 let _pruner = AuthorityStorePruner::new(
2837 store.perpetual_tables.clone(),
2838 checkpoint_store.clone(),
2839 rest_index.clone(),
2840 config.authority_store_pruning_config.clone(),
2841 epoch_store.committee().authority_exists(&name),
2842 epoch_store.epoch_start_state().epoch_duration_ms(),
2843 prometheus_registry,
2844 archive_readers,
2845 pruner_db,
2846 );
2847 let input_loader =
2848 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
2849 let epoch = epoch_store.epoch();
2850 let rgp = epoch_store.reference_gas_price();
2851 let state = Arc::new(AuthorityState {
2852 name,
2853 secret,
2854 execution_lock: RwLock::new(epoch),
2855 epoch_store: ArcSwap::new(epoch_store.clone()),
2856 input_loader,
2857 execution_cache_trait_pointers,
2858 indexes,
2859 rest_index,
2860 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
2861 checkpoint_store,
2862 committee_store,
2863 transaction_manager,
2864 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
2865 metrics,
2866 _pruner,
2867 _authority_per_epoch_pruner,
2868 db_checkpoint_config: db_checkpoint_config.clone(),
2869 config,
2870 overload_info: AuthorityOverloadInfo::default(),
2871 validator_tx_finalizer,
2872 chain_identifier,
2873 congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
2874 });
2875
2876 let authority_state = Arc::downgrade(&state);
2878 spawn_monitored_task!(execution_process(
2879 authority_state,
2880 rx_ready_certificates,
2881 rx_execution_shutdown,
2882 ));
2883
2884 state
2886 .create_owner_index_if_empty(genesis_objects, &epoch_store)
2887 .expect("Error indexing genesis objects.");
2888
2889 state
2890 }
2891
2892 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
2894 &self.execution_cache_trait_pointers.object_cache_reader
2895 }
2896
2897 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
2898 &self.execution_cache_trait_pointers.transaction_cache_reader
2899 }
2900
2901 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
2902 &self.execution_cache_trait_pointers.cache_writer
2903 }
2904
2905 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
2906 &self.execution_cache_trait_pointers.backing_store
2907 }
2908
2909 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
2910 &self.execution_cache_trait_pointers.backing_package_store
2911 }
2912
2913 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
2914 &self.execution_cache_trait_pointers.object_store
2915 }
2916
2917 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
2918 &self.execution_cache_trait_pointers.reconfig_api
2919 }
2920
2921 pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
2922 &self.execution_cache_trait_pointers.accumulator_store
2923 }
2924
2925 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
2926 &self.execution_cache_trait_pointers.checkpoint_cache
2927 }
2928
2929 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
2930 &self.execution_cache_trait_pointers.state_sync_store
2931 }
2932
2933 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
2934 &self.execution_cache_trait_pointers.cache_commit
2935 }
2936
2937 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
2938 self.execution_cache_trait_pointers
2939 .testing_api
2940 .database_for_testing()
2941 }
2942
2943 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
2944 &self,
2945 config: NodeConfig,
2946 metrics: Arc<AuthorityStorePruningMetrics>,
2947 ) -> anyhow::Result<()> {
2948 let archive_readers =
2949 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
2950 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
2951 &self.database_for_testing().perpetual_tables,
2952 &self.checkpoint_store,
2953 self.rest_index.as_deref(),
2954 None,
2955 config.authority_store_pruning_config,
2956 metrics,
2957 archive_readers,
2958 EPOCH_DURATION_MS_FOR_TESTING,
2959 )
2960 .await
2961 }
2962
2963 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
2964 &self.transaction_manager
2965 }
2966
2967 pub fn enqueue_transactions_for_execution(
2970 &self,
2971 txns: Vec<VerifiedExecutableTransaction>,
2972 epoch_store: &Arc<AuthorityPerEpochStore>,
2973 ) {
2974 self.transaction_manager.enqueue(txns, epoch_store)
2975 }
2976
2977 pub fn enqueue_certificates_for_execution(
2979 &self,
2980 certs: Vec<VerifiedCertificate>,
2981 epoch_store: &Arc<AuthorityPerEpochStore>,
2982 ) {
2983 self.transaction_manager
2984 .enqueue_certificates(certs, epoch_store)
2985 }
2986
2987 pub fn enqueue_with_expected_effects_digest(
2988 &self,
2989 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
2990 epoch_store: &AuthorityPerEpochStore,
2991 ) {
2992 self.transaction_manager
2993 .enqueue_with_expected_effects_digest(certs, epoch_store)
2994 }
2995
2996 fn create_owner_index_if_empty(
2997 &self,
2998 genesis_objects: &[Object],
2999 epoch_store: &Arc<AuthorityPerEpochStore>,
3000 ) -> IotaResult {
3001 let Some(index_store) = &self.indexes else {
3002 return Ok(());
3003 };
3004 if !index_store.is_empty() {
3005 return Ok(());
3006 }
3007
3008 let mut new_owners = vec![];
3009 let mut new_dynamic_fields = vec![];
3010 let mut layout_resolver = epoch_store
3011 .executor()
3012 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3013 for o in genesis_objects.iter() {
3014 match o.owner {
3015 Owner::AddressOwner(addr) => new_owners.push((
3016 (addr, o.id()),
3017 ObjectInfo::new(&o.compute_object_reference(), o),
3018 )),
3019 Owner::ObjectOwner(object_id) => {
3020 let id = o.id();
3021 let Some(info) = self.try_create_dynamic_field_info(
3022 o,
3023 &BTreeMap::new(),
3024 layout_resolver.as_mut(),
3025 true,
3026 )?
3027 else {
3028 continue;
3029 };
3030 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3031 }
3032 _ => {}
3033 }
3034 }
3035
3036 index_store.insert_genesis_objects(ObjectIndexChanges {
3037 deleted_owners: vec![],
3038 deleted_dynamic_fields: vec![],
3039 new_owners,
3040 new_dynamic_fields,
3041 })
3042 }
3043
3044 pub fn execution_lock_for_executable_transaction(
3048 &self,
3049 transaction: &VerifiedExecutableTransaction,
3050 ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3051 let lock = self
3052 .execution_lock
3053 .try_read()
3054 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3055 if *lock == transaction.auth_sig().epoch() {
3056 Ok(lock)
3057 } else {
3058 Err(IotaError::WrongEpoch {
3059 expected_epoch: *lock,
3060 actual_epoch: transaction.auth_sig().epoch(),
3061 })
3062 }
3063 }
3064
3065 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3071 self.execution_lock
3072 .try_read()
3073 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3074 }
3075
3076 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3077 self.execution_lock.write().await
3078 }
3079
3080 #[instrument(level = "error", skip_all)]
3081 pub async fn reconfigure(
3082 &self,
3083 cur_epoch_store: &AuthorityPerEpochStore,
3084 supported_protocol_versions: SupportedProtocolVersions,
3085 new_committee: Committee,
3086 epoch_start_configuration: EpochStartConfiguration,
3087 accumulator: Arc<StateAccumulator>,
3088 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3089 epoch_supply_change: i64,
3090 epoch_last_checkpoint: CheckpointSequenceNumber,
3091 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3092 Self::check_protocol_version(
3093 supported_protocol_versions,
3094 epoch_start_configuration
3095 .epoch_start_state()
3096 .protocol_version(),
3097 );
3098 self.metrics.reset_on_reconfigure();
3099 self.committee_store.insert_new_committee(&new_committee)?;
3100
3101 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3103
3104 cur_epoch_store.epoch_terminated().await;
3106
3107 let highest_locally_built_checkpoint_seq = self
3108 .checkpoint_store
3109 .get_latest_locally_computed_checkpoint()?
3110 .map(|c| *c.sequence_number())
3111 .unwrap_or(0);
3112
3113 assert!(
3114 epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3115 "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3116 );
3117 if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
3118 let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3122 if num_shared_version_assignments > 1 {
3125 debug_fatal!(
3127 "all shared_version_assignments should have been removed \
3128 (num_shared_version_assignments: {num_shared_version_assignments})"
3129 );
3130 }
3131 }
3132
3133 self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3139 .await?;
3140 self.get_reconfig_api()
3141 .clear_state_end_of_epoch(&execution_lock);
3142 self.check_system_consistency(
3143 cur_epoch_store,
3144 accumulator,
3145 expensive_safety_check_config,
3146 epoch_supply_change,
3147 )?;
3148 self.get_reconfig_api()
3149 .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3150 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3151 if self
3152 .db_checkpoint_config
3153 .perform_db_checkpoints_at_epoch_end
3154 {
3155 let checkpoint_indexes = self
3156 .db_checkpoint_config
3157 .perform_index_db_checkpoints_at_epoch_end
3158 .unwrap_or(false);
3159 let current_epoch = cur_epoch_store.epoch();
3160 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3161 self.checkpoint_all_dbs(
3162 &epoch_checkpoint_path,
3163 cur_epoch_store,
3164 checkpoint_indexes,
3165 )?;
3166 }
3167 }
3168
3169 self.get_reconfig_api()
3170 .reconfigure_cache(&epoch_start_configuration)
3171 .await;
3172
3173 let new_epoch = new_committee.epoch;
3174 let new_epoch_store = self
3175 .reopen_epoch_db(
3176 cur_epoch_store,
3177 new_committee,
3178 epoch_start_configuration,
3179 expensive_safety_check_config,
3180 epoch_last_checkpoint,
3181 )
3182 .await?;
3183 assert_eq!(new_epoch_store.epoch(), new_epoch);
3184 self.transaction_manager.reconfigure(new_epoch);
3185 *execution_lock = new_epoch;
3186 Ok(new_epoch_store)
3190 }
3191
3192 pub async fn reconfigure_for_testing(&self) {
3197 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3198 let epoch_store = self.epoch_store_for_testing().clone();
3199 let protocol_config = epoch_store.protocol_config().clone();
3200 let _guard =
3208 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3209 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3210 self.get_backing_package_store().clone(),
3211 self.get_object_store().clone(),
3212 &self.config.expensive_safety_check_config,
3213 self.checkpoint_store
3214 .get_epoch_last_checkpoint(epoch_store.epoch())
3215 .unwrap()
3216 .map(|c| *c.sequence_number())
3217 .unwrap_or_default(),
3218 );
3219 let new_epoch = new_epoch_store.epoch();
3220 self.transaction_manager.reconfigure(new_epoch);
3221 self.epoch_store.store(new_epoch_store);
3222 epoch_store.epoch_terminated().await;
3223 *execution_lock = new_epoch;
3224 }
3225
3226 #[instrument(level = "error", skip_all)]
3227 fn check_system_consistency(
3228 &self,
3229 cur_epoch_store: &AuthorityPerEpochStore,
3230 accumulator: Arc<StateAccumulator>,
3231 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3232 epoch_supply_change: i64,
3233 ) -> IotaResult<()> {
3234 info!(
3235 "Performing iota conservation consistency check for epoch {}",
3236 cur_epoch_store.epoch()
3237 );
3238
3239 if cfg!(debug_assertions) {
3240 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3241 }
3242
3243 self.get_reconfig_api()
3244 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3245
3246 if expensive_safety_check_config.enable_state_consistency_check() {
3248 info!(
3249 "Performing state consistency check for epoch {}",
3250 cur_epoch_store.epoch()
3251 );
3252 self.expensive_check_is_consistent_state(
3253 accumulator,
3254 cur_epoch_store,
3255 cfg!(debug_assertions), );
3257 }
3258
3259 if expensive_safety_check_config.enable_secondary_index_checks() {
3260 if let Some(indexes) = self.indexes.clone() {
3261 verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3262 .expect("secondary indexes are inconsistent");
3263 }
3264 }
3265
3266 Ok(())
3267 }
3268
3269 fn expensive_check_is_consistent_state(
3270 &self,
3271 accumulator: Arc<StateAccumulator>,
3272 cur_epoch_store: &AuthorityPerEpochStore,
3273 panic: bool,
3274 ) {
3275 let live_object_set_hash = accumulator.digest_live_object_set();
3276
3277 let root_state_hash: ECMHLiveObjectSetDigest = self
3278 .get_accumulator_store()
3279 .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3280 .expect("Retrieving root state hash cannot fail")
3281 .expect("Root state hash for epoch must exist")
3282 .1
3283 .digest()
3284 .into();
3285
3286 let is_inconsistent = root_state_hash != live_object_set_hash;
3287 if is_inconsistent {
3288 if panic {
3289 panic!(
3290 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3291 );
3292 } else {
3293 error!(
3294 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3295 root_state_hash, live_object_set_hash
3296 );
3297 }
3298 } else {
3299 info!("State consistency check passed");
3300 }
3301
3302 if !panic {
3303 accumulator.set_inconsistent_state(is_inconsistent);
3304 }
3305 }
3306
3307 pub fn current_epoch_for_testing(&self) -> EpochId {
3308 self.epoch_store_for_testing().epoch()
3309 }
3310
3311 #[instrument(level = "error", skip_all)]
3312 pub fn checkpoint_all_dbs(
3313 &self,
3314 checkpoint_path: &Path,
3315 cur_epoch_store: &AuthorityPerEpochStore,
3316 checkpoint_indexes: bool,
3317 ) -> IotaResult {
3318 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3319 let current_epoch = cur_epoch_store.epoch();
3320
3321 if checkpoint_path.exists() {
3322 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3323 return Ok(());
3324 }
3325
3326 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3327 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3328
3329 if checkpoint_path_tmp.exists() {
3330 fs::remove_dir_all(&checkpoint_path_tmp)
3331 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3332 }
3333
3334 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3335 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3336
3337 self.checkpoint_store
3340 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3341
3342 self.get_reconfig_api()
3343 .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3344
3345 self.committee_store
3346 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3347
3348 if checkpoint_indexes {
3349 if let Some(indexes) = self.indexes.as_ref() {
3350 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3351 }
3352 }
3353
3354 fs::rename(checkpoint_path_tmp, checkpoint_path)
3355 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3356 Ok(())
3357 }
3358
3359 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3366 self.epoch_store.load()
3367 }
3368
3369 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3371 self.load_epoch_store_one_call_per_task()
3372 }
3373
3374 pub fn clone_committee_for_testing(&self) -> Committee {
3375 Committee::clone(self.epoch_store_for_testing().committee())
3376 }
3377
3378 #[instrument(level = "trace", skip_all)]
3379 pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3380 self.get_object_store()
3381 .try_get_object(object_id)
3382 .map_err(Into::into)
3383 }
3384
3385 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3387 self.try_get_object(object_id)
3388 .await
3389 .expect("storage access failed")
3390 }
3391
3392 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3393 Ok(self
3394 .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3395 .await?
3396 .expect("framework object should always exist")
3397 .compute_object_reference())
3398 }
3399
3400 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3402 self.get_object_cache_reader()
3403 .try_get_iota_system_state_object_unsafe()
3404 }
3405
3406 #[instrument(level = "trace", skip_all)]
3407 pub fn get_checkpoint_by_sequence_number(
3408 &self,
3409 sequence_number: CheckpointSequenceNumber,
3410 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3411 Ok(self
3412 .checkpoint_store
3413 .get_checkpoint_by_sequence_number(sequence_number)?)
3414 }
3415
3416 #[instrument(level = "trace", skip_all)]
3417 pub fn get_transaction_checkpoint_for_tests(
3418 &self,
3419 digest: &TransactionDigest,
3420 epoch_store: &AuthorityPerEpochStore,
3421 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3422 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3423 let Some(checkpoint) = checkpoint else {
3424 return Ok(None);
3425 };
3426 let checkpoint = self
3427 .checkpoint_store
3428 .get_checkpoint_by_sequence_number(checkpoint)?;
3429 Ok(checkpoint)
3430 }
3431
3432 #[instrument(level = "trace", skip_all)]
3433 pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3434 Ok(
3435 match self
3436 .get_object_cache_reader()
3437 .try_get_latest_object_or_tombstone(*object_id)?
3438 {
3439 Some((_, ObjectOrTombstone::Object(object))) => {
3440 let layout = self.get_object_layout(&object)?;
3441 ObjectRead::Exists(object.compute_object_reference(), object, layout)
3442 }
3443 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3444 None => ObjectRead::NotExists(*object_id),
3445 },
3446 )
3447 }
3448
3449 pub fn get_chain_identifier(&self) -> ChainIdentifier {
3451 self.chain_identifier
3452 }
3453
3454 #[instrument(level = "trace", skip_all)]
3455 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3456 where
3457 T: DeserializeOwned,
3458 {
3459 let o = self.get_object_read(object_id)?.into_object()?;
3460 if let Some(move_object) = o.data.try_as_move() {
3461 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3462 IotaError::ObjectDeserialization {
3463 error: format!("{e}"),
3464 }
3465 })?)
3466 } else {
3467 Err(IotaError::ObjectDeserialization {
3468 error: format!("Provided object : [{object_id}] is not a Move object."),
3469 })
3470 }
3471 }
3472
3473 #[instrument(level = "trace", skip_all)]
3479 pub fn get_past_object_read(
3480 &self,
3481 object_id: &ObjectID,
3482 version: SequenceNumber,
3483 ) -> IotaResult<PastObjectRead> {
3484 let Some(obj_ref) = self
3486 .get_object_cache_reader()
3487 .try_get_latest_object_ref_or_tombstone(*object_id)?
3488 else {
3489 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3490 };
3491
3492 if version > obj_ref.1 {
3493 return Ok(PastObjectRead::VersionTooHigh {
3494 object_id: *object_id,
3495 asked_version: version,
3496 latest_version: obj_ref.1,
3497 });
3498 }
3499
3500 if version < obj_ref.1 {
3501 return Ok(match self.read_object_at_version(object_id, version)? {
3503 Some((object, layout)) => {
3504 let obj_ref = object.compute_object_reference();
3505 PastObjectRead::VersionFound(obj_ref, object, layout)
3506 }
3507
3508 None => PastObjectRead::VersionNotFound(*object_id, version),
3509 });
3510 }
3511
3512 if !obj_ref.2.is_alive() {
3513 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3514 }
3515
3516 match self.read_object_at_version(object_id, obj_ref.1)? {
3517 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3518 None => {
3519 error!(
3520 "Object with in parent_entry is missing from object store, datastore is \
3521 inconsistent",
3522 );
3523 Err(UserInputError::ObjectNotFound {
3524 object_id: *object_id,
3525 version: Some(obj_ref.1),
3526 }
3527 .into())
3528 }
3529 }
3530 }
3531
3532 #[instrument(level = "trace", skip_all)]
3533 fn read_object_at_version(
3534 &self,
3535 object_id: &ObjectID,
3536 version: SequenceNumber,
3537 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3538 let Some(object) = self
3539 .get_object_cache_reader()
3540 .try_get_object_by_key(object_id, version)?
3541 else {
3542 return Ok(None);
3543 };
3544
3545 let layout = self.get_object_layout(&object)?;
3546 Ok(Some((object, layout)))
3547 }
3548
3549 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3550 let layout = object
3551 .data
3552 .try_as_move()
3553 .map(|object| {
3554 into_struct_layout(
3555 self.load_epoch_store_one_call_per_task()
3556 .executor()
3557 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3559 .get_annotated_layout(&object.type_().clone().into())?,
3560 )
3561 })
3562 .transpose()?;
3563 Ok(layout)
3564 }
3565
3566 fn get_owner_at_version(
3567 &self,
3568 object_id: &ObjectID,
3569 version: SequenceNumber,
3570 ) -> IotaResult<Owner> {
3571 self.get_object_store()
3572 .try_get_object_by_key(object_id, version)?
3573 .ok_or_else(|| {
3574 IotaError::from(UserInputError::ObjectNotFound {
3575 object_id: *object_id,
3576 version: Some(version),
3577 })
3578 })
3579 .map(|o| o.owner)
3580 }
3581
3582 #[instrument(level = "trace", skip_all)]
3583 pub fn get_owner_objects(
3584 &self,
3585 owner: IotaAddress,
3586 cursor: Option<ObjectID>,
3588 limit: usize,
3589 filter: Option<IotaObjectDataFilter>,
3590 ) -> IotaResult<Vec<ObjectInfo>> {
3591 if let Some(indexes) = &self.indexes {
3592 indexes.get_owner_objects(owner, cursor, limit, filter)
3593 } else {
3594 Err(IotaError::IndexStoreNotAvailable)
3595 }
3596 }
3597
3598 #[instrument(level = "trace", skip_all)]
3599 pub fn get_owned_coins_iterator_with_cursor(
3600 &self,
3601 owner: IotaAddress,
3602 cursor: (String, ObjectID),
3604 limit: usize,
3605 one_coin_type_only: bool,
3606 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3607 if let Some(indexes) = &self.indexes {
3608 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3609 } else {
3610 Err(IotaError::IndexStoreNotAvailable)
3611 }
3612 }
3613
3614 #[instrument(level = "trace", skip_all)]
3615 pub fn get_owner_objects_iterator(
3616 &self,
3617 owner: IotaAddress,
3618 cursor: Option<ObjectID>,
3620 filter: Option<IotaObjectDataFilter>,
3621 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3622 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3623 if let Some(indexes) = &self.indexes {
3624 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3625 } else {
3626 Err(IotaError::IndexStoreNotAvailable)
3627 }
3628 }
3629
3630 #[instrument(level = "trace", skip_all)]
3631 pub async fn get_move_objects<T>(
3632 &self,
3633 owner: IotaAddress,
3634 type_: MoveObjectType,
3635 ) -> IotaResult<Vec<T>>
3636 where
3637 T: DeserializeOwned,
3638 {
3639 let object_ids = self
3640 .get_owner_objects_iterator(owner, None, None)?
3641 .filter(|o| match &o.type_ {
3642 ObjectType::Struct(s) => &type_ == s,
3643 ObjectType::Package => false,
3644 })
3645 .map(|info| ObjectKey(info.object_id, info.version))
3646 .collect::<Vec<_>>();
3647 let mut move_objects = vec![];
3648
3649 let objects = self
3650 .get_object_store()
3651 .try_multi_get_objects_by_key(&object_ids)?;
3652
3653 for (o, id) in objects.into_iter().zip(object_ids) {
3654 let object = o.ok_or_else(|| {
3655 IotaError::from(UserInputError::ObjectNotFound {
3656 object_id: id.0,
3657 version: Some(id.1),
3658 })
3659 })?;
3660 let move_object = object.data.try_as_move().ok_or_else(|| {
3661 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3662 })?;
3663 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3664 IotaError::ObjectDeserialization {
3665 error: format!("{e}"),
3666 }
3667 })?);
3668 }
3669 Ok(move_objects)
3670 }
3671
3672 #[instrument(level = "trace", skip_all)]
3673 pub fn get_dynamic_fields(
3674 &self,
3675 owner: ObjectID,
3676 cursor: Option<ObjectID>,
3678 limit: usize,
3679 ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3680 Ok(self
3681 .get_dynamic_fields_iterator(owner, cursor)?
3682 .take(limit)
3683 .collect::<Result<Vec<_>, _>>()?)
3684 }
3685
3686 fn get_dynamic_fields_iterator(
3687 &self,
3688 owner: ObjectID,
3689 cursor: Option<ObjectID>,
3691 ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3692 {
3693 if let Some(indexes) = &self.indexes {
3694 indexes.get_dynamic_fields_iterator(owner, cursor)
3695 } else {
3696 Err(IotaError::IndexStoreNotAvailable)
3697 }
3698 }
3699
3700 #[instrument(level = "trace", skip_all)]
3701 pub fn get_dynamic_field_object_id(
3702 &self,
3703 owner: ObjectID,
3704 name_type: TypeTag,
3705 name_bcs_bytes: &[u8],
3706 ) -> IotaResult<Option<ObjectID>> {
3707 if let Some(indexes) = &self.indexes {
3708 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3709 } else {
3710 Err(IotaError::IndexStoreNotAvailable)
3711 }
3712 }
3713
3714 #[instrument(level = "trace", skip_all)]
3715 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3716 Ok(self.get_indexes()?.next_sequence_number())
3717 }
3718
3719 #[instrument(level = "trace", skip_all)]
3720 pub async fn get_executed_transaction_and_effects(
3721 &self,
3722 digest: TransactionDigest,
3723 kv_store: Arc<TransactionKeyValueStore>,
3724 ) -> IotaResult<(Transaction, TransactionEffects)> {
3725 let transaction = kv_store.get_tx(digest).await?;
3726 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3727 Ok((transaction, effects))
3728 }
3729
3730 #[instrument(level = "trace", skip_all)]
3731 pub fn multi_get_checkpoint_by_sequence_number(
3732 &self,
3733 sequence_numbers: &[CheckpointSequenceNumber],
3734 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3735 Ok(self
3736 .checkpoint_store
3737 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3738 }
3739
3740 #[instrument(level = "trace", skip_all)]
3741 pub fn get_transaction_events(
3742 &self,
3743 digest: &TransactionEventsDigest,
3744 ) -> IotaResult<TransactionEvents> {
3745 self.get_transaction_cache_reader()
3746 .try_get_events(digest)?
3747 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3748 }
3749
3750 pub fn get_transaction_input_objects(
3751 &self,
3752 effects: &TransactionEffects,
3753 ) -> anyhow::Result<Vec<Object>> {
3754 let input_object_keys = effects
3755 .modified_at_versions()
3756 .into_iter()
3757 .map(|(object_id, version)| ObjectKey(object_id, version))
3758 .collect::<Vec<_>>();
3759
3760 let input_objects = self
3761 .get_object_store()
3762 .try_multi_get_objects_by_key(&input_object_keys)?
3763 .into_iter()
3764 .enumerate()
3765 .map(|(idx, maybe_object)| {
3766 maybe_object.ok_or_else(|| {
3767 anyhow::anyhow!(
3768 "missing input object key {:?} from tx {}",
3769 input_object_keys[idx],
3770 effects.transaction_digest()
3771 )
3772 })
3773 })
3774 .collect::<anyhow::Result<Vec<_>>>()?;
3775 Ok(input_objects)
3776 }
3777
3778 pub fn get_transaction_output_objects(
3779 &self,
3780 effects: &TransactionEffects,
3781 ) -> anyhow::Result<Vec<Object>> {
3782 let output_object_keys = effects
3783 .all_changed_objects()
3784 .into_iter()
3785 .map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
3786 .collect::<Vec<_>>();
3787
3788 let output_objects = self
3789 .get_object_store()
3790 .try_multi_get_objects_by_key(&output_object_keys)?
3791 .into_iter()
3792 .enumerate()
3793 .map(|(idx, maybe_object)| {
3794 maybe_object.ok_or_else(|| {
3795 anyhow::anyhow!(
3796 "missing output object key {:?} from tx {}",
3797 output_object_keys[idx],
3798 effects.transaction_digest()
3799 )
3800 })
3801 })
3802 .collect::<anyhow::Result<Vec<_>>>()?;
3803 Ok(output_objects)
3804 }
3805
3806 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3807 match &self.indexes {
3808 Some(i) => Ok(i.clone()),
3809 None => Err(IotaError::UnsupportedFeature {
3810 error: "extended object indexing is not enabled on this server".into(),
3811 }),
3812 }
3813 }
3814
3815 pub async fn get_transactions_for_tests(
3816 self: &Arc<Self>,
3817 filter: Option<TransactionFilter>,
3818 cursor: Option<TransactionDigest>,
3819 limit: Option<usize>,
3820 reverse: bool,
3821 ) -> IotaResult<Vec<TransactionDigest>> {
3822 let metrics = KeyValueStoreMetrics::new_for_tests();
3823 let kv_store = Arc::new(TransactionKeyValueStore::new(
3824 "rocksdb",
3825 metrics,
3826 self.clone(),
3827 ));
3828 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3829 .await
3830 }
3831
3832 #[instrument(level = "trace", skip_all)]
3833 pub async fn get_transactions(
3834 &self,
3835 kv_store: &Arc<TransactionKeyValueStore>,
3836 filter: Option<TransactionFilter>,
3837 cursor: Option<TransactionDigest>,
3839 limit: Option<usize>,
3840 reverse: bool,
3841 ) -> IotaResult<Vec<TransactionDigest>> {
3842 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3843 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3844 let iter = checkpoint_contents.iter().map(|c| c.transaction);
3845 if reverse {
3846 let iter = iter
3847 .rev()
3848 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3849 .skip(usize::from(cursor.is_some()));
3850 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3851 } else {
3852 let iter = iter
3853 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3854 .skip(usize::from(cursor.is_some()));
3855 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3856 }
3857 }
3858 self.get_indexes()?
3859 .get_transactions(filter, cursor, limit, reverse)
3860 }
3861
3862 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
3863 &self.checkpoint_store
3864 }
3865
3866 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
3867 self.get_checkpoint_store()
3868 .get_highest_executed_checkpoint_seq_number()?
3869 .ok_or(IotaError::UserInput {
3870 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
3871 })
3872 }
3873
/// Simulator-only test helper: reads the highest pruned checkpoint from the
/// perpetual tables of the test database.
#[cfg(msim)]
pub fn get_highest_pruned_checkpoint_for_testing(
    &self,
) -> IotaResult<CheckpointSequenceNumber> {
    let database = self.database_for_testing();
    database.perpetual_tables.get_highest_pruned_checkpoint()
}
3882
3883 #[instrument(level = "trace", skip_all)]
3884 pub fn get_checkpoint_summary_by_sequence_number(
3885 &self,
3886 sequence_number: CheckpointSequenceNumber,
3887 ) -> IotaResult<CheckpointSummary> {
3888 let verified_checkpoint = self
3889 .get_checkpoint_store()
3890 .get_checkpoint_by_sequence_number(sequence_number)?;
3891 match verified_checkpoint {
3892 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3893 None => Err(IotaError::UserInput {
3894 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3895 }),
3896 }
3897 }
3898
3899 #[instrument(level = "trace", skip_all)]
3900 pub fn get_checkpoint_summary_by_digest(
3901 &self,
3902 digest: CheckpointDigest,
3903 ) -> IotaResult<CheckpointSummary> {
3904 let verified_checkpoint = self
3905 .get_checkpoint_store()
3906 .get_checkpoint_by_digest(&digest)?;
3907 match verified_checkpoint {
3908 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3909 None => Err(IotaError::UserInput {
3910 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3911 }),
3912 }
3913 }
3914
3915 #[instrument(level = "trace", skip_all)]
3916 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
3917 if is_system_package(package_id) {
3918 return self.find_genesis_txn_digest();
3919 }
3920 Ok(self
3921 .get_object_read(&package_id)?
3922 .into_object()?
3923 .previous_transaction)
3924 }
3925
3926 #[instrument(level = "trace", skip_all)]
3927 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
3928 let summary = self
3929 .get_verified_checkpoint_by_sequence_number(0)?
3930 .into_message();
3931 let content = self.get_checkpoint_contents(summary.content_digest)?;
3932 let genesis_transaction = content.enumerate_transactions(&summary).next();
3933 Ok(genesis_transaction
3934 .ok_or(IotaError::UserInput {
3935 error: UserInputError::GenesisTransactionNotFound,
3936 })?
3937 .1
3938 .transaction)
3939 }
3940
3941 #[instrument(level = "trace", skip_all)]
3942 pub fn get_verified_checkpoint_by_sequence_number(
3943 &self,
3944 sequence_number: CheckpointSequenceNumber,
3945 ) -> IotaResult<VerifiedCheckpoint> {
3946 let verified_checkpoint = self
3947 .get_checkpoint_store()
3948 .get_checkpoint_by_sequence_number(sequence_number)?;
3949 match verified_checkpoint {
3950 Some(verified_checkpoint) => Ok(verified_checkpoint),
3951 None => Err(IotaError::UserInput {
3952 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3953 }),
3954 }
3955 }
3956
3957 #[instrument(level = "trace", skip_all)]
3958 pub fn get_verified_checkpoint_summary_by_digest(
3959 &self,
3960 digest: CheckpointDigest,
3961 ) -> IotaResult<VerifiedCheckpoint> {
3962 let verified_checkpoint = self
3963 .get_checkpoint_store()
3964 .get_checkpoint_by_digest(&digest)?;
3965 match verified_checkpoint {
3966 Some(verified_checkpoint) => Ok(verified_checkpoint),
3967 None => Err(IotaError::UserInput {
3968 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3969 }),
3970 }
3971 }
3972
3973 #[instrument(level = "trace", skip_all)]
3974 pub fn get_checkpoint_contents(
3975 &self,
3976 digest: CheckpointContentsDigest,
3977 ) -> IotaResult<CheckpointContents> {
3978 self.get_checkpoint_store()
3979 .get_checkpoint_contents(&digest)?
3980 .ok_or(IotaError::UserInput {
3981 error: UserInputError::CheckpointContentsNotFound(digest),
3982 })
3983 }
3984
3985 #[instrument(level = "trace", skip_all)]
3986 pub fn get_checkpoint_contents_by_sequence_number(
3987 &self,
3988 sequence_number: CheckpointSequenceNumber,
3989 ) -> IotaResult<CheckpointContents> {
3990 let verified_checkpoint = self
3991 .get_checkpoint_store()
3992 .get_checkpoint_by_sequence_number(sequence_number)?;
3993 match verified_checkpoint {
3994 Some(verified_checkpoint) => {
3995 let content_digest = verified_checkpoint.into_inner().content_digest;
3996 self.get_checkpoint_contents(content_digest)
3997 }
3998 None => Err(IotaError::UserInput {
3999 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4000 }),
4001 }
4002 }
4003
4004 #[instrument(level = "trace", skip_all)]
4005 pub async fn query_events(
4006 &self,
4007 kv_store: &Arc<TransactionKeyValueStore>,
4008 query: EventFilter,
4009 cursor: Option<EventID>,
4011 limit: usize,
4012 descending: bool,
4013 ) -> IotaResult<Vec<IotaEvent>> {
4014 let index_store = self.get_indexes()?;
4015
4016 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4018 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4019 IotaError::TransactionNotFound {
4020 digest: cursor.tx_digest,
4021 },
4022 )?;
4023 (tx_seq, cursor.event_seq as usize)
4024 } else if descending {
4025 (u64::MAX, usize::MAX)
4026 } else {
4027 (0, 0)
4028 };
4029
4030 let limit = limit + 1;
4031 let mut event_keys = match query {
4032 EventFilter::All(filters) => {
4033 if filters.is_empty() {
4034 index_store.all_events(tx_num, event_num, limit, descending)?
4035 } else {
4036 return Err(IotaError::UserInput {
4037 error: UserInputError::Unsupported(
4038 "This query type does not currently support filter combinations"
4039 .to_string(),
4040 ),
4041 });
4042 }
4043 }
4044 EventFilter::Transaction(digest) => {
4045 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4046 }
4047 EventFilter::MoveModule { package, module } => {
4048 let module_id = ModuleId::new(package.into(), module);
4049 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4050 }
4051 EventFilter::MoveEventType(struct_name) => index_store
4052 .events_by_move_event_struct_name(
4053 &struct_name,
4054 tx_num,
4055 event_num,
4056 limit,
4057 descending,
4058 )?,
4059 EventFilter::Sender(sender) => {
4060 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4061 }
4062 EventFilter::TimeRange {
4063 start_time,
4064 end_time,
4065 } => index_store
4066 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4067 EventFilter::MoveEventModule { package, module } => index_store
4068 .events_by_move_event_module(
4069 &ModuleId::new(package.into(), module),
4070 tx_num,
4071 event_num,
4072 limit,
4073 descending,
4074 )?,
4075 EventFilter::Package(_)
4077 | EventFilter::MoveEventField { .. }
4078 | EventFilter::Any(_)
4079 | EventFilter::And(_, _)
4080 | EventFilter::Or(_, _) => {
4081 return Err(IotaError::UserInput {
4082 error: UserInputError::Unsupported(
4083 "This query type is not supported by the full node.".to_string(),
4084 ),
4085 });
4086 }
4087 };
4088
4089 if cursor.is_some() {
4092 if !event_keys.is_empty() {
4093 event_keys.remove(0);
4094 }
4095 } else {
4096 event_keys.truncate(limit - 1);
4097 }
4098
4099 let transaction_digests = event_keys
4101 .iter()
4102 .map(|(_, digest, _, _)| *digest)
4103 .collect::<HashSet<_>>()
4104 .into_iter()
4105 .collect::<Vec<_>>();
4106
4107 let events = kv_store
4108 .multi_get_events_by_tx_digests(&transaction_digests)
4109 .await?;
4110
4111 let events_map: HashMap<_, _> =
4112 transaction_digests.iter().zip(events.into_iter()).collect();
4113
4114 let stored_events = event_keys
4115 .into_iter()
4116 .map(|k| {
4117 (
4118 k,
4119 events_map
4120 .get(&k.1)
4121 .expect("fetched digest is missing")
4122 .clone()
4123 .and_then(|e| e.data.get(k.2).cloned()),
4124 )
4125 })
4126 .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4127 event
4128 .map(|e| (e, tx_digest, event_seq, timestamp))
4129 .ok_or(IotaError::TransactionEventsNotFound { digest })
4130 })
4131 .collect::<Result<Vec<_>, _>>()?;
4132
4133 let epoch_store = self.load_epoch_store_one_call_per_task();
4134 let backing_store = self.get_backing_package_store().as_ref();
4135 let mut layout_resolver = epoch_store
4136 .executor()
4137 .type_layout_resolver(Box::new(backing_store));
4138 let mut events = vec![];
4139 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4140 events.push(IotaEvent::try_from(
4141 e.clone(),
4142 tx_digest,
4143 event_seq as u64,
4144 Some(timestamp),
4145 layout_resolver.get_annotated_layout(&e.type_)?,
4146 )?)
4147 }
4148 Ok(events)
4149 }
4150
4151 pub async fn insert_genesis_object(&self, object: Object) {
4152 self.get_reconfig_api()
4153 .try_insert_genesis_object(object)
4154 .expect("Cannot insert genesis object")
4155 }
4156
4157 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4158 futures::future::join_all(
4159 objects
4160 .iter()
4161 .map(|o| self.insert_genesis_object(o.clone())),
4162 )
4163 .await;
4164 }
4165
4166 #[instrument(level = "trace", skip_all)]
4168 pub fn get_transaction_status(
4169 &self,
4170 transaction_digest: &TransactionDigest,
4171 epoch_store: &Arc<AuthorityPerEpochStore>,
4172 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4173 if let Some(effects) =
4175 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4176 {
4177 if let Some(transaction) = self
4178 .get_transaction_cache_reader()
4179 .try_get_transaction_block(transaction_digest)?
4180 {
4181 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4182 let events = if let Some(digest) = effects.events_digest() {
4183 self.get_transaction_events(digest)?
4184 } else {
4185 TransactionEvents::default()
4186 };
4187 return Ok(Some((
4188 (*transaction).clone().into_message(),
4189 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4190 )));
4191 } else {
4192 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4197 }
4198 }
4199 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4200 self.metrics.tx_already_processed.inc();
4201 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4202 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4203 } else {
4204 Ok(None)
4205 }
4206 }
4207
4208 #[instrument(level = "trace", skip_all)]
4212 pub fn get_signed_effects_and_maybe_resign(
4213 &self,
4214 transaction_digest: &TransactionDigest,
4215 epoch_store: &Arc<AuthorityPerEpochStore>,
4216 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4217 let effects = self
4218 .get_transaction_cache_reader()
4219 .try_get_executed_effects(transaction_digest)?;
4220 match effects {
4221 Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4222 None => Ok(None),
4223 }
4224 }
4225
4226 #[instrument(level = "trace", skip_all)]
4227 pub(crate) fn sign_effects(
4228 &self,
4229 effects: TransactionEffects,
4230 epoch_store: &Arc<AuthorityPerEpochStore>,
4231 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4232 let tx_digest = *effects.transaction_digest();
4233 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4234 Some(sig) if sig.epoch == epoch_store.epoch() => {
4235 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4236 }
4237 _ => {
4238 debug!(
4261 ?tx_digest,
4262 epoch=?epoch_store.epoch(),
4263 "Re-signing the effects with the current epoch"
4264 );
4265
4266 let sig = AuthoritySignInfo::new(
4267 epoch_store.epoch(),
4268 &effects,
4269 Intent::iota_app(IntentScope::TransactionEffects),
4270 self.name,
4271 &*self.secret,
4272 );
4273
4274 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4275
4276 epoch_store.insert_effects_digest_and_signature(
4277 &tx_digest,
4278 effects.digest(),
4279 &sig,
4280 )?;
4281
4282 effects
4283 }
4284 };
4285
4286 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4287 signed_effects,
4288 ))
4289 }
4290
4291 #[instrument(level = "trace", skip_all)]
4293 fn fullnode_only_get_tx_coins_for_indexing(
4294 &self,
4295 inner_temporary_store: &InnerTemporaryStore,
4296 epoch_store: &Arc<AuthorityPerEpochStore>,
4297 ) -> Option<TxCoins> {
4298 if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4299 return None;
4300 }
4301 let written_coin_objects = inner_temporary_store
4302 .written
4303 .iter()
4304 .filter_map(|(k, v)| {
4305 if v.is_coin() {
4306 Some((*k, v.clone()))
4307 } else {
4308 None
4309 }
4310 })
4311 .collect();
4312 let input_coin_objects = inner_temporary_store
4313 .input_objects
4314 .iter()
4315 .filter_map(|(k, v)| {
4316 if v.is_coin() {
4317 Some((*k, v.clone()))
4318 } else {
4319 None
4320 }
4321 })
4322 .collect::<ObjectMap>();
4323 Some((input_coin_objects, written_coin_objects))
4324 }
4325
4326 #[instrument(level = "trace", skip_all)]
4338 pub async fn get_transaction_lock(
4339 &self,
4340 object_ref: &ObjectRef,
4341 epoch_store: &AuthorityPerEpochStore,
4342 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4343 let lock_info = self
4344 .get_object_cache_reader()
4345 .try_get_lock(*object_ref, epoch_store)?;
4346 let lock_info = match lock_info {
4347 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4348 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4349 provided_obj_ref: *object_ref,
4350 current_version: locked_ref.1,
4351 }
4352 .into());
4353 }
4354 ObjectLockStatus::Initialized => {
4355 return Ok(None);
4356 }
4357 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4358 };
4359
4360 epoch_store.get_signed_transaction(&lock_info)
4361 }
4362
/// Fetches the objects with the given IDs through the object cache reader.
///
/// The result is whatever the cache reader's `try_get_objects` returns for
/// `objects`; errors from that call are passed through unchanged.
pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
    self.get_object_cache_reader().try_get_objects(objects)
}
4366
4367 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4369 self.try_get_objects(objects)
4370 .await
4371 .expect("storage access failed")
4372 }
4373
/// Returns the latest object reference (or tombstone) recorded for `object_id`.
///
/// Delegates to the object cache reader's
/// `try_get_latest_object_ref_or_tombstone`; errors are passed through unchanged.
pub async fn try_get_object_or_tombstone(
    &self,
    object_id: ObjectID,
) -> IotaResult<Option<ObjectRef>> {
    self.get_object_cache_reader()
        .try_get_latest_object_ref_or_tombstone(object_id)
}
4381
4382 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4384 self.try_get_object_or_tombstone(object_id)
4385 .await
4386 .expect("storage access failed")
4387 }
4388
4389 pub fn set_override_protocol_upgrade_buffer_stake(
4399 &self,
4400 expected_epoch: EpochId,
4401 buffer_stake_bps: u64,
4402 ) -> IotaResult {
4403 let epoch_store = self.load_epoch_store_one_call_per_task();
4404 let actual_epoch = epoch_store.epoch();
4405 if actual_epoch != expected_epoch {
4406 return Err(IotaError::WrongEpoch {
4407 expected_epoch,
4408 actual_epoch,
4409 });
4410 }
4411
4412 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4413 }
4414
4415 pub fn clear_override_protocol_upgrade_buffer_stake(
4416 &self,
4417 expected_epoch: EpochId,
4418 ) -> IotaResult {
4419 let epoch_store = self.load_epoch_store_one_call_per_task();
4420 let actual_epoch = epoch_store.epoch();
4421 if actual_epoch != expected_epoch {
4422 return Err(IotaError::WrongEpoch {
4423 expected_epoch,
4424 actual_epoch,
4425 });
4426 }
4427
4428 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4429 }
4430
4431 pub async fn get_available_system_packages(
4435 &self,
4436 binary_config: &BinaryConfig,
4437 ) -> Vec<ObjectRef> {
4438 let mut results = vec![];
4439
4440 let system_packages = BuiltInFramework::iter_system_packages();
4441
4442 #[cfg(msim)]
4444 let extra_packages = framework_injection::get_extra_packages(self.name);
4445 #[cfg(msim)]
4446 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4447
4448 for system_package in system_packages {
4449 let modules = system_package.modules().to_vec();
4450 #[cfg(msim)]
4452 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4453 .unwrap_or(modules);
4454
4455 let Some(obj_ref) = iota_framework::compare_system_package(
4456 &self.get_object_store(),
4457 &system_package.id,
4458 &modules,
4459 system_package.dependencies.to_vec(),
4460 binary_config,
4461 )
4462 .await
4463 else {
4464 return vec![];
4465 };
4466 results.push(obj_ref);
4467 }
4468
4469 results
4470 }
4471
4472 async fn get_system_package_bytes(
4489 &self,
4490 system_packages: Vec<ObjectRef>,
4491 binary_config: &BinaryConfig,
4492 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4493 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4494 let objects = self.get_objects(&ids).await;
4495
4496 let mut res = Vec::with_capacity(system_packages.len());
4497 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4498 let prev_transaction = match object {
4499 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4500 info!("Framework {} does not need updating", system_package_ref.0);
4502 continue;
4503 }
4504
4505 Some(cur_object) => cur_object.previous_transaction,
4506 None => TransactionDigest::genesis_marker(),
4507 };
4508
4509 #[cfg(msim)]
4510 let SystemPackage {
4511 id: _,
4512 bytes,
4513 dependencies,
4514 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4515 .unwrap_or_else(|| {
4516 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4517 });
4518
4519 #[cfg(not(msim))]
4520 let SystemPackage {
4521 id: _,
4522 bytes,
4523 dependencies,
4524 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4525
4526 let modules: Vec<_> = bytes
4527 .iter()
4528 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4529 .collect();
4530
4531 let new_object = Object::new_system_package(
4532 &modules,
4533 system_package_ref.1,
4534 dependencies.clone(),
4535 prev_transaction,
4536 );
4537
4538 let new_ref = new_object.compute_object_reference();
4539 if new_ref != system_package_ref {
4540 error!(
4541 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
4542 );
4543 return None;
4544 }
4545
4546 res.push((system_package_ref.1, bytes, dependencies));
4547 }
4548
4549 Some(res)
4550 }
4551
4552 fn is_protocol_version_supported_v1(
4556 proposed_protocol_version: ProtocolVersion,
4557 committee: &Committee,
4558 capabilities: Vec<AuthorityCapabilitiesV1>,
4559 mut buffer_stake_bps: u64,
4560 ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4561 if buffer_stake_bps > 10000 {
4562 warn!("clamping buffer_stake_bps to 10000");
4563 buffer_stake_bps = 10000;
4564 }
4565
4566 let mut desired_upgrades: Vec<_> = capabilities
4569 .into_iter()
4570 .filter_map(|mut cap| {
4571 if cap.available_system_packages.is_empty() {
4573 return None;
4574 }
4575
4576 cap.available_system_packages.sort();
4577
4578 info!(
4579 "validator {:?} supports {:?} with system packages: {:?}",
4580 cap.authority.concise(),
4581 cap.supported_protocol_versions,
4582 cap.available_system_packages,
4583 );
4584
4585 cap.supported_protocol_versions
4589 .get_version_digest(proposed_protocol_version)
4590 .map(|digest| (digest, cap.available_system_packages, cap.authority))
4591 })
4592 .collect();
4593
4594 desired_upgrades.sort();
4597 desired_upgrades
4598 .into_iter()
4599 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4600 .into_iter()
4601 .find_map(|((digest, packages), group)| {
4602 assert!(!packages.is_empty());
4604
4605 let mut stake_aggregator: StakeAggregator<(), true> =
4606 StakeAggregator::new(Arc::new(committee.clone()));
4607
4608 for (_, _, authority) in group {
4609 stake_aggregator.insert_generic(authority, ());
4610 }
4611
4612 let total_votes = stake_aggregator.total_votes();
4613 let quorum_threshold = committee.quorum_threshold();
4614 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4615
4616 info!(
4617 protocol_config_digest = ?digest,
4618 ?total_votes,
4619 ?quorum_threshold,
4620 ?buffer_stake_bps,
4621 ?effective_threshold,
4622 ?proposed_protocol_version,
4623 ?packages,
4624 "support for upgrade"
4625 );
4626
4627 let has_support = total_votes >= effective_threshold;
4628 has_support.then_some((proposed_protocol_version, digest, packages))
4629 })
4630 }
4631
4632 fn choose_protocol_version_and_system_packages_v1(
4636 current_protocol_version: ProtocolVersion,
4637 current_protocol_digest: Digest,
4638 committee: &Committee,
4639 capabilities: Vec<AuthorityCapabilitiesV1>,
4640 buffer_stake_bps: u64,
4641 ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4642 let mut next_protocol_version = current_protocol_version;
4643 let mut system_packages = vec![];
4644 let mut protocol_version_digest = current_protocol_digest;
4645
4646 while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4650 next_protocol_version + 1,
4651 committee,
4652 capabilities.clone(),
4653 buffer_stake_bps,
4654 ) {
4655 next_protocol_version = version;
4656 protocol_version_digest = digest;
4657 system_packages = packages;
4658 }
4659
4660 (
4661 next_protocol_version,
4662 protocol_version_digest,
4663 system_packages,
4664 )
4665 }
4666
4667 fn get_validators_supporting_protocol_version(
4672 target_protocol_version: ProtocolVersion,
4673 target_digest: Digest,
4674 active_validators: &[AuthorityPublicKey],
4675 capabilities: &[AuthorityCapabilitiesV1],
4676 ) -> Vec<u64> {
4677 let mut eligible_validators = Vec::new();
4678
4679 for capability in capabilities {
4680 if let Some(digest) = capability
4682 .supported_protocol_versions
4683 .get_version_digest(target_protocol_version)
4684 {
4685 if digest == target_digest {
4686 if let Some(index) = active_validators
4688 .iter()
4689 .position(|name| AuthorityName::from(name) == capability.authority)
4690 {
4691 eligible_validators.push(index as u64);
4692 }
4693 }
4694 }
4695 }
4696
4697 eligible_validators.sort();
4699 eligible_validators
4700 }
4701
4702 fn calculate_eligible_validators_weight(
4707 eligible_validator_indices: &[u64],
4708 active_validators: &[AuthorityPublicKey],
4709 committee: &Committee,
4710 ) -> u64 {
4711 let mut total_weight = 0u64;
4712
4713 for &index in eligible_validator_indices {
4714 let authority_pubkey = &active_validators[index as usize];
4715 if let Some((_, weight)) = committee
4717 .members()
4718 .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4719 {
4720 total_weight += weight;
4721 }
4722 }
4723
4724 total_weight
4725 }
4726
4727 #[instrument(level = "debug", skip_all)]
4728 fn create_authenticator_state_tx(
4729 &self,
4730 epoch_store: &Arc<AuthorityPerEpochStore>,
4731 ) -> Option<EndOfEpochTransactionKind> {
4732 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4733 info!("authenticator state transactions not enabled");
4734 return None;
4735 }
4736
4737 let authenticator_state_exists = epoch_store.authenticator_state_exists();
4738 let tx = if authenticator_state_exists {
4739 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4740 let min_epoch =
4741 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4742 let authenticator_obj_initial_shared_version = epoch_store
4743 .epoch_start_config()
4744 .authenticator_obj_initial_shared_version()
4745 .expect("initial version must exist");
4746
4747 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4748 min_epoch,
4749 authenticator_obj_initial_shared_version,
4750 );
4751
4752 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4753
4754 tx
4755 } else {
4756 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4757 info!("Creating AuthenticatorStateCreate tx");
4758 tx
4759 };
4760 Some(tx)
4761 }
4762
/// Builds the end-of-epoch transaction for the current epoch and executes it
/// locally.
///
/// The transaction contains an optional authenticator-state kind followed by
/// a change-epoch kind (v3, v2 or the original form, selected from protocol
/// config flags). The next protocol version and system packages are chosen
/// from the validators' recorded capabilities.
///
/// On success returns the system state read from the written objects, the
/// `SystemEpochInfoEvent` if one was emitted, and the transaction effects.
///
/// # Errors
///
/// Fails when the chosen system packages cannot be produced locally, when the
/// transaction was already executed, or when any of the lock, version
/// assignment, object read, execution or store-insert steps fails.
///
/// # Panics
///
/// Panics if capabilities cannot be read, if the transaction did not write
/// the system state object, if no epoch info event was emitted outside safe
/// mode, or if the effects status is not ok.
#[instrument(level = "error", skip_all)]
pub async fn create_and_execute_advance_epoch_tx(
    &self,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    gas_cost_summary: &GasCostSummary,
    checkpoint: CheckpointSequenceNumber,
    epoch_start_timestamp_ms: CheckpointTimestamp,
) -> anyhow::Result<(
    IotaSystemState,
    Option<SystemEpochInfoEvent>,
    TransactionEffects,
)> {
    let mut txns = Vec::new();

    // The authenticator-state kind, when present, goes before the change-epoch kind.
    if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
        txns.push(tx);
    }

    let next_epoch = epoch_store.epoch() + 1;

    // Decide the next protocol version and system packages from recorded capabilities.
    let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
    let authority_capabilities = epoch_store
        .get_capabilities_v1()
        .expect("read capabilities from db cannot fail");
    let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
        Self::choose_protocol_version_and_system_packages_v1(
            epoch_store.protocol_version(),
            SupportedProtocolVersionsWithHashes::protocol_config_digest(
                epoch_store.protocol_config(),
            ),
            epoch_store.committee(),
            authority_capabilities.clone(),
            buffer_stake_bps,
        );

    // The binary config is derived from the current epoch's protocol config.
    let config = epoch_store.protocol_config();
    let binary_config = to_binary_config(config);
    let Some(next_epoch_system_package_bytes) = self
        .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
        .await
    else {
        error!(
            "upgraded system packages {:?} are not locally available, cannot create \
            ChangeEpochTx. validator binary must be upgraded to the correct version!",
            next_epoch_system_packages
        );
        bail!("missing system packages: cannot form ChangeEpochTx");
    };

    if config.select_committee_from_eligible_validators() {
        let active_validators = epoch_store.epoch_start_state().get_active_validators();

        // By default every active validator index is eligible.
        let mut eligible_active_validators = (0..active_validators.len() as u64).collect();

        if config.select_committee_supporting_next_epoch_version() {
            // Narrow eligibility to validators that advertised the chosen version and digest.
            eligible_active_validators = Self::get_validators_supporting_protocol_version(
                next_epoch_protocol_version,
                next_epoch_protocol_digest,
                &active_validators,
                &authority_capabilities,
            );

            let eligible_validators_weight = Self::calculate_eligible_validators_weight(
                &eligible_active_validators,
                &active_validators,
                epoch_store.committee(),
            );

            let committee = epoch_store.committee();
            let effective_threshold = committee.effective_threshold(buffer_stake_bps);

            // If the narrowed set lacks enough weight, fall back to all active validators.
            if eligible_validators_weight < effective_threshold {
                error!(
                    "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
                    This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
                );
                eligible_active_validators = (0..active_validators.len() as u64).collect();
            }
        }

        txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
            next_epoch,
            next_epoch_protocol_version,
            gas_cost_summary.storage_cost,
            gas_cost_summary.computation_cost,
            gas_cost_summary.computation_cost_burned,
            gas_cost_summary.storage_rebate,
            gas_cost_summary.non_refundable_storage_fee,
            epoch_start_timestamp_ms,
            next_epoch_system_package_bytes,
            eligible_active_validators,
        ));
    } else if config.protocol_defined_base_fee()
        && config.max_committee_members_count_as_option().is_some()
    {
        txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
            next_epoch,
            next_epoch_protocol_version,
            gas_cost_summary.storage_cost,
            gas_cost_summary.computation_cost,
            gas_cost_summary.computation_cost_burned,
            gas_cost_summary.storage_rebate,
            gas_cost_summary.non_refundable_storage_fee,
            epoch_start_timestamp_ms,
            next_epoch_system_package_bytes,
        ));
    } else {
        txns.push(EndOfEpochTransactionKind::new_change_epoch(
            next_epoch,
            next_epoch_protocol_version,
            gas_cost_summary.storage_cost,
            gas_cost_summary.computation_cost,
            gas_cost_summary.storage_rebate,
            gas_cost_summary.non_refundable_storage_fee,
            epoch_start_timestamp_ms,
            next_epoch_system_package_bytes,
        ));
    }

    let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);

    // The executable form is tied to the current epoch and the given checkpoint.
    let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
        tx.clone(),
        epoch_store.epoch(),
        checkpoint,
    );

    let tx_digest = executable_tx.digest();

    info!(
        ?next_epoch,
        ?next_epoch_protocol_version,
        ?next_epoch_system_packages,
        computation_cost=?gas_cost_summary.computation_cost,
        computation_cost_burned=?gas_cost_summary.computation_cost_burned,
        storage_cost=?gas_cost_summary.storage_cost,
        storage_rebate=?gas_cost_summary.storage_rebate,
        non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
        ?tx_digest,
        "Creating advance epoch transaction"
    );

    fail_point_async!("change_epoch_tx_delay");
    let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

    // Bail out if the transaction was already executed (checked after taking the tx lock).
    if self
        .get_transaction_cache_reader()
        .try_is_tx_already_executed(tx_digest)?
    {
        warn!("change epoch tx has already been executed via state sync");
        bail!("change epoch tx has already been executed via state sync",);
    }

    let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;

    // Shared object versions must be assigned before reading the input objects.
    epoch_store.assign_shared_object_versions_idempotent(
        self.get_object_cache_reader().as_ref(),
        std::slice::from_ref(&executable_tx),
    )?;

    let input_objects =
        self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;

    let (temporary_store, effects, _execution_error_opt) =
        self.prepare_certificate(&execution_guard, &executable_tx, input_objects, epoch_store)?;
    let system_obj = get_iota_system_state(&temporary_store.written)
        .expect("change epoch tx must write to system object");
    let system_epoch_info_event = temporary_store
        .events
        .data
        .into_iter()
        .find(|event| event.is_system_epoch_info_event())
        .map(SystemEpochInfoEvent::from);
    // The epoch info event may only be missing when the system state is in safe mode.
    assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());

    // Record the transaction and its effects in the state sync store.
    self.get_state_sync_store()
        .try_insert_transaction_and_effects(&tx, &effects)
        .map_err(|err| {
            let err: anyhow::Error = err.into();
            err
        })?;

    info!(
        "Effects summary of the change epoch transaction: {:?}",
        effects.summary_for_debug()
    );
    epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
    assert!(effects.status().is_ok());
    Ok((system_obj, system_epoch_info_event, effects))
}
5003
5004 #[instrument(level = "error", skip_all)]
5008 async fn revert_uncommitted_epoch_transactions(
5009 &self,
5010 epoch_store: &AuthorityPerEpochStore,
5011 ) -> IotaResult {
5012 {
5013 let state = epoch_store.get_reconfig_state_write_lock_guard();
5014 if state.should_accept_user_certs() {
5015 epoch_store.close_user_certs(state);
5024 }
5025 }
5027 let pending_certificates = epoch_store.pending_consensus_certificates();
5028 info!(
5029 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5030 pending_certificates.len(),
5031 pending_certificates,
5032 );
5033 for digest in pending_certificates {
5034 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5035 info!(
5036 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5037 digest
5038 );
5039 continue;
5040 }
5041 info!("Reverting {:?} at the end of epoch", digest);
5042 epoch_store.revert_executed_transaction(&digest)?;
5043 self.get_reconfig_api().try_revert_state_update(&digest)?;
5044 }
5045 info!("All uncommitted local transactions reverted");
5046 Ok(())
5047 }
5048
5049 #[instrument(level = "error", skip_all)]
5050 async fn reopen_epoch_db(
5051 &self,
5052 cur_epoch_store: &AuthorityPerEpochStore,
5053 new_committee: Committee,
5054 epoch_start_configuration: EpochStartConfiguration,
5055 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5056 epoch_last_checkpoint: CheckpointSequenceNumber,
5057 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5058 let new_epoch = new_committee.epoch;
5059 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5060 assert_eq!(
5061 epoch_start_configuration.epoch_start_state().epoch(),
5062 new_committee.epoch
5063 );
5064 fail_point!("before-open-new-epoch-store");
5065 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5066 self.name,
5067 new_committee,
5068 epoch_start_configuration,
5069 self.get_backing_package_store().clone(),
5070 self.get_object_store().clone(),
5071 expensive_safety_check_config,
5072 epoch_last_checkpoint,
5073 );
5074 self.epoch_store.store(new_epoch_store.clone());
5075 Ok(new_epoch_store)
5076 }
5077
/// Test-only: iterates the cached live object set exposed by the accumulator store.
#[cfg(test)]
pub(crate) fn iter_live_object_set_for_testing(
    &self,
) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
    self.get_accumulator_store()
        .iter_cached_live_object_set_for_testing()
}
5085
/// Test-only: sends `()` on the stored execution shutdown sender.
///
/// # Panics
///
/// Panics if the sender was already taken (e.g. by a previous call) or if the
/// send fails.
#[cfg(test)]
pub(crate) fn shutdown_execution_for_test(&self) {
    self.tx_execution_shutdown
        .lock()
        // `take` leaves `None` behind, so the sender can only be used once.
        .take()
        .unwrap()
        .send(())
        .unwrap();
}
5095
/// Testing helper that bulk-inserts `objects` as genesis objects.
///
/// After the insert it force-reloads all built-in system packages in the
/// object cache and clears end-of-epoch state while holding the
/// reconfiguration execution lock.
pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
    self.get_reconfig_api().bulk_insert_genesis_objects(objects);
    // Make sure cached system packages reflect the objects just inserted.
    self.get_object_cache_reader()
        .force_reload_system_packages(&BuiltInFramework::all_package_ids());
    self.get_reconfig_api()
        .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
}
5105}
5106
/// Background task state that turns received randomness rounds into
/// randomness state update transactions on the authority.
pub struct RandomnessRoundReceiver {
    /// Authority on which the randomness state update transactions are enqueued.
    authority_state: Arc<AuthorityState>,
    /// Channel delivering `(epoch, round, bytes)` for each new randomness round.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
5111
5112impl RandomnessRoundReceiver {
5113 pub fn spawn(
5114 authority_state: Arc<AuthorityState>,
5115 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5116 ) -> JoinHandle<()> {
5117 let rrr = RandomnessRoundReceiver {
5118 authority_state,
5119 randomness_rx,
5120 };
5121 spawn_monitored_task!(rrr.run())
5122 }
5123
5124 async fn run(mut self) {
5125 info!("RandomnessRoundReceiver event loop started");
5126
5127 loop {
5128 tokio::select! {
5129 maybe_recv = self.randomness_rx.recv() => {
5130 if let Some((epoch, round, bytes)) = maybe_recv {
5131 self.handle_new_randomness(epoch, round, bytes);
5132 } else {
5133 break;
5134 }
5135 },
5136 }
5137 }
5138
5139 info!("RandomnessRoundReceiver event loop ended");
5140 }
5141
5142 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5143 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5144 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5145 if epoch_store.epoch() != epoch {
5146 warn!(
5147 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5148 epoch_store.epoch()
5149 );
5150 return;
5151 }
5152 let transaction = VerifiedTransaction::new_randomness_state_update(
5153 epoch,
5154 round,
5155 bytes,
5156 epoch_store
5157 .epoch_start_config()
5158 .randomness_obj_initial_shared_version(),
5159 );
5160 debug!(
5161 "created randomness state update transaction with digest: {:?}",
5162 transaction.digest()
5163 );
5164 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5165 let digest = *transaction.digest();
5166
5167 self.authority_state
5172 .get_cache_commit()
5173 .persist_transaction(&transaction);
5174
5175 self.authority_state
5177 .transaction_manager()
5178 .enqueue(vec![transaction], &epoch_store);
5179
5180 let authority_state = self.authority_state.clone();
5181 spawn_monitored_task!(async move {
5182 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5191 let result = tokio::time::timeout(
5192 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5193 authority_state
5194 .get_transaction_cache_reader()
5195 .try_notify_read_executed_effects(&[digest]),
5196 )
5197 .await;
5198 let result = match result {
5199 Ok(result) => result,
5200 Err(_) => {
5201 if cfg!(debug_assertions) {
5202 panic!(
5204 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5205 );
5206 }
5207 warn!(
5208 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5209 );
5210 authority_state
5212 .get_transaction_cache_reader()
5213 .try_notify_read_executed_effects(&[digest])
5214 .await
5215 }
5216 };
5217
5218 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5219 let effects = effects.pop().expect("should return effects");
5220 if *effects.status() != ExecutionStatus::Success {
5221 fatal!(
5222 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5223 );
5224 }
5225 debug!(
5226 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5227 );
5228 });
5229 }
5230}
5231
5232#[async_trait]
5233impl TransactionKeyValueStoreTrait for AuthorityState {
5234 async fn multi_get(
5235 &self,
5236 transaction_keys: &[TransactionDigest],
5237 effects_keys: &[TransactionDigest],
5238 ) -> IotaResult<KVStoreTransactionData> {
5239 let txns = if !transaction_keys.is_empty() {
5240 self.get_transaction_cache_reader()
5241 .try_multi_get_transaction_blocks(transaction_keys)?
5242 .into_iter()
5243 .map(|t| t.map(|t| (*t).clone().into_inner()))
5244 .collect()
5245 } else {
5246 vec![]
5247 };
5248
5249 let fx = if !effects_keys.is_empty() {
5250 self.get_transaction_cache_reader()
5251 .try_multi_get_executed_effects(effects_keys)?
5252 } else {
5253 vec![]
5254 };
5255
5256 Ok((txns, fx))
5257 }
5258
5259 async fn multi_get_checkpoints(
5260 &self,
5261 checkpoint_summaries: &[CheckpointSequenceNumber],
5262 checkpoint_contents: &[CheckpointSequenceNumber],
5263 checkpoint_summaries_by_digest: &[CheckpointDigest],
5264 ) -> IotaResult<(
5265 Vec<Option<CertifiedCheckpointSummary>>,
5266 Vec<Option<CheckpointContents>>,
5267 Vec<Option<CertifiedCheckpointSummary>>,
5268 )> {
5269 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5271 let store = self.get_checkpoint_store();
5272 for seq in checkpoint_summaries {
5273 let checkpoint = store
5274 .get_checkpoint_by_sequence_number(*seq)?
5275 .map(|c| c.into_inner());
5276
5277 summaries.push(checkpoint);
5278 }
5279
5280 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5281 for seq in checkpoint_contents {
5282 let checkpoint = store
5283 .get_checkpoint_by_sequence_number(*seq)?
5284 .and_then(|summary| {
5285 store
5286 .get_checkpoint_contents(&summary.content_digest)
5287 .expect("db read cannot fail")
5288 });
5289 contents.push(checkpoint);
5290 }
5291
5292 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5293 for digest in checkpoint_summaries_by_digest {
5294 let checkpoint = store
5295 .get_checkpoint_by_digest(digest)?
5296 .map(|c| c.into_inner());
5297 summaries_by_digest.push(checkpoint);
5298 }
5299
5300 Ok((summaries, contents, summaries_by_digest))
5301 }
5302
5303 async fn get_transaction_perpetual_checkpoint(
5304 &self,
5305 digest: TransactionDigest,
5306 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5307 self.get_checkpoint_cache()
5308 .try_get_transaction_perpetual_checkpoint(&digest)
5309 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5310 }
5311
5312 async fn get_object(
5313 &self,
5314 object_id: ObjectID,
5315 version: VersionNumber,
5316 ) -> IotaResult<Option<Object>> {
5317 self.get_object_cache_reader()
5318 .try_get_object_by_key(&object_id, version)
5319 }
5320
5321 async fn multi_get_transactions_perpetual_checkpoints(
5322 &self,
5323 digests: &[TransactionDigest],
5324 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5325 let res = self
5326 .get_checkpoint_cache()
5327 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5328
5329 Ok(res
5330 .into_iter()
5331 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5332 .collect())
5333 }
5334
5335 #[instrument(skip(self))]
5336 async fn multi_get_events_by_tx_digests(
5337 &self,
5338 digests: &[TransactionDigest],
5339 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5340 if digests.is_empty() {
5341 return Ok(vec![]);
5342 }
5343 let events_digests: Vec<_> = self
5344 .get_transaction_cache_reader()
5345 .try_multi_get_executed_effects(digests)?
5346 .into_iter()
5347 .map(|t| t.and_then(|t| t.events_digest().cloned()))
5348 .collect();
5349 let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5350 let mut events = self
5351 .get_transaction_cache_reader()
5352 .try_multi_get_events(&non_empty_events)?
5353 .into_iter();
5354 Ok(events_digests
5355 .into_iter()
5356 .map(|ev| ev.and_then(|_| events.next()?))
5357 .collect())
5358 }
5359}
5360
/// Simulator-only (`msim`) hooks that let a test override the modules of a
/// system package, or add extra packages, either for all validators or per
/// validator.
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_types::{
        base_types::{AuthorityName, ObjectID},
        is_system_package,
    };
    use move_binary_format::CompiledModule;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    thread_local! {
        // Overrides are kept per thread.
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per validator, which modules (if any) replace a package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    enum PackageOverrideConfig {
        /// The same modules for every validator.
        Global(Framework),
        /// Modules chosen by a callback from the validator's name.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module at its own version into a separate byte buffer.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut buf = Vec::new();
            module
                .serialize_with_version(module.version, &mut buf)
                .unwrap();
            serialized.push(buf);
        }
        serialized
    }

    /// Registers `modules` as the override for `package_id` for all validators.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|overrides| {
            overrides
                .borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules));
        });
    }

    /// Registers a per-validator callback as the override for `package_id`.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|overrides| {
            overrides
                .borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func));
        });
    }

    /// Returns the serialized override modules for `package_id` as seen by
    /// validator `name`, or `None` when no override applies.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            let overrides = cfg.borrow();
            let entry = overrides.get(package_id)?;
            match entry {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
                }
            }
        })
    }

    /// Returns the override modules for `package_id` as seen by validator
    /// `name`, or `None` when no override applies.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            let overrides = cfg.borrow();
            let entry = overrides.get(package_id)?;
            match entry {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            }
        })
    }

    /// Builds a `SystemPackage` from the override for `package_id`, if any.
    ///
    /// System packages keep the dependencies of the built-in package with the
    /// same ID; any other package depends on all built-in package IDs.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;

        let dependencies = if is_system_package(*package_id) {
            let built_in = BuiltInFramework::get_package_by_id(package_id);
            built_in.dependencies.to_vec()
        } else {
            BuiltInFramework::all_package_ids()
        };

        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns a `SystemPackage` for every override whose ID is not a
    /// built-in package, as seen by validator `name`.
    ///
    /// # Panics
    ///
    /// Panics if an extra package's override yields no modules for `name`.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());

        // Collect the IDs first so the registry borrow ends before the
        // per-package lookups below borrow it again.
        let extra_ids: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .filter(|package| !built_in.contains(*package))
                .copied()
                .collect()
        });

        let mut extra_packages = Vec::with_capacity(extra_ids.len());
        for package in extra_ids {
            extra_packages.push(SystemPackage {
                id: package,
                bytes: get_override_bytes(&package, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        extra_packages
    }
}
5482
5483#[derive(Debug, Serialize, Deserialize, Clone)]
5484pub struct ObjDumpFormat {
5485 pub id: ObjectID,
5486 pub version: VersionNumber,
5487 pub digest: ObjectDigest,
5488 pub object: Object,
5489}
5490
5491impl ObjDumpFormat {
5492 fn new(object: Object) -> Self {
5493 let oref = object.compute_object_reference();
5494 Self {
5495 id: oref.0,
5496 version: oref.1,
5497 digest: oref.2,
5498 object,
5499 }
5500 }
5501}
5502
5503#[derive(Debug, Serialize, Deserialize, Clone)]
5504pub struct NodeStateDump {
5505 pub tx_digest: TransactionDigest,
5506 pub sender_signed_data: SenderSignedData,
5507 pub executed_epoch: u64,
5508 pub reference_gas_price: u64,
5509 pub protocol_version: u64,
5510 pub epoch_start_timestamp_ms: u64,
5511 pub computed_effects: TransactionEffects,
5512 pub expected_effects_digest: TransactionEffectsDigest,
5513 pub relevant_system_packages: Vec<ObjDumpFormat>,
5514 pub shared_objects: Vec<ObjDumpFormat>,
5515 pub loaded_child_objects: Vec<ObjDumpFormat>,
5516 pub modified_at_versions: Vec<ObjDumpFormat>,
5517 pub runtime_reads: Vec<ObjDumpFormat>,
5518 pub input_objects: Vec<ObjDumpFormat>,
5519}
5520
5521impl NodeStateDump {
5522 pub fn new(
5523 tx_digest: &TransactionDigest,
5524 effects: &TransactionEffects,
5525 expected_effects_digest: TransactionEffectsDigest,
5526 object_store: &dyn ObjectStore,
5527 epoch_store: &Arc<AuthorityPerEpochStore>,
5528 inner_temporary_store: &InnerTemporaryStore,
5529 certificate: &VerifiedExecutableTransaction,
5530 ) -> IotaResult<Self> {
5531 let executed_epoch = epoch_store.epoch();
5533 let reference_gas_price = epoch_store.reference_gas_price();
5534 let epoch_start_config = epoch_store.epoch_start_config();
5535 let protocol_version = epoch_store.protocol_version().as_u64();
5536 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5537
5538 let mut relevant_system_packages = Vec::new();
5540 for sys_package_id in BuiltInFramework::all_package_ids() {
5541 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5542 relevant_system_packages.push(ObjDumpFormat::new(w))
5543 }
5544 }
5545
5546 let mut shared_objects = Vec::new();
5548 for kind in effects.input_shared_objects() {
5549 match kind {
5550 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5551 if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5552 shared_objects.push(ObjDumpFormat::new(w))
5553 }
5554 }
5555 InputSharedObject::ReadDeleted(..)
5556 | InputSharedObject::MutateDeleted(..)
5557 | InputSharedObject::Cancelled(..) => (), }
5560 }
5561
5562 let mut loaded_child_objects = Vec::new();
5565 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5566 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5567 loaded_child_objects.push(ObjDumpFormat::new(w))
5568 }
5569 }
5570
5571 let mut modified_at_versions = Vec::new();
5573 for (id, ver) in effects.modified_at_versions() {
5574 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5575 modified_at_versions.push(ObjDumpFormat::new(w))
5576 }
5577 }
5578
5579 let mut runtime_reads = Vec::new();
5583 for obj in inner_temporary_store
5584 .runtime_packages_loaded_from_db
5585 .values()
5586 {
5587 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
5588 }
5589
5590 Ok(Self {
5593 tx_digest: *tx_digest,
5594 executed_epoch,
5595 reference_gas_price,
5596 epoch_start_timestamp_ms,
5597 protocol_version,
5598 relevant_system_packages,
5599 shared_objects,
5600 loaded_child_objects,
5601 modified_at_versions,
5602 runtime_reads,
5603 sender_signed_data: certificate.clone().into_message(),
5604 input_objects: inner_temporary_store
5605 .input_objects
5606 .values()
5607 .map(|o| ObjDumpFormat::new(o.clone()))
5608 .collect(),
5609 computed_effects: effects.clone(),
5610 expected_effects_digest,
5611 })
5612 }
5613
5614 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
5615 let mut objects = Vec::new();
5616 objects.extend(self.relevant_system_packages.clone());
5617 objects.extend(self.shared_objects.clone());
5618 objects.extend(self.loaded_child_objects.clone());
5619 objects.extend(self.modified_at_versions.clone());
5620 objects.extend(self.runtime_reads.clone());
5621 objects.extend(self.input_objects.clone());
5622 objects
5623 }
5624
5625 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
5626 let file_name = format!(
5627 "{}_{}_NODE_DUMP.json",
5628 self.tx_digest,
5629 AuthorityState::unixtime_now_ms()
5630 );
5631 let mut path = path.to_path_buf();
5632 path.push(&file_name);
5633 let mut file = File::create(path.clone())?;
5634 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
5635 Ok(path)
5636 }
5637
5638 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
5639 let file = File::open(path)?;
5640 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
5641 }
5642}