1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_storage::{
48 key_value_store::{
49 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
50 },
51 key_value_store_metrics::KeyValueStoreMetrics,
52};
53#[cfg(msim)]
54use iota_types::committee::CommitteeTrait;
55use iota_types::{
56 IOTA_SYSTEM_ADDRESS, TypeTag,
57 authenticator_state::get_authenticator_state,
58 base_types::*,
59 committee::{Committee, EpochId, ProtocolVersion},
60 crypto::{
61 AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
62 default_hash,
63 },
64 deny_list_v1::check_coin_deny_list_v1_during_signing,
65 digests::{ChainIdentifier, Digest, TransactionEventsDigest},
66 dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
67 effects::{
68 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
69 TransactionEvents, VerifiedSignedTransactionEffects,
70 },
71 error::{ExecutionError, IotaError, IotaResult, UserInputError},
72 event::{Event, EventID, SystemEpochInfoEvent},
73 executable_transaction::VerifiedExecutableTransaction,
74 execution_config_utils::to_binary_config,
75 execution_status::ExecutionStatus,
76 gas::{GasCostSummary, IotaGasStatus},
77 gas_coin::NANOS_PER_IOTA,
78 inner_temporary_store::{
79 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
80 WrittenObjects,
81 },
82 iota_system_state::{
83 IotaSystemState, IotaSystemStateTrait,
84 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
85 },
86 is_system_package,
87 layout_resolver::{LayoutResolver, into_struct_layout},
88 message_envelope::Message,
89 messages_checkpoint::{
90 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
91 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
92 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
93 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
94 },
95 messages_consensus::AuthorityCapabilitiesV1,
96 messages_grpc::{
97 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
98 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
99 TransactionStatus,
100 },
101 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
102 object::{
103 MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
104 bounded_visitor::BoundedVisitor,
105 },
106 storage::{
107 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
108 },
109 supported_protocol_versions::{
110 ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
111 },
112 transaction::*,
113 transaction_executor::SimulateTransactionResult,
114};
115use itertools::Itertools;
116use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
117use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
118use parking_lot::Mutex;
119use prometheus::{
120 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
121 register_histogram_vec_with_registry, register_histogram_with_registry,
122 register_int_counter_vec_with_registry, register_int_counter_with_registry,
123 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
124};
125use serde::{Deserialize, Serialize, de::DeserializeOwned};
126use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
127use tap::TapFallible;
128use tokio::{
129 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
130 task::JoinHandle,
131};
132use tracing::{debug, error, info, instrument, trace, warn};
133use typed_store::TypedStoreError;
134
135use self::{
136 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
137};
138#[cfg(msim)]
139pub use crate::checkpoints::checkpoint_executor::utils::{
140 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
141};
142use crate::{
143 authority::{
144 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
145 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
146 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
147 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
148 authority_store_tables::AuthorityPrunerTables,
149 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
150 },
151 authority_client::NetworkAuthorityClient,
152 checkpoints::CheckpointStore,
153 congestion_tracker::CongestionTracker,
154 consensus_adapter::ConsensusAdapter,
155 epoch::committee_store::CommitteeStore,
156 execution_cache::{
157 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
158 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
159 TransactionCacheRead,
160 },
161 execution_driver::execution_process,
162 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
163 metrics::{LatencyObserver, RateTracker},
164 module_cache_metrics::ResolverMetrics,
165 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
166 rest_index::RestIndexStore,
167 stake_aggregator::StakeAggregator,
168 state_accumulator::{AccumulatorStore, StateAccumulator},
169 subscription_handler::SubscriptionHandler,
170 transaction_input_loader::TransactionInputLoader,
171 transaction_manager::TransactionManager,
172 transaction_outputs::TransactionOutputs,
173 validator_tx_finalizer::ValidatorTxFinalizer,
174 verify_indexes::verify_indexes,
175};
176
// Unit-test modules. Each one is compiled only under `cfg(test)` and is loaded
// from the `unit_tests/` directory through an explicit `#[path]` attribute.
#[cfg(test)]
#[path = "unit_tests/authority_tests.rs"]
pub mod authority_tests;

#[cfg(test)]
#[path = "unit_tests/transaction_tests.rs"]
pub mod transaction_tests;

#[cfg(test)]
#[path = "unit_tests/batch_transaction_tests.rs"]
mod batch_transaction_tests;

#[cfg(test)]
#[path = "unit_tests/move_integration_tests.rs"]
pub mod move_integration_tests;

#[cfg(test)]
#[path = "unit_tests/gas_tests.rs"]
mod gas_tests;

#[cfg(test)]
#[path = "unit_tests/batch_verification_tests.rs"]
mod batch_verification_tests;

#[cfg(test)]
#[path = "unit_tests/coin_deny_list_tests.rs"]
mod coin_deny_list_tests;

#[cfg(test)]
#[path = "unit_tests/auth_unit_test_utils.rs"]
pub mod auth_unit_test_utils;

// Note: unlike the modules above, this one is NOT gated behind `cfg(test)`.
pub mod authority_test_utils;

// Per-epoch store and its pruner.
pub mod authority_per_epoch_store;
pub mod authority_per_epoch_store_pruner;

// Remaining public submodules of the authority module.
pub mod authority_store_pruner;
pub mod authority_store_tables;
pub mod authority_store_types;
pub mod epoch_start_configuration;
pub mod shared_object_congestion_tracker;
pub mod shared_object_version_manager;
pub mod suggested_gas_price_calculator;
pub mod test_authority_builder;
pub mod transaction_deferral;

// `authority_store` is crate-private; selected items are re-exported at the top
// of this file (`AuthorityStore`, `ResolverWrapper`, `UpdateType`).
pub(crate) mod authority_store;
pub mod backpressure;
226
/// Prometheus metrics owned by the authority state.
///
/// Every Prometheus-typed field is registered in [`AuthorityMetrics::new`]; the
/// per-field descriptions below mirror the help text used at registration.
pub struct AuthorityMetrics {
    /// Total number of transaction orders.
    tx_orders: IntCounter,
    /// Total number of transaction certificates handled.
    total_certs: IntCounter,
    /// Number of calls to `handle_certificate`.
    total_cert_attempts: IntCounter,
    /// Total number of transaction effects produced.
    total_effects: IntCounter,
    /// Number of transactions involving shared objects.
    pub shared_obj_tx: IntCounter,
    /// Number of sponsored transactions.
    sponsored_tx: IntCounter,
    /// Number of transaction orders already processed previously.
    tx_already_processed: IntCounter,
    /// Distribution of the number of input objects per transaction.
    num_input_objs: Histogram,
    /// Number of shared input objects per transaction.
    num_shared_objects: Histogram,
    /// Distribution of the size of transaction batches.
    batch_size: Histogram,

    /// Latency of handling transactions.
    authority_state_handle_transaction_latency: Histogram,

    /// Latency of executing certificates (including waiting for inputs) for the
    /// single-writer `tx_type` label of
    /// `authority_state_execute_certificate_latency`.
    execute_certificate_latency_single_writer: Histogram,
    /// Same histogram family as above, for the shared-object `tx_type` label.
    execute_certificate_latency_shared_object: Histogram,

    /// Latency of actual certificate executions.
    internal_execution_latency: Histogram,
    /// Latency of loading input objects for execution.
    execution_load_input_objects_latency: Histogram,
    /// Latency of executing certificates, before committing the results.
    prepare_certificate_latency: Histogram,
    /// Latency of committing certificate execution results.
    commit_certificate_latency: Histogram,
    /// Latency of checkpointing dbs.
    db_checkpoint_latency: Histogram,

    /// Certificates enqueued to `TransactionManager`, labelled by `result`.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    /// Current number of missing objects in `TransactionManager`.
    pub(crate) transaction_manager_num_missing_objects: IntGauge,
    /// Number of certificates pending in `TransactionManager`, with at least one
    /// missing input object.
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    /// Number of executing certificates, including queued and actually running
    /// certificates.
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    /// Number of ready transactions in `TransactionManager`.
    pub(crate) transaction_manager_num_ready: IntGauge,
    /// Current size of the object-availability cache in `TransactionManager`.
    pub(crate) transaction_manager_object_cache_size: IntGauge,
    /// Number of object-availability cache hits in `TransactionManager`.
    pub(crate) transaction_manager_object_cache_hits: IntCounter,
    /// Number of object-availability cache misses in `TransactionManager`.
    pub(crate) transaction_manager_object_cache_misses: IntCounter,
    /// Number of object-availability cache evictions in `TransactionManager`.
    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
    /// Current size of the package-availability cache in `TransactionManager`.
    pub(crate) transaction_manager_package_cache_size: IntGauge,
    /// Number of package-availability cache hits in `TransactionManager`.
    pub(crate) transaction_manager_package_cache_hits: IntCounter,
    /// Number of package-availability cache misses in `TransactionManager`.
    pub(crate) transaction_manager_package_cache_misses: IntCounter,
    /// Number of package-availability cache evictions in `TransactionManager`.
    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
    /// Time transactions spend waiting in the queue.
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    /// Cumulative number of transactions executed by the execution driver.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    /// Number of transactions pending in the execution driver dispatch queue.
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    /// Queueing delay between a transaction becoming ready for execution and
    /// the start of its execution.
    pub(crate) execution_queueing_delay_s: Histogram,
    /// Ratio of computation gas divided by VM execution latency.
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    /// Ratio of computation gas divided by certificate execution latency,
    /// including committing the certificate.
    pub(crate) execution_gas_latency_ratio: Histogram,

    /// Total number of consensus transactions skipped.
    pub(crate) skipped_consensus_txns: IntCounter,
    /// Total number of consensus transactions skipped because of a local cache
    /// hit.
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,

    /// Whether the authority is currently overloaded and in load-shedding mode.
    pub(crate) authority_overload_status: IntGauge,
    /// Percentage of transactions shed while in load-shedding mode.
    pub(crate) authority_load_shedding_percentage: IntGauge,

    /// Number of times each source indicated transaction overload, labelled by
    /// `source`.
    pub(crate) transaction_overload_sources: IntCounterVec,

    /// Total number of events emitted in post processing.
    post_processing_total_events_emitted: IntCounter,
    /// Total number of transactions indexed in post processing.
    post_processing_total_tx_indexed: IntCounter,
    /// Total number of transactions that finished event processing in post
    /// processing.
    post_processing_total_tx_had_event_processed: IntCounter,
    /// Total number of failures in post processing.
    post_processing_total_failures: IntCounter,

    /// Number of transactions processed by the consensus handler, labelled by
    /// `class`.
    pub consensus_handler_processed: IntCounterVec,
    /// Sizes of each type of transaction processed by the consensus handler,
    /// labelled by `class`.
    pub consensus_handler_transaction_sizes: HistogramVec,
    /// Number of low scoring authorities based on reputation scores from
    /// consensus.
    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    /// Scores from consensus, labelled by `authority`.
    pub consensus_handler_scores: IntGaugeVec,
    /// Number of transactions deferred by the consensus handler.
    pub consensus_handler_deferred_transactions: IntCounter,
    /// Number of transactions deferred by the consensus handler due to
    /// congestion.
    pub consensus_handler_congested_transactions: IntCounter,
    /// Number of transactions cancelled by the consensus handler.
    pub consensus_handler_cancelled_transactions: IntCounter,
    /// Max object costs for congestion control in the current consensus commit,
    /// labelled by `commit_type`.
    pub consensus_handler_max_object_costs: IntGaugeVec,
    /// Number of committed subdags, labelled by `authority` (the author).
    pub consensus_committed_subdags: IntCounterVec,
    /// Total number of committed consensus messages, labelled by `authority`.
    pub consensus_committed_messages: IntGaugeVec,
    /// Number of committed user transactions, labelled by `authority` (the
    /// submitter).
    pub consensus_committed_user_transactions: IntGaugeVec,
    /// Throughput calculated from consensus output, based on unique
    /// transactions.
    pub consensus_calculated_throughput: IntGauge,
    /// The currently active calculated throughput profile.
    pub consensus_calculated_throughput_profile: IntGauge,

    /// Shared `LimitsMetrics`, built from the same registry.
    pub limits_metrics: Arc<LimitsMetrics>,

    /// Shared `BytecodeVerifierMetrics`, built from the same registry.
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zkLogin signatures.
    pub zklogin_sig_count: IntCounter,
    /// Counter registered under the name `multisig_sig_count`.
    pub multisig_sig_count: IntCounter,

    /// Latency observer for execution queueing; not a registered Prometheus
    /// metric (constructed with `LatencyObserver::new()`).
    pub execution_queueing_latency: LatencyObserver,

    /// Rate tracker constructed with a 10 second duration.
    /// NOTE(review): presumably tracks the rate at which transactions become
    /// ready for execution - confirm against the call sites.
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    /// Rate tracker constructed with a 10 second duration.
    /// NOTE(review): presumably tracks the transaction execution rate - confirm
    /// against the call sites.
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
327
/// Histogram buckets for positive integer quantities, covering 1 through
/// 10,000,000 in a 1-2-5-7 progression per decade.
///
/// Used in this file for the input-object, shared-object, batch-size and
/// consensus-handler transaction-size histograms.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
    7000000., 10000000.,
];
334
/// Histogram buckets for latencies, in seconds as the name indicates, ranging
/// from 0.0005 (half a millisecond) up to 90.
///
/// This is the default bucket set for the latency histograms registered in
/// `AuthorityMetrics::new`.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];
339
/// Finer-grained latency buckets, in seconds as the name indicates, starting at
/// 0.00001 (10 microseconds) and extending up to 100.
///
/// Used in this file for the input-object loading latency histogram.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];
345
/// Histogram buckets for gas-to-latency ratios, ranging from 10 to 1,000,000.
///
/// Used in this file for the `prepare_cert_gas_latency_ratio` and
/// `execution_gas_latency_ratio` histograms.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];
350
351pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
355 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
356 let execute_certificate_latency = register_histogram_vec_with_registry!(
357 "authority_state_execute_certificate_latency",
358 "Latency of executing certificates, including waiting for inputs",
359 &["tx_type"],
360 LATENCY_SEC_BUCKETS.to_vec(),
361 registry,
362 )
363 .unwrap();
364
365 let execute_certificate_latency_single_writer =
366 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
367 let execute_certificate_latency_shared_object =
368 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
369
370 Self {
371 tx_orders: register_int_counter_with_registry!(
372 "total_transaction_orders",
373 "Total number of transaction orders",
374 registry,
375 )
376 .unwrap(),
377 total_certs: register_int_counter_with_registry!(
378 "total_transaction_certificates",
379 "Total number of transaction certificates handled",
380 registry,
381 )
382 .unwrap(),
383 total_cert_attempts: register_int_counter_with_registry!(
384 "total_handle_certificate_attempts",
385 "Number of calls to handle_certificate",
386 registry,
387 )
388 .unwrap(),
389 total_effects: register_int_counter_with_registry!(
391 "total_transaction_effects",
392 "Total number of transaction effects produced",
393 registry,
394 )
395 .unwrap(),
396
397 shared_obj_tx: register_int_counter_with_registry!(
398 "num_shared_obj_tx",
399 "Number of transactions involving shared objects",
400 registry,
401 )
402 .unwrap(),
403
404 sponsored_tx: register_int_counter_with_registry!(
405 "num_sponsored_tx",
406 "Number of sponsored transactions",
407 registry,
408 )
409 .unwrap(),
410
411 tx_already_processed: register_int_counter_with_registry!(
412 "num_tx_already_processed",
413 "Number of transaction orders already processed previously",
414 registry,
415 )
416 .unwrap(),
417 num_input_objs: register_histogram_with_registry!(
418 "num_input_objects",
419 "Distribution of number of input TX objects per TX",
420 POSITIVE_INT_BUCKETS.to_vec(),
421 registry,
422 )
423 .unwrap(),
424 num_shared_objects: register_histogram_with_registry!(
425 "num_shared_objects",
426 "Number of shared input objects per TX",
427 POSITIVE_INT_BUCKETS.to_vec(),
428 registry,
429 )
430 .unwrap(),
431 batch_size: register_histogram_with_registry!(
432 "batch_size",
433 "Distribution of size of transaction batch",
434 POSITIVE_INT_BUCKETS.to_vec(),
435 registry,
436 )
437 .unwrap(),
438 authority_state_handle_transaction_latency: register_histogram_with_registry!(
439 "authority_state_handle_transaction_latency",
440 "Latency of handling transactions",
441 LATENCY_SEC_BUCKETS.to_vec(),
442 registry,
443 )
444 .unwrap(),
445 execute_certificate_latency_single_writer,
446 execute_certificate_latency_shared_object,
447 internal_execution_latency: register_histogram_with_registry!(
448 "authority_state_internal_execution_latency",
449 "Latency of actual certificate executions",
450 LATENCY_SEC_BUCKETS.to_vec(),
451 registry,
452 )
453 .unwrap(),
454 execution_load_input_objects_latency: register_histogram_with_registry!(
455 "authority_state_execution_load_input_objects_latency",
456 "Latency of loading input objects for execution",
457 LOW_LATENCY_SEC_BUCKETS.to_vec(),
458 registry,
459 )
460 .unwrap(),
461 prepare_certificate_latency: register_histogram_with_registry!(
462 "authority_state_prepare_certificate_latency",
463 "Latency of executing certificates, before committing the results",
464 LATENCY_SEC_BUCKETS.to_vec(),
465 registry,
466 )
467 .unwrap(),
468 commit_certificate_latency: register_histogram_with_registry!(
469 "authority_state_commit_certificate_latency",
470 "Latency of committing certificate execution results",
471 LATENCY_SEC_BUCKETS.to_vec(),
472 registry,
473 )
474 .unwrap(),
475 db_checkpoint_latency: register_histogram_with_registry!(
476 "db_checkpoint_latency",
477 "Latency of checkpointing dbs",
478 LATENCY_SEC_BUCKETS.to_vec(),
479 registry,
480 ).unwrap(),
481 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
482 "transaction_manager_num_enqueued_certificates",
483 "Current number of certificates enqueued to TransactionManager",
484 &["result"],
485 registry,
486 )
487 .unwrap(),
488 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
489 "transaction_manager_num_missing_objects",
490 "Current number of missing objects in TransactionManager",
491 registry,
492 )
493 .unwrap(),
494 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
495 "transaction_manager_num_pending_certificates",
496 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
497 registry,
498 )
499 .unwrap(),
500 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
501 "transaction_manager_num_executing_certificates",
502 "Number of executing certificates, including queued and actually running certificates",
503 registry,
504 )
505 .unwrap(),
506 transaction_manager_num_ready: register_int_gauge_with_registry!(
507 "transaction_manager_num_ready",
508 "Number of ready transactions in TransactionManager",
509 registry,
510 )
511 .unwrap(),
512 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
513 "transaction_manager_object_cache_size",
514 "Current size of object-availability cache in TransactionManager",
515 registry,
516 )
517 .unwrap(),
518 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
519 "transaction_manager_object_cache_hits",
520 "Number of object-availability cache hits in TransactionManager",
521 registry,
522 )
523 .unwrap(),
524 authority_overload_status: register_int_gauge_with_registry!(
525 "authority_overload_status",
526 "Whether authority is current experiencing overload and enters load shedding mode.",
527 registry)
528 .unwrap(),
529 authority_load_shedding_percentage: register_int_gauge_with_registry!(
530 "authority_load_shedding_percentage",
531 "The percentage of transactions is shed when the authority is in load shedding mode.",
532 registry)
533 .unwrap(),
534 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
535 "transaction_manager_object_cache_misses",
536 "Number of object-availability cache misses in TransactionManager",
537 registry,
538 )
539 .unwrap(),
540 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
541 "transaction_manager_object_cache_evictions",
542 "Number of object-availability cache evictions in TransactionManager",
543 registry,
544 )
545 .unwrap(),
546 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
547 "transaction_manager_package_cache_size",
548 "Current size of package-availability cache in TransactionManager",
549 registry,
550 )
551 .unwrap(),
552 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
553 "transaction_manager_package_cache_hits",
554 "Number of package-availability cache hits in TransactionManager",
555 registry,
556 )
557 .unwrap(),
558 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
559 "transaction_manager_package_cache_misses",
560 "Number of package-availability cache misses in TransactionManager",
561 registry,
562 )
563 .unwrap(),
564 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
565 "transaction_manager_package_cache_evictions",
566 "Number of package-availability cache evictions in TransactionManager",
567 registry,
568 )
569 .unwrap(),
570 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
571 "transaction_manager_transaction_queue_age_s",
572 "Time spent in waiting for transaction in the queue",
573 LATENCY_SEC_BUCKETS.to_vec(),
574 registry,
575 )
576 .unwrap(),
577 transaction_overload_sources: register_int_counter_vec_with_registry!(
578 "transaction_overload_sources",
579 "Number of times each source indicates transaction overload.",
580 &["source"],
581 registry)
582 .unwrap(),
583 execution_driver_executed_transactions: register_int_counter_with_registry!(
584 "execution_driver_executed_transactions",
585 "Cumulative number of transaction executed by execution driver",
586 registry,
587 )
588 .unwrap(),
589 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
590 "execution_driver_dispatch_queue",
591 "Number of transaction pending in execution driver dispatch queue",
592 registry,
593 )
594 .unwrap(),
595 execution_queueing_delay_s: register_histogram_with_registry!(
596 "execution_queueing_delay_s",
597 "Queueing delay between a transaction is ready for execution until it starts executing.",
598 LATENCY_SEC_BUCKETS.to_vec(),
599 registry
600 )
601 .unwrap(),
602 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
603 "prepare_cert_gas_latency_ratio",
604 "The ratio of computation gas divided by VM execution latency.",
605 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
606 registry
607 )
608 .unwrap(),
609 execution_gas_latency_ratio: register_histogram_with_registry!(
610 "execution_gas_latency_ratio",
611 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
612 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
613 registry
614 )
615 .unwrap(),
616 skipped_consensus_txns: register_int_counter_with_registry!(
617 "skipped_consensus_txns",
618 "Total number of consensus transactions skipped",
619 registry,
620 )
621 .unwrap(),
622 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
623 "skipped_consensus_txns_cache_hit",
624 "Total number of consensus transactions skipped because of local cache hit",
625 registry,
626 )
627 .unwrap(),
628 post_processing_total_events_emitted: register_int_counter_with_registry!(
629 "post_processing_total_events_emitted",
630 "Total number of events emitted in post processing",
631 registry,
632 )
633 .unwrap(),
634 post_processing_total_tx_indexed: register_int_counter_with_registry!(
635 "post_processing_total_tx_indexed",
636 "Total number of txes indexed in post processing",
637 registry,
638 )
639 .unwrap(),
640 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
641 "post_processing_total_tx_had_event_processed",
642 "Total number of txes finished event processing in post processing",
643 registry,
644 )
645 .unwrap(),
646 post_processing_total_failures: register_int_counter_with_registry!(
647 "post_processing_total_failures",
648 "Total number of failure in post processing",
649 registry,
650 )
651 .unwrap(),
652 consensus_handler_processed: register_int_counter_vec_with_registry!(
653 "consensus_handler_processed",
654 "Number of transactions processed by consensus handler",
655 &["class"],
656 registry
657 ).unwrap(),
658 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
659 "consensus_handler_transaction_sizes",
660 "Sizes of each type of transactions processed by consensus handler",
661 &["class"],
662 POSITIVE_INT_BUCKETS.to_vec(),
663 registry
664 ).unwrap(),
665 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
666 "consensus_handler_num_low_scoring_authorities",
667 "Number of low scoring authorities based on reputation scores from consensus",
668 registry
669 ).unwrap(),
670 consensus_handler_scores: register_int_gauge_vec_with_registry!(
671 "consensus_handler_scores",
672 "scores from consensus for each authority",
673 &["authority"],
674 registry,
675 ).unwrap(),
676 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
677 "consensus_handler_deferred_transactions",
678 "Number of transactions deferred by consensus handler",
679 registry,
680 ).unwrap(),
681 consensus_handler_congested_transactions: register_int_counter_with_registry!(
682 "consensus_handler_congested_transactions",
683 "Number of transactions deferred by consensus handler due to congestion",
684 registry,
685 ).unwrap(),
686 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
687 "consensus_handler_cancelled_transactions",
688 "Number of transactions cancelled by consensus handler",
689 registry,
690 ).unwrap(),
691 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
692 "consensus_handler_max_congestion_control_object_costs",
693 "Max object costs for congestion control in the current consensus commit",
694 &["commit_type"],
695 registry,
696 ).unwrap(),
697 consensus_committed_subdags: register_int_counter_vec_with_registry!(
698 "consensus_committed_subdags",
699 "Number of committed subdags, sliced by author",
700 &["authority"],
701 registry,
702 ).unwrap(),
703 consensus_committed_messages: register_int_gauge_vec_with_registry!(
704 "consensus_committed_messages",
705 "Total number of committed consensus messages, sliced by author",
706 &["authority"],
707 registry,
708 ).unwrap(),
709 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
710 "consensus_committed_user_transactions",
711 "Number of committed user transactions, sliced by submitter",
712 &["authority"],
713 registry,
714 ).unwrap(),
715 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
716 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
717 zklogin_sig_count: register_int_counter_with_registry!(
718 "zklogin_sig_count",
719 "Count of zkLogin signatures",
720 registry,
721 )
722 .unwrap(),
723 multisig_sig_count: register_int_counter_with_registry!(
724 "multisig_sig_count",
725 "Count of zkLogin signatures",
726 registry,
727 )
728 .unwrap(),
729 consensus_calculated_throughput: register_int_gauge_with_registry!(
730 "consensus_calculated_throughput",
731 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
732 registry,
733 ).unwrap(),
734 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
735 "consensus_calculated_throughput_profile",
736 "The current active calculated throughput profile",
737 registry
738 ).unwrap(),
739 execution_queueing_latency: LatencyObserver::new(),
740 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
741 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
742 }
743 }
744
745 pub fn reset_on_reconfigure(&self) {
749 self.consensus_committed_messages.reset();
750 self.consensus_handler_scores.reset();
751 self.consensus_committed_user_transactions.reset();
752 }
753}
754
/// A pinned, reference-counted trait object that can produce
/// `AuthoritySignature`s and is safe to share across threads (`Send + Sync`).
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
762
/// State held by an authority node: its identity and signer, storage and cache
/// handles, per-epoch state, metrics and configuration.
pub struct AuthorityState {
    /// Name of this authority. Checked against the epoch committee in
    /// `is_committee_validator` and attached to transactions this node signs.
    pub name: AuthorityName,
    /// Signer passed to `VerifiedSignedTransaction::new` when signing
    /// transactions.
    pub secret: StableSyncAuthoritySigner,

    /// Loader used to read input and receiving objects (e.g.
    /// `read_objects_for_signing`).
    input_loader: TransactionInputLoader,
    /// Handles to the execution cache traits.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// Per-epoch store held in an `ArcSwap`.
    /// NOTE(review): presumably swapped for a new store at reconfiguration -
    /// confirm.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// Async read/write lock guarding an epoch id.
    /// NOTE(review): presumably read-locked while signing/executing and
    /// write-locked during reconfiguration - confirm against the lock helpers.
    execution_lock: RwLock<EpochId>,

    /// Optional JSON-RPC index store.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional REST index store.
    pub rest_index: Option<Arc<RestIndexStore>>,

    /// Shared subscription handler.
    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Shared checkpoint store; queried by `get_epoch_state_commitments`.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Shared committee store, exposed through `committee_store()` and
    /// `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    /// Shared transaction manager; consulted for execution overload checks.
    transaction_manager: Arc<TransactionManager>,

    /// Optional one-shot sender behind a mutex. The attribute below marks the
    /// field as expected-unused outside of test builds.
    /// NOTE(review): presumably used to shut down the transaction execution
    /// task in tests - confirm.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Shared authority metrics.
    pub metrics: Arc<AuthorityMetrics>,
    /// Store pruner; the leading underscore indicates it is held but not read.
    _pruner: AuthorityStorePruner,
    /// Per-epoch store pruner; held but not read, as above.
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Configuration for DB checkpoints.
    db_checkpoint_config: DBCheckpointConfig,

    /// Node configuration; read for the overload, transaction-deny and
    /// verifier-signing settings.
    pub config: NodeConfig,

    /// Overload information for this authority.
    pub overload_info: AuthorityOverloadInfo,

    /// Optional finalizer. When present and this node is in the committee,
    /// `handle_transaction` spawns a task that tracks each newly signed
    /// transaction through it.
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// Identifier of the chain this authority belongs to.
    chain_identifier: ChainIdentifier,

    /// Shared congestion tracker.
    pub(crate) congestion_tracker: Arc<CongestionTracker>,
}
818
819impl AuthorityState {
828 pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
829 epoch_store.committee().authority_exists(&self.name)
830 }
831
832 pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
833 epoch_store
834 .active_validators()
835 .iter()
836 .any(|a| AuthorityName::from(a) == self.name)
837 }
838
839 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
840 !self.is_committee_validator(epoch_store)
841 }
842
843 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
844 &self.committee_store
845 }
846
847 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
848 self.committee_store.clone()
849 }
850
851 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
852 &self.config.authority_overload_config
853 }
854
855 pub fn get_epoch_state_commitments(
856 &self,
857 epoch: EpochId,
858 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
859 self.checkpoint_store.get_epoch_state_commitments(epoch)
860 }
861
862 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
866 async fn handle_transaction_impl(
867 &self,
868 transaction: VerifiedTransaction,
869 epoch_store: &Arc<AuthorityPerEpochStore>,
870 ) -> IotaResult<VerifiedSignedTransaction> {
871 let _execution_lock = self.execution_lock_for_signing();
873
874 let tx_digest = transaction.digest();
875 let tx_data = transaction.data().transaction_data();
876
877 let input_object_kinds = tx_data.input_objects()?;
878 let receiving_objects_refs = tx_data.receiving_objects();
879
880 iota_transaction_checks::deny::check_transaction_for_signing(
884 tx_data,
885 transaction.tx_signatures(),
886 &input_object_kinds,
887 &receiving_objects_refs,
888 &self.config.transaction_deny_config,
889 self.get_backing_package_store().as_ref(),
890 )?;
891
892 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
893 Some(tx_digest),
894 &input_object_kinds,
895 &receiving_objects_refs,
896 epoch_store.epoch(),
897 )?;
898
899 let (_gas_status, checked_input_objects) =
900 iota_transaction_checks::check_transaction_input(
901 epoch_store.protocol_config(),
902 epoch_store.reference_gas_price(),
903 tx_data,
904 input_objects,
905 &receiving_objects,
906 &self.metrics.bytecode_verifier_metrics,
907 &self.config.verifier_signing_config,
908 )?;
909
910 check_coin_deny_list_v1_during_signing(
911 tx_data.sender(),
912 &checked_input_objects,
913 &receiving_objects,
914 &self.get_object_store(),
915 )?;
916
917 let owned_objects = checked_input_objects.inner().filter_owned_objects();
918
919 let signed_transaction = VerifiedSignedTransaction::new(
920 epoch_store.epoch(),
921 transaction,
922 self.name,
923 &*self.secret,
924 );
925
926 self.get_cache_writer().try_acquire_transaction_locks(
931 epoch_store,
932 &owned_objects,
933 signed_transaction.clone(),
934 )?;
935
936 Ok(signed_transaction)
937 }
938
939 #[instrument(name = "handle_transaction", level = "trace", skip_all)]
941 pub async fn handle_transaction(
942 &self,
943 epoch_store: &Arc<AuthorityPerEpochStore>,
944 transaction: VerifiedTransaction,
945 ) -> IotaResult<HandleTransactionResponse> {
946 let tx_digest = *transaction.digest();
947 debug!("handle_transaction");
948
949 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
951 return Ok(HandleTransactionResponse { status });
952 }
953
954 let _metrics_guard = self
955 .metrics
956 .authority_state_handle_transaction_latency
957 .start_timer();
958 self.metrics.tx_orders.inc();
959
960 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
961 match signed {
962 Ok(s) => {
963 if self.is_committee_validator(epoch_store) {
964 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
965 let tx = s.clone();
966 let validator_tx_finalizer = validator_tx_finalizer.clone();
967 let cache_reader = self.get_transaction_cache_reader().clone();
968 let epoch_store = epoch_store.clone();
969 spawn_monitored_task!(epoch_store.within_alive_epoch(
970 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
971 ));
972 }
973 }
974 Ok(HandleTransactionResponse {
975 status: TransactionStatus::Signed(s.into_inner().into_sig()),
976 })
977 }
978 Err(err) => Ok(HandleTransactionResponse {
982 status: self
983 .get_transaction_status(&tx_digest, epoch_store)?
984 .ok_or(err)?
985 .1,
986 }),
987 }
988 }
989
990 pub fn check_system_overload_at_signing(&self) -> bool {
991 self.config
992 .authority_overload_config
993 .check_system_overload_at_signing
994 }
995
996 pub fn check_system_overload_at_execution(&self) -> bool {
997 self.config
998 .authority_overload_config
999 .check_system_overload_at_execution
1000 }
1001
1002 pub(crate) fn check_system_overload(
1003 &self,
1004 consensus_adapter: &Arc<ConsensusAdapter>,
1005 tx_data: &SenderSignedData,
1006 do_authority_overload_check: bool,
1007 ) -> IotaResult {
1008 if do_authority_overload_check {
1009 self.check_authority_overload(tx_data).tap_err(|_| {
1010 self.update_overload_metrics("execution_queue");
1011 })?;
1012 }
1013 self.transaction_manager
1014 .check_execution_overload(self.overload_config(), tx_data)
1015 .tap_err(|_| {
1016 self.update_overload_metrics("execution_pending");
1017 })?;
1018 consensus_adapter.check_consensus_overload().tap_err(|_| {
1019 self.update_overload_metrics("consensus");
1020 })?;
1021
1022 let pending_tx_count = self
1023 .get_cache_commit()
1024 .approximate_pending_transaction_count();
1025 if pending_tx_count
1026 > self
1027 .config
1028 .execution_cache_config
1029 .writeback_cache
1030 .backpressure_threshold_for_rpc()
1031 {
1032 return Err(IotaError::ValidatorOverloadedRetryAfter {
1033 retry_after_secs: 10,
1034 });
1035 }
1036
1037 Ok(())
1038 }
1039
1040 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1041 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1042 return Ok(());
1043 }
1044
1045 let load_shedding_percentage = self
1046 .overload_info
1047 .load_shedding_percentage
1048 .load(Ordering::Relaxed);
1049 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1050 }
1051
1052 fn update_overload_metrics(&self, source: &str) {
1053 self.metrics
1054 .transaction_overload_sources
1055 .with_label_values(&[source])
1056 .inc();
1057 }
1058
1059 #[instrument(level = "trace", skip_all)]
1061 pub async fn execute_certificate(
1062 &self,
1063 certificate: &VerifiedCertificate,
1064 epoch_store: &Arc<AuthorityPerEpochStore>,
1065 ) -> IotaResult<TransactionEffects> {
1066 let _metrics_guard = if certificate.contains_shared_object() {
1067 self.metrics
1068 .execute_certificate_latency_shared_object
1069 .start_timer()
1070 } else {
1071 self.metrics
1072 .execute_certificate_latency_single_writer
1073 .start_timer()
1074 };
1075 trace!("execute_certificate");
1076
1077 self.metrics.total_cert_attempts.inc();
1078
1079 if !certificate.contains_shared_object() {
1080 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1085 }
1086
1087 epoch_store
1090 .within_alive_epoch(self.notify_read_effects(certificate))
1091 .await
1092 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1093 .and_then(|r| r)
1094 }
1095
1096 #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1111 pub fn try_execute_immediately(
1112 &self,
1113 certificate: &VerifiedExecutableTransaction,
1114 expected_effects_digest: Option<TransactionEffectsDigest>,
1115 epoch_store: &Arc<AuthorityPerEpochStore>,
1116 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1117 let _scope = monitored_scope("Execution::try_execute_immediately");
1118 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1119
1120 let tx_digest = certificate.digest();
1121
1122 let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1124
1125 if let Some(effects) = self
1128 .get_transaction_cache_reader()
1129 .try_get_executed_effects(tx_digest)?
1130 {
1131 if let Some(expected_effects_digest_inner) = expected_effects_digest {
1132 assert_eq!(
1133 effects.digest(),
1134 expected_effects_digest_inner,
1135 "Unexpected effects digest for transaction {tx_digest:?}"
1136 );
1137 }
1138 tx_guard.release();
1139 return Ok((effects, None));
1140 }
1141 let input_objects =
1142 self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1143
1144 let expected_effects_digest =
1150 expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1151
1152 self.process_certificate(
1153 tx_guard,
1154 certificate,
1155 input_objects,
1156 expected_effects_digest,
1157 epoch_store,
1158 )
1159 .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1160 .tap_ok(
1161 |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1162 )
1163 }
1164
1165 pub fn read_objects_for_execution(
1166 &self,
1167 tx_lock: &CertLockGuard,
1168 certificate: &VerifiedExecutableTransaction,
1169 epoch_store: &Arc<AuthorityPerEpochStore>,
1170 ) -> IotaResult<InputObjects> {
1171 let _scope = monitored_scope("Execution::load_input_objects");
1172 let _metrics_guard = self
1173 .metrics
1174 .execution_load_input_objects_latency
1175 .start_timer();
1176 let input_objects = &certificate.data().transaction_data().input_objects()?;
1177 self.input_loader.read_objects_for_execution(
1178 epoch_store,
1179 &certificate.key(),
1180 tx_lock,
1181 input_objects,
1182 epoch_store.epoch(),
1183 )
1184 }
1185
1186 pub fn try_execute_for_test(
1190 &self,
1191 certificate: &VerifiedCertificate,
1192 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1193 let epoch_store = self.epoch_store_for_testing();
1194 let (effects, execution_error_opt) = self.try_execute_immediately(
1195 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1196 None,
1197 &epoch_store,
1198 )?;
1199 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1200 Ok((signed_effects, execution_error_opt))
1201 }
1202
1203 pub fn execute_for_test(
1205 &self,
1206 certificate: &VerifiedCertificate,
1207 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1208 self.try_execute_for_test(certificate)
1209 .expect("try_execute_for_test should not fail")
1210 }
1211
1212 pub async fn notify_read_effects(
1213 &self,
1214 certificate: &VerifiedCertificate,
1215 ) -> IotaResult<TransactionEffects> {
1216 self.get_transaction_cache_reader()
1217 .try_notify_read_executed_effects(&[*certificate.digest()])
1218 .await
1219 .map(|mut r| r.pop().expect("must return correct number of effects"))
1220 }
1221
1222 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1223 self.get_object_cache_reader()
1224 .try_check_owned_objects_are_live(owned_object_refs)
1225 }
1226
1227 pub(crate) fn debug_dump_transaction_state(
1232 &self,
1233 tx_digest: &TransactionDigest,
1234 effects: &TransactionEffects,
1235 expected_effects_digest: TransactionEffectsDigest,
1236 inner_temporary_store: &InnerTemporaryStore,
1237 certificate: &VerifiedExecutableTransaction,
1238 debug_dump_config: &StateDebugDumpConfig,
1239 ) -> IotaResult<PathBuf> {
1240 let dump_dir = debug_dump_config
1241 .dump_file_directory
1242 .as_ref()
1243 .cloned()
1244 .unwrap_or(std::env::temp_dir());
1245 let epoch_store = self.load_epoch_store_one_call_per_task();
1246
1247 NodeStateDump::new(
1248 tx_digest,
1249 effects,
1250 expected_effects_digest,
1251 self.get_object_store().as_ref(),
1252 &epoch_store,
1253 inner_temporary_store,
1254 certificate,
1255 )?
1256 .write_to_file(&dump_dir)
1257 .map_err(|e| IotaError::FileIO(e.to_string()))
1258 }
1259
1260 #[instrument(level = "trace", skip_all)]
1261 pub(crate) fn process_certificate(
1262 &self,
1263 tx_guard: CertTxGuard,
1264 certificate: &VerifiedExecutableTransaction,
1265 input_objects: InputObjects,
1266 expected_effects_digest: Option<TransactionEffectsDigest>,
1267 epoch_store: &Arc<AuthorityPerEpochStore>,
1268 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1269 let process_certificate_start_time = tokio::time::Instant::now();
1270 let digest = *certificate.digest();
1271
1272 fail_point_if!("correlated-crash-process-certificate", || {
1273 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1274 iota_simulator::task::kill_current_node(None);
1275 }
1276 });
1277
1278 let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1279 let execution_guard = match execution_guard {
1285 Ok(execution_guard) => execution_guard,
1286 Err(err) => {
1287 tx_guard.release();
1288 return Err(err);
1289 }
1290 };
1291 if *execution_guard != epoch_store.epoch() {
1295 tx_guard.release();
1296 info!("The epoch of the execution_guard doesn't match the epoch store");
1297 return Err(IotaError::WrongEpoch {
1298 expected_epoch: epoch_store.epoch(),
1299 actual_epoch: *execution_guard,
1300 });
1301 }
1302
1303 let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1309 &execution_guard,
1310 certificate,
1311 input_objects,
1312 epoch_store,
1313 ) {
1314 Err(e) => {
1315 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1316 tx_guard.release();
1317 return Err(e);
1318 }
1319 Ok(res) => res,
1320 };
1321
1322 if let Some(expected_effects_digest) = expected_effects_digest {
1323 if effects.digest() != expected_effects_digest {
1324 match self.debug_dump_transaction_state(
1326 &digest,
1327 &effects,
1328 expected_effects_digest,
1329 &inner_temporary_store,
1330 certificate,
1331 &self.config.state_debug_dump_config,
1332 ) {
1333 Ok(out_path) => {
1334 info!(
1335 "Dumped node state for transaction {} to {}",
1336 digest,
1337 out_path.as_path().display().to_string()
1338 );
1339 }
1340 Err(e) => {
1341 error!("Error dumping state for transaction {}: {e}", digest);
1342 }
1343 }
1344 error!(
1345 tx_digest = ?digest,
1346 ?expected_effects_digest,
1347 actual_effects = ?effects,
1348 "fork detected!"
1349 );
1350 panic!(
1351 "Transaction {} is expected to have effects digest {}, but got {}!",
1352 digest,
1353 expected_effects_digest,
1354 effects.digest(),
1355 );
1356 }
1357 }
1358
1359 fail_point!("crash");
1360
1361 self.commit_certificate(
1362 certificate,
1363 inner_temporary_store,
1364 &effects,
1365 tx_guard,
1366 execution_guard,
1367 epoch_store,
1368 )?;
1369
1370 if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1371 certificate.data().transaction_data().kind()
1372 {
1373 if let Some(err) = &execution_error_opt {
1374 debug_fatal!("Authenticator state update failed: {:?}", err);
1375 }
1376 epoch_store.update_authenticator_state(auth_state);
1377
1378 if cfg!(debug_assertions) {
1381 let authenticator_state = get_authenticator_state(self.get_object_store())
1382 .expect("Read cannot fail")
1383 .expect("Authenticator state must exist");
1384
1385 let mut sys_jwks: Vec<_> = authenticator_state
1386 .active_jwks
1387 .into_iter()
1388 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1389 .collect();
1390 let mut active_jwks: Vec<_> = epoch_store
1391 .signature_verifier
1392 .get_jwks()
1393 .into_iter()
1394 .collect();
1395 sys_jwks.sort();
1396 active_jwks.sort();
1397
1398 assert_eq!(sys_jwks, active_jwks);
1399 }
1400 }
1401
1402 let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1403 if elapsed > 0.0 {
1404 self.metrics
1405 .execution_gas_latency_ratio
1406 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1407 };
1408 Ok((effects, execution_error_opt))
1409 }
1410
1411 #[instrument(level = "trace", skip_all)]
1412 fn commit_certificate(
1413 &self,
1414 certificate: &VerifiedExecutableTransaction,
1415 inner_temporary_store: InnerTemporaryStore,
1416 effects: &TransactionEffects,
1417 tx_guard: CertTxGuard,
1418 _execution_guard: ExecutionLockReadGuard<'_>,
1419 epoch_store: &Arc<AuthorityPerEpochStore>,
1420 ) -> IotaResult {
1421 let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1422 monitored_scope("Execution::commit_certificate");
1423 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1424
1425 let tx_key = certificate.key();
1426 let tx_digest = certificate.digest();
1427 let input_object_count = inner_temporary_store.input_objects.len();
1428 let shared_object_count = effects.input_shared_objects().len();
1429
1430 let output_keys = inner_temporary_store.get_output_keys(effects);
1431
1432 let _ = self
1434 .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1435 .tap_err(|e| {
1436 self.metrics.post_processing_total_failures.inc();
1437 error!(?tx_digest, "tx post processing failed: {e}");
1438 });
1439
1440 epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1444
1445 fail_point!("crash");
1447
1448 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1449 certificate.clone().into_unsigned(),
1450 effects.clone(),
1451 inner_temporary_store,
1452 );
1453 self.get_cache_writer()
1454 .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1455
1456 if certificate.transaction_data().is_end_of_epoch_tx() {
1457 self.get_object_cache_reader()
1460 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1461 }
1462
1463 tx_guard.commit_tx();
1465
1466 self.transaction_manager
1470 .notify_commit(tx_digest, output_keys, epoch_store);
1471
1472 self.update_metrics(certificate, input_object_count, shared_object_count);
1473
1474 Ok(())
1475 }
1476
1477 fn update_metrics(
1478 &self,
1479 certificate: &VerifiedExecutableTransaction,
1480 input_object_count: usize,
1481 shared_object_count: usize,
1482 ) {
1483 if certificate.has_zklogin_sig() {
1485 self.metrics.zklogin_sig_count.inc();
1486 } else if certificate.has_upgraded_multisig() {
1487 self.metrics.multisig_sig_count.inc();
1488 }
1489
1490 self.metrics.total_effects.inc();
1491 self.metrics.total_certs.inc();
1492
1493 if shared_object_count > 0 {
1494 self.metrics.shared_obj_tx.inc();
1495 }
1496
1497 if certificate.is_sponsored_tx() {
1498 self.metrics.sponsored_tx.inc();
1499 }
1500
1501 self.metrics
1502 .num_input_objs
1503 .observe(input_object_count as f64);
1504 self.metrics
1505 .num_shared_objects
1506 .observe(shared_object_count as f64);
1507 self.metrics.batch_size.observe(
1508 certificate
1509 .data()
1510 .intent_message()
1511 .value
1512 .kind()
1513 .num_commands() as f64,
1514 );
1515 }
1516
1517 #[instrument(level = "trace", skip_all)]
1529 fn prepare_certificate(
1530 &self,
1531 _execution_guard: &ExecutionLockReadGuard<'_>,
1532 certificate: &VerifiedExecutableTransaction,
1533 input_objects: InputObjects,
1534 epoch_store: &Arc<AuthorityPerEpochStore>,
1535 ) -> IotaResult<(
1536 InnerTemporaryStore,
1537 TransactionEffects,
1538 Option<ExecutionError>,
1539 )> {
1540 let _scope = monitored_scope("Execution::prepare_certificate");
1541 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1542 let prepare_certificate_start_time = tokio::time::Instant::now();
1543
1544 let tx_data = certificate.data().transaction_data();
1547 tx_data.validity_check(epoch_store.protocol_config())?;
1548
1549 let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
1552 certificate,
1553 input_objects,
1554 epoch_store.protocol_config(),
1555 epoch_store.reference_gas_price(),
1556 )?;
1557
1558 let owned_object_refs = input_objects.inner().filter_owned_objects();
1559 self.check_owned_locks(&owned_object_refs)?;
1560 let tx_digest = *certificate.digest();
1561 let protocol_config = epoch_store.protocol_config();
1562 let transaction_data = &certificate.data().intent_message().value;
1563 let (kind, signer, gas) = transaction_data.execution_parts();
1564
1565 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1566 let (inner_temp_store, _, mut effects, execution_error_opt) =
1567 epoch_store.executor().execute_transaction_to_effects(
1568 self.get_backing_store().as_ref(),
1569 protocol_config,
1570 self.metrics.limits_metrics.clone(),
1571 self.config
1574 .expensive_safety_check_config
1575 .enable_deep_per_tx_iota_conservation_check(),
1576 self.config.certificate_deny_config.certificate_deny_set(),
1577 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1578 epoch_store
1579 .epoch_start_config()
1580 .epoch_data()
1581 .epoch_start_timestamp(),
1582 input_objects,
1583 gas,
1584 gas_status,
1585 kind,
1586 signer,
1587 tx_digest,
1588 &mut None,
1589 );
1590
1591 fail_point_if!("cp_execution_nondeterminism", || {
1592 #[cfg(msim)]
1593 self.create_fail_state(certificate, epoch_store, &mut effects);
1594 });
1595
1596 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1597 if elapsed > 0.0 {
1598 self.metrics
1599 .prepare_cert_gas_latency_ratio
1600 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1601 }
1602
1603 Ok((inner_temp_store, effects, execution_error_opt.err()))
1604 }
1605
1606 pub fn prepare_certificate_for_benchmark(
1607 &self,
1608 certificate: &VerifiedExecutableTransaction,
1609 input_objects: InputObjects,
1610 epoch_store: &Arc<AuthorityPerEpochStore>,
1611 ) -> IotaResult<(
1612 InnerTemporaryStore,
1613 TransactionEffects,
1614 Option<ExecutionError>,
1615 )> {
1616 let lock = RwLock::new(epoch_store.epoch());
1617 let execution_guard = lock.try_read().unwrap();
1618
1619 self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
1620 }
1621
1622 #[instrument("dry_exec_tx", level = "trace", skip_all)]
1623 #[allow(clippy::type_complexity)]
1624 pub fn dry_exec_transaction(
1625 &self,
1626 transaction: TransactionData,
1627 transaction_digest: TransactionDigest,
1628 ) -> IotaResult<(
1629 DryRunTransactionBlockResponse,
1630 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1631 TransactionEffects,
1632 Option<ObjectID>,
1633 )> {
1634 let epoch_store = self.load_epoch_store_one_call_per_task();
1635 if !self.is_fullnode(&epoch_store) {
1636 return Err(IotaError::UnsupportedFeature {
1637 error: "dry-exec is only supported on fullnodes".to_string(),
1638 });
1639 }
1640
1641 if transaction.kind().is_system_tx() {
1642 return Err(IotaError::UnsupportedFeature {
1643 error: "dry-exec does not support system transactions".to_string(),
1644 });
1645 }
1646
1647 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1648 }
1649
1650 #[allow(clippy::type_complexity)]
1651 pub fn dry_exec_transaction_for_benchmark(
1652 &self,
1653 transaction: TransactionData,
1654 transaction_digest: TransactionDigest,
1655 ) -> IotaResult<(
1656 DryRunTransactionBlockResponse,
1657 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1658 TransactionEffects,
1659 Option<ObjectID>,
1660 )> {
1661 let epoch_store = self.load_epoch_store_one_call_per_task();
1662 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1663 }
1664
1665 #[instrument(level = "trace", skip_all)]
1666 #[allow(clippy::type_complexity)]
1667 fn dry_exec_transaction_impl(
1668 &self,
1669 epoch_store: &AuthorityPerEpochStore,
1670 transaction: TransactionData,
1671 transaction_digest: TransactionDigest,
1672 ) -> IotaResult<(
1673 DryRunTransactionBlockResponse,
1674 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1675 TransactionEffects,
1676 Option<ObjectID>,
1677 )> {
1678 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1680
1681 let input_object_kinds = transaction.input_objects()?;
1682 let receiving_object_refs = transaction.receiving_objects();
1683
1684 iota_transaction_checks::deny::check_transaction_for_signing(
1685 &transaction,
1686 &[],
1687 &input_object_kinds,
1688 &receiving_object_refs,
1689 &self.config.transaction_deny_config,
1690 self.get_backing_package_store().as_ref(),
1691 )?;
1692
1693 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1694 None,
1696 &input_object_kinds,
1697 &receiving_object_refs,
1698 epoch_store.epoch(),
1699 )?;
1700
1701 let mut transaction = transaction;
1703 let mut gas_object_refs = transaction.gas().to_vec();
1704 let reference_gas_price = epoch_store.reference_gas_price();
1705 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1706 let sender = transaction.gas_owner();
1707 let gas_object_id = ObjectID::random();
1708 let gas_object = Object::new_move(
1709 MoveObject::new_gas_coin(
1710 OBJECT_START_VERSION,
1711 gas_object_id,
1712 SIMULATION_GAS_COIN_VALUE,
1713 ),
1714 Owner::AddressOwner(sender),
1715 TransactionDigest::genesis_marker(),
1716 );
1717 let gas_object_ref = gas_object.compute_object_reference();
1718 gas_object_refs = vec![gas_object_ref];
1719 transaction.gas_data_mut().payment = gas_object_refs.clone();
1721 (
1722 iota_transaction_checks::check_transaction_input_with_given_gas(
1723 epoch_store.protocol_config(),
1724 reference_gas_price,
1725 &transaction,
1726 input_objects,
1727 receiving_objects,
1728 gas_object,
1729 &self.metrics.bytecode_verifier_metrics,
1730 &self.config.verifier_signing_config,
1731 )?,
1732 Some(gas_object_id),
1733 )
1734 } else {
1735 (
1736 iota_transaction_checks::check_transaction_input(
1737 epoch_store.protocol_config(),
1738 reference_gas_price,
1739 &transaction,
1740 input_objects,
1741 &receiving_objects,
1742 &self.metrics.bytecode_verifier_metrics,
1743 &self.config.verifier_signing_config,
1744 )?,
1745 None,
1746 )
1747 };
1748
1749 let protocol_config = epoch_store.protocol_config();
1750 let (kind, signer, _) = transaction.execution_parts();
1751
1752 let silent = true;
1753 let executor = iota_execution::executor(protocol_config, silent, None)
1754 .expect("Creating an executor should not fail here");
1755
1756 let expensive_checks = false;
1757 let (inner_temp_store, _, effects, _execution_error) = executor
1758 .execute_transaction_to_effects(
1759 self.get_backing_store().as_ref(),
1760 protocol_config,
1761 self.metrics.limits_metrics.clone(),
1762 expensive_checks,
1763 self.config.certificate_deny_config.certificate_deny_set(),
1764 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1765 epoch_store
1766 .epoch_start_config()
1767 .epoch_data()
1768 .epoch_start_timestamp(),
1769 checked_input_objects,
1770 gas_object_refs,
1771 gas_status,
1772 kind,
1773 signer,
1774 transaction_digest,
1775 &mut None,
1776 );
1777 let tx_digest = *effects.transaction_digest();
1778
1779 let module_cache =
1780 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1781
1782 let mut layout_resolver =
1783 epoch_store
1784 .executor()
1785 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1786 &inner_temp_store,
1787 self.get_backing_package_store(),
1788 )));
1789 let object_changes = Vec::new();
1791
1792 let balance_changes = Vec::new();
1794
1795 let written_with_kind = effects
1796 .created()
1797 .into_iter()
1798 .map(|(oref, _)| (oref, WriteKind::Create))
1799 .chain(
1800 effects
1801 .unwrapped()
1802 .into_iter()
1803 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1804 )
1805 .chain(
1806 effects
1807 .mutated()
1808 .into_iter()
1809 .map(|(oref, _)| (oref, WriteKind::Mutate)),
1810 )
1811 .map(|(oref, kind)| {
1812 let obj = inner_temp_store.written.get(&oref.0).unwrap();
1813 (oref.0, (oref, obj.clone(), kind))
1815 })
1816 .collect();
1817
1818 Ok((
1819 DryRunTransactionBlockResponse {
1820 suggested_gas_price: self
1822 .congestion_tracker
1823 .get_prediction_suggested_gas_price(&transaction),
1824 input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
1825 .map_err(|e| IotaError::TransactionSerialization {
1826 error: format!(
1827 "Failed to convert transaction to IotaTransactionBlockData: {e}",
1828 ),
1829 })?, effects: effects.clone().try_into()?,
1831 events: IotaTransactionBlockEvents::try_from(
1832 inner_temp_store.events.clone(),
1833 tx_digest,
1834 None,
1835 layout_resolver.as_mut(),
1836 )?,
1837 object_changes,
1838 balance_changes,
1839 },
1840 written_with_kind,
1841 effects,
1842 mock_gas,
1843 ))
1844 }
1845
1846 pub fn simulate_transaction(
1847 &self,
1848 transaction: TransactionData,
1849 ) -> IotaResult<SimulateTransactionResult> {
1850 if transaction.kind().is_system_tx() {
1851 return Err(IotaError::UnsupportedFeature {
1852 error: "simulate does not support system transactions".to_string(),
1853 });
1854 }
1855
1856 let epoch_store = self.load_epoch_store_one_call_per_task();
1857 if !self.is_fullnode(&epoch_store) {
1858 return Err(IotaError::UnsupportedFeature {
1859 error: "simulate is only supported on fullnodes".to_string(),
1860 });
1861 }
1862
1863 self.simulate_transaction_impl(&epoch_store, transaction)
1864 }
1865
1866 fn simulate_transaction_impl(
1867 &self,
1868 epoch_store: &AuthorityPerEpochStore,
1869 transaction: TransactionData,
1870 ) -> IotaResult<SimulateTransactionResult> {
1871 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1873
1874 let input_object_kinds = transaction.input_objects()?;
1875 let receiving_object_refs = transaction.receiving_objects();
1876
1877 iota_transaction_checks::deny::check_transaction_for_signing(
1878 &transaction,
1879 &[],
1880 &input_object_kinds,
1881 &receiving_object_refs,
1882 &self.config.transaction_deny_config,
1883 self.get_backing_package_store().as_ref(),
1884 )?;
1885
1886 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1887 None,
1889 &input_object_kinds,
1890 &receiving_object_refs,
1891 epoch_store.epoch(),
1892 )?;
1893 let mut transaction = transaction;
1895 let mut gas_object_refs = transaction.gas().to_vec();
1896 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1897 let sender = transaction.gas_owner();
1898 let gas_object_id = ObjectID::MAX;
1899 let gas_object = Object::new_move(
1900 MoveObject::new_gas_coin(
1901 OBJECT_START_VERSION,
1902 gas_object_id,
1903 SIMULATION_GAS_COIN_VALUE,
1904 ),
1905 Owner::AddressOwner(sender),
1906 TransactionDigest::genesis_marker(),
1907 );
1908 let gas_object_ref = gas_object.compute_object_reference();
1909 gas_object_refs = vec![gas_object_ref];
1910
1911 transaction.gas_data_mut().payment = gas_object_refs.clone();
1913 (
1914 iota_transaction_checks::check_transaction_input_with_given_gas(
1915 epoch_store.protocol_config(),
1916 epoch_store.reference_gas_price(),
1917 &transaction,
1918 input_objects,
1919 receiving_objects,
1920 gas_object,
1921 &self.metrics.bytecode_verifier_metrics,
1922 &self.config.verifier_signing_config,
1923 )?,
1924 Some(gas_object_id),
1925 )
1926 } else {
1927 (
1928 iota_transaction_checks::check_transaction_input(
1929 epoch_store.protocol_config(),
1930 epoch_store.reference_gas_price(),
1931 &transaction,
1932 input_objects,
1933 &receiving_objects,
1934 &self.metrics.bytecode_verifier_metrics,
1935 &self.config.verifier_signing_config,
1936 )?,
1937 None,
1938 )
1939 };
1940
1941 let protocol_config = epoch_store.protocol_config();
1942 let (kind, signer, _) = transaction.execution_parts();
1943
1944 let silent = true;
1945 let executor = iota_execution::executor(protocol_config, silent, None)
1946 .expect("Creating an executor should not fail here");
1947
1948 let expensive_checks = false;
1949 let (inner_temp_store, _, effects, _execution_error) = executor
1950 .execute_transaction_to_effects(
1951 self.get_backing_store().as_ref(),
1952 protocol_config,
1953 self.metrics.limits_metrics.clone(),
1954 expensive_checks,
1955 self.config.certificate_deny_config.certificate_deny_set(),
1956 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1957 epoch_store
1958 .epoch_start_config()
1959 .epoch_data()
1960 .epoch_start_timestamp(),
1961 checked_input_objects,
1962 gas_object_refs,
1963 gas_status,
1964 kind,
1965 signer,
1966 transaction.digest(),
1967 &mut None,
1968 );
1969
1970 Ok(SimulateTransactionResult {
1971 input_objects: inner_temp_store.input_objects,
1972 output_objects: inner_temp_store.written,
1973 events: effects.events_digest().map(|_| inner_temp_store.events),
1974 effects,
1975 mock_gas_id: mock_gas,
1976 })
1977 }
1978
1979 #[instrument("dev_inspect_tx", level = "trace", skip_all)]
1981 pub async fn dev_inspect_transaction_block(
1982 &self,
1983 sender: IotaAddress,
1984 transaction_kind: TransactionKind,
1985 gas_price: Option<u64>,
1986 gas_budget: Option<u64>,
1987 gas_sponsor: Option<IotaAddress>,
1988 gas_objects: Option<Vec<ObjectRef>>,
1989 show_raw_txn_data_and_effects: Option<bool>,
1990 skip_checks: Option<bool>,
1991 ) -> IotaResult<DevInspectResults> {
1992 let epoch_store = self.load_epoch_store_one_call_per_task();
1993
1994 if !self.is_fullnode(&epoch_store) {
1995 return Err(IotaError::UnsupportedFeature {
1996 error: "dev-inspect is only supported on fullnodes".to_string(),
1997 });
1998 }
1999
2000 if transaction_kind.is_system_tx() {
2001 return Err(IotaError::UnsupportedFeature {
2002 error: "system transactions are not supported".to_string(),
2003 });
2004 }
2005
2006 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2007 let skip_checks = skip_checks.unwrap_or(true);
2008 let reference_gas_price = epoch_store.reference_gas_price();
2009 let protocol_config = epoch_store.protocol_config();
2010 let max_tx_gas = protocol_config.max_tx_gas();
2011
2012 let price = gas_price.unwrap_or(reference_gas_price);
2013 let budget = gas_budget.unwrap_or(max_tx_gas);
2014 let owner = gas_sponsor.unwrap_or(sender);
2015 let payment = gas_objects.unwrap_or_default();
2018 let transaction = TransactionData::V1(TransactionDataV1 {
2019 kind: transaction_kind.clone(),
2020 sender,
2021 gas_data: GasData {
2022 payment,
2023 owner,
2024 price,
2025 budget,
2026 },
2027 expiration: TransactionExpiration::None,
2028 });
2029
2030 let raw_txn_data = if show_raw_txn_data_and_effects {
2031 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2032 error: "Failed to serialize transaction during dev inspect".to_string(),
2033 })?
2034 } else {
2035 vec![]
2036 };
2037
2038 transaction.validity_check_no_gas_check(protocol_config)?;
2039
2040 let input_object_kinds = transaction.input_objects()?;
2041 let receiving_object_refs = transaction.receiving_objects();
2042
2043 iota_transaction_checks::deny::check_transaction_for_signing(
2044 &transaction,
2045 &[],
2046 &input_object_kinds,
2047 &receiving_object_refs,
2048 &self.config.transaction_deny_config,
2049 self.get_backing_package_store().as_ref(),
2050 )?;
2051
2052 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2053 None,
2055 &input_object_kinds,
2056 &receiving_object_refs,
2057 epoch_store.epoch(),
2058 )?;
2059
2060 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2062 SIMULATION_GAS_COIN_VALUE,
2063 transaction.gas_owner(),
2064 );
2065
2066 let gas_objects = if transaction.gas().is_empty() {
2067 let gas_object_ref = dummy_gas_object.compute_object_reference();
2068 vec![gas_object_ref]
2069 } else {
2070 transaction.gas().to_vec()
2071 };
2072
2073 let (gas_status, checked_input_objects) = if skip_checks {
2074 if transaction.gas().is_empty() {
2079 input_objects.push(ObjectReadResult::new(
2080 InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
2081 dummy_gas_object.into(),
2082 ));
2083 }
2084 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2085 protocol_config,
2086 &transaction_kind,
2087 input_objects,
2088 receiving_objects,
2089 )?;
2090 let gas_status = IotaGasStatus::new(
2091 max_tx_gas,
2092 transaction.gas_price(),
2093 reference_gas_price,
2094 protocol_config,
2095 )?;
2096
2097 (gas_status, checked_input_objects)
2098 } else {
2099 if transaction.gas().is_empty() {
2103 iota_transaction_checks::check_transaction_input_with_given_gas(
2104 epoch_store.protocol_config(),
2105 reference_gas_price,
2106 &transaction,
2107 input_objects,
2108 receiving_objects,
2109 dummy_gas_object,
2110 &self.metrics.bytecode_verifier_metrics,
2111 &self.config.verifier_signing_config,
2112 )?
2113 } else {
2114 iota_transaction_checks::check_transaction_input(
2115 epoch_store.protocol_config(),
2116 reference_gas_price,
2117 &transaction,
2118 input_objects,
2119 &receiving_objects,
2120 &self.metrics.bytecode_verifier_metrics,
2121 &self.config.verifier_signing_config,
2122 )?
2123 }
2124 };
2125
2126 let executor = iota_execution::executor(protocol_config, true, None)
2127 .expect("Creating an executor should not fail here");
2128 let intent_msg = IntentMessage::new(
2129 Intent {
2130 version: IntentVersion::V0,
2131 scope: IntentScope::TransactionData,
2132 app_id: AppId::Iota,
2133 },
2134 transaction,
2135 );
2136 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2137 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2138 self.get_backing_store().as_ref(),
2139 protocol_config,
2140 self.metrics.limits_metrics.clone(),
2141 false,
2143 self.config.certificate_deny_config.certificate_deny_set(),
2144 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2145 epoch_store
2146 .epoch_start_config()
2147 .epoch_data()
2148 .epoch_start_timestamp(),
2149 checked_input_objects,
2150 gas_objects,
2151 gas_status,
2152 transaction_kind,
2153 sender,
2154 transaction_digest,
2155 skip_checks,
2156 );
2157
2158 let raw_effects = if show_raw_txn_data_and_effects {
2159 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2160 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2161 })?
2162 } else {
2163 vec![]
2164 };
2165
2166 let mut layout_resolver =
2167 epoch_store
2168 .executor()
2169 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2170 &inner_temp_store,
2171 self.get_backing_package_store(),
2172 )));
2173
2174 DevInspectResults::new(
2175 effects,
2176 inner_temp_store.events.clone(),
2177 execution_result,
2178 raw_txn_data,
2179 raw_effects,
2180 layout_resolver.as_mut(),
2181 )
2182 }
2183
2184 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2186 let epoch_store = self.epoch_store_for_testing();
2187 Ok(epoch_store.reference_gas_price())
2188 }
2189
2190 #[instrument(level = "trace", skip_all)]
2191 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2192 self.get_transaction_cache_reader()
2193 .try_is_tx_already_executed(digest)
2194 }
2195
2196 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2198 self.try_is_tx_already_executed(digest)
2199 .expect("storage access failed")
2200 }
2201
2202 #[instrument(level = "debug", skip_all, err)]
2204 fn index_tx(
2205 &self,
2206 indexes: &IndexStore,
2207 digest: &TransactionDigest,
2208 cert: &VerifiedExecutableTransaction,
2210 effects: &TransactionEffects,
2211 events: &TransactionEvents,
2212 timestamp_ms: u64,
2213 tx_coins: Option<TxCoins>,
2214 written: &WrittenObjects,
2215 inner_temporary_store: &InnerTemporaryStore,
2216 ) -> IotaResult<u64> {
2217 let changes = self
2218 .process_object_index(effects, written, inner_temporary_store)
2219 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2220
2221 indexes.index_tx(
2222 cert.data().intent_message().value.sender(),
2223 cert.data()
2224 .intent_message()
2225 .value
2226 .input_objects()?
2227 .iter()
2228 .map(|o| o.object_id()),
2229 effects
2230 .all_changed_objects()
2231 .into_iter()
2232 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2233 cert.data()
2234 .intent_message()
2235 .value
2236 .move_calls()
2237 .into_iter()
2238 .map(|(package, module, function)| {
2239 (*package, module.to_owned(), function.to_owned())
2240 }),
2241 events,
2242 changes,
2243 digest,
2244 timestamp_ms,
2245 tx_coins,
2246 )
2247 }
2248
/// Simulator-only helper that perturbs `effects` for some authorities.
///
/// For non-system transactions, and only when this authority has non-zero
/// stake in the current committee, a thread-local `(accumulated stake, set of
/// authority names)` pair is updated: while the accumulated stake is below the
/// committee's validity threshold, this authority's stake and name are added.
/// If this authority's name is in the set, the computation cost in `effects`
/// is incremented by one.
#[cfg(msim)]
fn create_fail_state(
    &self,
    certificate: &VerifiedExecutableTransaction,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    effects: &mut TransactionEffects,
) {
    use std::cell::RefCell;
    thread_local! {
        static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
    }

    // System transactions are never perturbed.
    if certificate.data().intent_message().value.is_system_tx() {
        return;
    }

    let committee = epoch_store.committee();
    let cur_stake = (**committee).weight(&self.name);
    if cur_stake == 0 {
        return;
    }

    FAIL_STATE.with_borrow_mut(|state| {
        let (accumulated_stake, failing_authorities) = state;

        if *accumulated_stake < committee.validity_threshold() {
            *accumulated_stake += cur_stake;
            failing_authorities.insert(self.name);
        }

        if failing_authorities.contains(&self.name) {
            info!("cp_exec failing tx");
            effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
        }
    });
}
2279
/// Derives the owner-index and dynamic-field-index changes implied by the
/// effects of one transaction.
///
/// * `effects` - effects of the transaction being indexed.
/// * `written` - objects written by the transaction, keyed by object id.
/// * `inner_temporary_store` - combined with the backing package store to
///   resolve type layouts for dynamic fields.
///
/// Returns an [`ObjectIndexChanges`] listing owner / dynamic-field entries to
/// delete and to insert.
///
/// # Errors
/// Propagates errors from `try_get_object_by_key` when loading the previous
/// version of a mutated object.
///
/// # Panics
/// Panics when a deleted or wrapped object has no modified-at version, when its
/// previous owner cannot be looked up, when a mutated object has no modified-at
/// version or its previous version is missing from the object store, when a
/// changed object is missing from `written`, or when the version in `written`
/// disagrees with the version recorded in the effects.
fn process_object_index(
    &self,
    effects: &TransactionEffects,
    written: &WrittenObjects,
    inner_temporary_store: &InnerTemporaryStore,
) -> IotaResult<ObjectIndexChanges> {
    let epoch_store = self.load_epoch_store_one_call_per_task();
    // Layout resolver that looks packages up in the temporary store first and
    // falls back to the backing package store.
    let mut layout_resolver =
        epoch_store
            .executor()
            .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                inner_temporary_store,
                self.get_backing_package_store(),
            )));

    // Object id -> version the transaction modified, for constant-time lookup.
    let modified_at_version = effects
        .modified_at_versions()
        .into_iter()
        .collect::<HashMap<_, _>>();

    let tx_digest = effects.transaction_digest();
    let mut deleted_owners = vec![];
    let mut deleted_dynamic_fields = vec![];
    // Deleted and wrapped objects lose their index entry; find the owner they
    // had at the version this transaction modified.
    for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
        let old_version = modified_at_version.get(&id).unwrap();
        match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
            |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
        ) {
            Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
            Owner::ObjectOwner(object_id) => {
                deleted_dynamic_fields.push((ObjectID::from(object_id), id))
            }
            // Other owner kinds have no entry in these two indexes.
            _ => {}
        }
    }

    let mut new_owners = vec![];
    let mut new_dynamic_fields = vec![];

    for (oref, owner, kind) in effects.all_changed_objects() {
        let id = &oref.0;
        // For mutated objects, drop the old index entry if the owner changed.
        if let WriteKind::Mutate = kind {
            let Some(old_version) = modified_at_version.get(id) else {
                panic!(
                    "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
                );
            };
            let Some(old_object) = self
                .get_object_store()
                .try_get_object_by_key(id, *old_version)?
            else {
                panic!(
                    "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
                );
            };
            if old_object.owner != owner {
                match old_object.owner {
                    Owner::AddressOwner(addr) => {
                        deleted_owners.push((addr, *id));
                    }
                    Owner::ObjectOwner(object_id) => {
                        deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
                    }
                    _ => {}
                }
            }
        }

        // Record the new index entry according to the object's current owner.
        match owner {
            Owner::AddressOwner(addr) => {
                let new_object = written.get(id).unwrap_or_else(
                    || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
                );
                // The written object must be at the version the effects report.
                assert_eq!(
                    new_object.version(),
                    oref.1,
                    "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                    tx_digest,
                    id,
                    new_object.version(),
                    oref.1
                );

                // Objects without a Move struct type are recorded as packages.
                let type_ = new_object
                    .type_()
                    .map(|type_| ObjectType::Struct(type_.clone()))
                    .unwrap_or(ObjectType::Package);

                new_owners.push((
                    (addr, *id),
                    ObjectInfo {
                        object_id: *id,
                        version: oref.1,
                        digest: oref.2,
                        type_,
                        owner,
                        previous_transaction: *effects.transaction_digest(),
                    },
                ));
            }
            Owner::ObjectOwner(owner) => {
                let new_object = written.get(id).unwrap_or_else(
                    || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
                );
                assert_eq!(
                    new_object.version(),
                    oref.1,
                    "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                    tx_digest,
                    id,
                    new_object.version(),
                    oref.1
                );

                // A failure to build the dynamic field info is logged and the
                // object is skipped rather than failing the whole transaction;
                // objects that are not dynamic fields (`None`) are skipped too.
                let Some(df_info) = self
                    .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut(), false)
                    .unwrap_or_else(|e| {
                        error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
                        None
                    }
                )
                else {
                    continue;
                };
                new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
            }
            _ => {}
        }
    }

    Ok(ObjectIndexChanges {
        deleted_owners,
        deleted_dynamic_fields,
        new_owners,
        new_dynamic_fields,
    })
}
2426
2427 fn try_create_dynamic_field_info(
2428 &self,
2429 o: &Object,
2430 written: &WrittenObjects,
2431 resolver: &mut dyn LayoutResolver,
2432 get_latest_object_version: bool,
2433 ) -> IotaResult<Option<DynamicFieldInfo>> {
2434 let Some(move_object) = o.data.try_as_move().cloned() else {
2436 return Ok(None);
2437 };
2438
2439 if !move_object.type_().is_dynamic_field() {
2441 return Ok(None);
2442 }
2443
2444 let layout = resolver
2445 .get_annotated_layout(&move_object.type_().clone().into())?
2446 .into_layout();
2447
2448 let field =
2449 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2450 IotaError::ObjectDeserialization {
2451 error: e.to_string(),
2452 }
2453 })?;
2454
2455 let type_ = field.kind;
2456 let name_type: TypeTag = field.name_layout.into();
2457 let bcs_name = field.name_bytes.to_owned();
2458
2459 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2460 .map_err(|e| {
2461 warn!("{e}");
2462 IotaError::ObjectDeserialization {
2463 error: e.to_string(),
2464 }
2465 })?;
2466
2467 let name = DynamicFieldName {
2468 type_: name_type,
2469 value: IotaMoveValue::from(name_value).to_json_value(),
2470 };
2471
2472 let value_metadata = field.value_metadata().map_err(|e| {
2473 warn!("{e}");
2474 IotaError::ObjectDeserialization {
2475 error: e.to_string(),
2476 }
2477 })?;
2478
2479 Ok(Some(match value_metadata {
2480 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2481 name,
2482 bcs_name,
2483 type_,
2484 object_type: object_type.to_canonical_string(true),
2485 object_id: o.id(),
2486 version: o.version(),
2487 digest: o.digest(),
2488 },
2489
2490 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2491 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2496 (
2497 object.version(),
2498 object.digest(),
2499 object.data.type_().unwrap().clone(),
2500 )
2501 } else {
2502 let object = if get_latest_object_version {
2504 self.get_object_store()
2511 .try_get_object(&object_id)?
2512 .ok_or_else(|| UserInputError::ObjectNotFound {
2513 object_id,
2514 version: Some(o.version()),
2515 })?
2516 } else {
2517 self.get_object_store()
2519 .try_get_object_by_key(&object_id, o.version())?
2520 .ok_or_else(|| UserInputError::ObjectNotFound {
2521 object_id,
2522 version: Some(o.version()),
2523 })?
2524 };
2525
2526 (
2527 object.version(),
2528 object.digest(),
2529 object.data.type_().unwrap().clone(),
2530 )
2531 };
2532
2533 DynamicFieldInfo {
2534 name,
2535 bcs_name,
2536 type_,
2537 object_type: object_type.to_string(),
2538 object_id,
2539 version,
2540 digest,
2541 }
2542 }
2543 }))
2544 }
2545
2546 #[instrument(level = "trace", skip_all, err)]
2547 fn post_process_one_tx(
2548 &self,
2549 certificate: &VerifiedExecutableTransaction,
2550 effects: &TransactionEffects,
2551 inner_temporary_store: &InnerTemporaryStore,
2552 epoch_store: &Arc<AuthorityPerEpochStore>,
2553 ) -> IotaResult {
2554 if self.indexes.is_none() {
2555 return Ok(());
2556 }
2557
2558 let tx_digest = certificate.digest();
2559 let timestamp_ms = Self::unixtime_now_ms();
2560 let events = &inner_temporary_store.events;
2561 let written = &inner_temporary_store.written;
2562 let tx_coins =
2563 self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2564
2565 if let Some(indexes) = &self.indexes {
2567 let _ = self
2568 .index_tx(
2569 indexes.as_ref(),
2570 tx_digest,
2571 certificate,
2572 effects,
2573 events,
2574 timestamp_ms,
2575 tx_coins,
2576 written,
2577 inner_temporary_store,
2578 )
2579 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2580 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2581 .expect("Indexing tx should not fail");
2582
2583 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2584 let events = self.make_transaction_block_events(
2585 events.clone(),
2586 *tx_digest,
2587 timestamp_ms,
2588 epoch_store,
2589 inner_temporary_store,
2590 )?;
2591 self.subscription_handler
2593 .process_tx(certificate.data().transaction_data(), &effects, &events)
2594 .tap_ok(|_| {
2595 self.metrics
2596 .post_processing_total_tx_had_event_processed
2597 .inc()
2598 })
2599 .tap_err(|e| {
2600 warn!(
2601 ?tx_digest,
2602 "Post processing - Couldn't process events for tx: {}", e
2603 )
2604 })?;
2605
2606 self.metrics
2607 .post_processing_total_events_emitted
2608 .inc_by(events.data.len() as u64);
2609 };
2610 Ok(())
2611 }
2612
2613 fn make_transaction_block_events(
2614 &self,
2615 transaction_events: TransactionEvents,
2616 digest: TransactionDigest,
2617 timestamp_ms: u64,
2618 epoch_store: &Arc<AuthorityPerEpochStore>,
2619 inner_temporary_store: &InnerTemporaryStore,
2620 ) -> IotaResult<IotaTransactionBlockEvents> {
2621 let mut layout_resolver =
2622 epoch_store
2623 .executor()
2624 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2625 inner_temporary_store,
2626 self.get_backing_package_store(),
2627 )));
2628 IotaTransactionBlockEvents::try_from(
2629 transaction_events,
2630 digest,
2631 Some(timestamp_ms),
2632 layout_resolver.as_mut(),
2633 )
2634 }
2635
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock is before the Unix epoch, or if the millisecond
/// count does not fit in a `u64`.
pub fn unixtime_now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let millis: u128 = since_epoch.as_millis();
    u64::try_from(millis).expect("Travelling in time machine")
}
2643
2644 #[instrument(level = "trace", skip_all)]
2645 pub async fn handle_transaction_info_request(
2646 &self,
2647 request: TransactionInfoRequest,
2648 ) -> IotaResult<TransactionInfoResponse> {
2649 let epoch_store = self.load_epoch_store_one_call_per_task();
2650 let (transaction, status) = self
2651 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2652 .ok_or(IotaError::TransactionNotFound {
2653 digest: request.transaction_digest,
2654 })?;
2655 Ok(TransactionInfoResponse {
2656 transaction,
2657 status,
2658 })
2659 }
2660
2661 #[instrument(level = "trace", skip_all)]
2662 pub async fn handle_object_info_request(
2663 &self,
2664 request: ObjectInfoRequest,
2665 ) -> IotaResult<ObjectInfoResponse> {
2666 let epoch_store = self.load_epoch_store_one_call_per_task();
2667
2668 let requested_object_seq = match request.request_kind {
2669 ObjectInfoRequestKind::LatestObjectInfo => {
2670 let (_, seq, _) = self
2671 .try_get_object_or_tombstone(request.object_id)
2672 .await?
2673 .ok_or_else(|| {
2674 IotaError::from(UserInputError::ObjectNotFound {
2675 object_id: request.object_id,
2676 version: None,
2677 })
2678 })?;
2679 seq
2680 }
2681 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2682 };
2683
2684 let object = self
2685 .get_object_store()
2686 .try_get_object_by_key(&request.object_id, requested_object_seq)?
2687 .ok_or_else(|| {
2688 IotaError::from(UserInputError::ObjectNotFound {
2689 object_id: request.object_id,
2690 version: Some(requested_object_seq),
2691 })
2692 })?;
2693
2694 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2695 (request.generate_layout, object.data.try_as_move())
2696 {
2697 Some(into_struct_layout(
2698 epoch_store
2699 .executor()
2700 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2701 .get_annotated_layout(&move_obj.type_().clone().into())?,
2702 )?)
2703 } else {
2704 None
2705 };
2706
2707 let lock = if !object.is_address_owned() {
2708 None
2710 } else {
2711 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2712 .await?
2713 .map(|s| s.into_inner())
2714 };
2715
2716 Ok(ObjectInfoResponse {
2717 object,
2718 layout,
2719 lock_for_debugging: lock,
2720 })
2721 }
2722
2723 #[instrument(level = "trace", skip_all)]
2724 pub fn handle_checkpoint_request(
2725 &self,
2726 request: &CheckpointRequest,
2727 ) -> IotaResult<CheckpointResponse> {
2728 let summary = if request.certified {
2729 let summary = match request.sequence_number {
2730 Some(seq) => self
2731 .checkpoint_store
2732 .get_checkpoint_by_sequence_number(seq)?,
2733 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2734 }
2735 .map(|v| v.into_inner());
2736 summary.map(CheckpointSummaryResponse::Certified)
2737 } else {
2738 let summary = match request.sequence_number {
2739 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2740 None => self
2741 .checkpoint_store
2742 .get_latest_locally_computed_checkpoint()?,
2743 };
2744 summary.map(CheckpointSummaryResponse::Pending)
2745 };
2746 let contents = match &summary {
2747 Some(s) => self
2748 .checkpoint_store
2749 .get_checkpoint_contents(&s.content_digest())?,
2750 None => None,
2751 };
2752 Ok(CheckpointResponse {
2753 checkpoint: summary,
2754 contents,
2755 })
2756 }
2757
2758 fn check_protocol_version(
2759 supported_protocol_versions: SupportedProtocolVersions,
2760 current_version: ProtocolVersion,
2761 ) {
2762 info!("current protocol version is now {:?}", current_version);
2763 info!("supported versions are: {:?}", supported_protocol_versions);
2764 if !supported_protocol_versions.is_version_supported(current_version) {
2765 let msg = format!(
2766 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2767 );
2768
2769 error!("{}", msg);
2770 eprintln!("{msg}");
2771
2772 #[cfg(not(msim))]
2773 std::process::exit(1);
2774
2775 #[cfg(msim)]
2776 iota_simulator::task::shutdown_current_node();
2777 }
2778 }
2779
/// Constructs the `AuthorityState`, spawns its execution task and indexes the
/// genesis objects if the index store is empty.
///
/// The protocol version of `epoch_store` is checked against
/// `supported_protocol_versions` first; see `check_protocol_version` for what
/// happens when it is unsupported.
///
/// # Panics
/// Panics with "Error indexing genesis objects." if indexing the genesis
/// objects fails.
// NOTE(review): the `disallowed_methods` expectation presumably covers the
// `unbounded_channel()` call below — confirm against the clippy config.
#[expect(clippy::disallowed_methods)]
pub async fn new(
    name: AuthorityName,
    secret: StableSyncAuthoritySigner,
    supported_protocol_versions: SupportedProtocolVersions,
    store: Arc<AuthorityStore>,
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
    epoch_store: Arc<AuthorityPerEpochStore>,
    committee_store: Arc<CommitteeStore>,
    indexes: Option<Arc<IndexStore>>,
    rest_index: Option<Arc<RestIndexStore>>,
    checkpoint_store: Arc<CheckpointStore>,
    prometheus_registry: &Registry,
    genesis_objects: &[Object],
    db_checkpoint_config: &DBCheckpointConfig,
    config: NodeConfig,
    archive_readers: ArchiveReaderBalancer,
    validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
    chain_identifier: ChainIdentifier,
    pruner_db: Option<Arc<AuthorityPrunerTables>>,
) -> Arc<Self> {
    Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

    let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
    // Channel over which the transaction manager hands ready certificates to
    // the execution task spawned below.
    let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
    let transaction_manager = Arc::new(TransactionManager::new(
        execution_cache_trait_pointers.object_cache_reader.clone(),
        execution_cache_trait_pointers
            .transaction_cache_reader
            .clone(),
        &epoch_store,
        tx_ready_certificates,
        metrics.clone(),
    ));
    // The sender half is stored in the state; the receiver half is given to
    // the execution task.
    let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

    // Both pruners are stored in the state so they live as long as it does.
    let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
        epoch_store.get_parent_path(),
        &config.authority_store_pruning_config,
    );
    let _pruner = AuthorityStorePruner::new(
        store.perpetual_tables.clone(),
        checkpoint_store.clone(),
        rest_index.clone(),
        config.authority_store_pruning_config.clone(),
        epoch_store.committee().authority_exists(&name),
        epoch_store.epoch_start_state().epoch_duration_ms(),
        prometheus_registry,
        archive_readers,
        pruner_db,
    );
    let input_loader =
        TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
    let epoch = epoch_store.epoch();
    let rgp = epoch_store.reference_gas_price();
    let state = Arc::new(AuthorityState {
        name,
        secret,
        // The execution lock guards the current epoch id.
        execution_lock: RwLock::new(epoch),
        epoch_store: ArcSwap::new(epoch_store.clone()),
        input_loader,
        execution_cache_trait_pointers,
        indexes,
        rest_index,
        subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
        checkpoint_store,
        committee_store,
        transaction_manager,
        tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
        metrics,
        _pruner,
        _authority_per_epoch_pruner,
        db_checkpoint_config: db_checkpoint_config.clone(),
        config,
        overload_info: AuthorityOverloadInfo::default(),
        validator_tx_finalizer,
        chain_identifier,
        congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
    });

    // The execution task only gets a weak reference to the state.
    let authority_state = Arc::downgrade(&state);
    spawn_monitored_task!(execution_process(
        authority_state,
        rx_ready_certificates,
        rx_execution_shutdown,
    ));

    state
        .create_owner_index_if_empty(genesis_objects, &epoch_store)
        .expect("Error indexing genesis objects.");

    state
}
2875
2876 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
2878 &self.execution_cache_trait_pointers.object_cache_reader
2879 }
2880
2881 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
2882 &self.execution_cache_trait_pointers.transaction_cache_reader
2883 }
2884
2885 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
2886 &self.execution_cache_trait_pointers.cache_writer
2887 }
2888
2889 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
2890 &self.execution_cache_trait_pointers.backing_store
2891 }
2892
2893 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
2894 &self.execution_cache_trait_pointers.backing_package_store
2895 }
2896
2897 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
2898 &self.execution_cache_trait_pointers.object_store
2899 }
2900
2901 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
2902 &self.execution_cache_trait_pointers.reconfig_api
2903 }
2904
2905 pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
2906 &self.execution_cache_trait_pointers.accumulator_store
2907 }
2908
2909 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
2910 &self.execution_cache_trait_pointers.checkpoint_cache
2911 }
2912
2913 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
2914 &self.execution_cache_trait_pointers.state_sync_store
2915 }
2916
2917 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
2918 &self.execution_cache_trait_pointers.cache_commit
2919 }
2920
2921 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
2922 self.execution_cache_trait_pointers
2923 .testing_api
2924 .database_for_testing()
2925 }
2926
2927 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
2928 &self,
2929 config: NodeConfig,
2930 metrics: Arc<AuthorityStorePruningMetrics>,
2931 ) -> anyhow::Result<()> {
2932 let archive_readers =
2933 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
2934 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
2935 &self.database_for_testing().perpetual_tables,
2936 &self.checkpoint_store,
2937 self.rest_index.as_deref(),
2938 None,
2939 config.authority_store_pruning_config,
2940 metrics,
2941 archive_readers,
2942 EPOCH_DURATION_MS_FOR_TESTING,
2943 )
2944 .await
2945 }
2946
2947 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
2948 &self.transaction_manager
2949 }
2950
2951 pub fn enqueue_transactions_for_execution(
2954 &self,
2955 txns: Vec<VerifiedExecutableTransaction>,
2956 epoch_store: &Arc<AuthorityPerEpochStore>,
2957 ) {
2958 self.transaction_manager.enqueue(txns, epoch_store)
2959 }
2960
2961 pub fn enqueue_certificates_for_execution(
2963 &self,
2964 certs: Vec<VerifiedCertificate>,
2965 epoch_store: &Arc<AuthorityPerEpochStore>,
2966 ) {
2967 self.transaction_manager
2968 .enqueue_certificates(certs, epoch_store)
2969 }
2970
2971 pub fn enqueue_with_expected_effects_digest(
2972 &self,
2973 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
2974 epoch_store: &AuthorityPerEpochStore,
2975 ) {
2976 self.transaction_manager
2977 .enqueue_with_expected_effects_digest(certs, epoch_store)
2978 }
2979
2980 fn create_owner_index_if_empty(
2981 &self,
2982 genesis_objects: &[Object],
2983 epoch_store: &Arc<AuthorityPerEpochStore>,
2984 ) -> IotaResult {
2985 let Some(index_store) = &self.indexes else {
2986 return Ok(());
2987 };
2988 if !index_store.is_empty() {
2989 return Ok(());
2990 }
2991
2992 let mut new_owners = vec![];
2993 let mut new_dynamic_fields = vec![];
2994 let mut layout_resolver = epoch_store
2995 .executor()
2996 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
2997 for o in genesis_objects.iter() {
2998 match o.owner {
2999 Owner::AddressOwner(addr) => new_owners.push((
3000 (addr, o.id()),
3001 ObjectInfo::new(&o.compute_object_reference(), o),
3002 )),
3003 Owner::ObjectOwner(object_id) => {
3004 let id = o.id();
3005 let Some(info) = self.try_create_dynamic_field_info(
3006 o,
3007 &BTreeMap::new(),
3008 layout_resolver.as_mut(),
3009 true,
3010 )?
3011 else {
3012 continue;
3013 };
3014 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3015 }
3016 _ => {}
3017 }
3018 }
3019
3020 index_store.insert_genesis_objects(ObjectIndexChanges {
3021 deleted_owners: vec![],
3022 deleted_dynamic_fields: vec![],
3023 new_owners,
3024 new_dynamic_fields,
3025 })
3026 }
3027
3028 pub fn execution_lock_for_executable_transaction(
3032 &self,
3033 transaction: &VerifiedExecutableTransaction,
3034 ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3035 let lock = self
3036 .execution_lock
3037 .try_read()
3038 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3039 if *lock == transaction.auth_sig().epoch() {
3040 Ok(lock)
3041 } else {
3042 Err(IotaError::WrongEpoch {
3043 expected_epoch: *lock,
3044 actual_epoch: transaction.auth_sig().epoch(),
3045 })
3046 }
3047 }
3048
3049 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3055 self.execution_lock
3056 .try_read()
3057 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3058 }
3059
3060 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3061 self.execution_lock.write().await
3062 }
3063
/// Moves this authority from the epoch of `cur_epoch_store` to the epoch of
/// `new_committee` and returns the epoch store for the new epoch.
///
/// The steps run in this order: protocol version check, metrics reset,
/// committee insertion, taking the execution write lock, terminating the
/// current epoch store, sanity checks, reverting uncommitted transactions,
/// clearing end-of-epoch cache state, consistency checks, storing the new
/// epoch start configuration, optional DB checkpointing, cache
/// reconfiguration, reopening the epoch DB, and finally publishing the new
/// epoch id through the execution lock.
///
/// # Errors
/// Propagates errors from the committee store, checkpoint store, transaction
/// reversion, consistency checks, DB checkpointing and reopening the epoch DB.
///
/// # Panics
/// Panics if `epoch_last_checkpoint` is lower than the highest locally built
/// checkpoint, or if the reopened epoch store is not at `new_committee.epoch`.
#[instrument(level = "error", skip_all)]
pub async fn reconfigure(
    &self,
    cur_epoch_store: &AuthorityPerEpochStore,
    supported_protocol_versions: SupportedProtocolVersions,
    new_committee: Committee,
    epoch_start_configuration: EpochStartConfiguration,
    accumulator: Arc<StateAccumulator>,
    expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
    epoch_supply_change: i64,
    epoch_last_checkpoint: CheckpointSequenceNumber,
) -> IotaResult<Arc<AuthorityPerEpochStore>> {
    // Check that this binary supports the protocol version of the new epoch.
    Self::check_protocol_version(
        supported_protocol_versions,
        epoch_start_configuration
            .epoch_start_state()
            .protocol_version(),
    );
    self.metrics.reset_on_reconfigure();
    self.committee_store.insert_new_committee(&new_committee)?;

    // Held in write mode until the end of this function.
    let mut execution_lock = self.execution_lock_for_reconfiguration().await;

    cur_epoch_store.epoch_terminated().await;

    // Sequence number of the latest locally computed checkpoint, or 0 if none.
    let highest_locally_built_checkpoint_seq = self
        .checkpoint_store
        .get_latest_locally_computed_checkpoint()?
        .map(|c| *c.sequence_number())
        .unwrap_or(0);

    assert!(
        epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
        "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
    );
    // Only when the last checkpoint of the epoch was built locally: report if
    // more than one shared version assignment is still present.
    if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
        let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
        if num_shared_version_assignments > 1 {
            debug_fatal!(
                "all shared_version_assignments should have been removed \
                (num_shared_version_assignments: {num_shared_version_assignments})"
            );
        }
    }

    self.revert_uncommitted_epoch_transactions(cur_epoch_store)
        .await?;
    self.get_reconfig_api()
        .clear_state_end_of_epoch(&execution_lock);
    self.check_system_consistency(
        cur_epoch_store,
        accumulator,
        expensive_safety_check_config,
        epoch_supply_change,
    )?;
    self.get_reconfig_api()
        .try_set_epoch_start_configuration(&epoch_start_configuration)?;
    // Optional DB checkpoint under `<checkpoint_path>/epoch_<current epoch>`,
    // taken only when a path is configured and epoch-end checkpoints are on.
    if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
        if self
            .db_checkpoint_config
            .perform_db_checkpoints_at_epoch_end
        {
            // Index DB checkpointing defaults to off when not configured.
            let checkpoint_indexes = self
                .db_checkpoint_config
                .perform_index_db_checkpoints_at_epoch_end
                .unwrap_or(false);
            let current_epoch = cur_epoch_store.epoch();
            let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
            self.checkpoint_all_dbs(
                &epoch_checkpoint_path,
                cur_epoch_store,
                checkpoint_indexes,
            )?;
        }
    }

    self.get_reconfig_api()
        .reconfigure_cache(&epoch_start_configuration)
        .await;

    let new_epoch = new_committee.epoch;
    let new_epoch_store = self
        .reopen_epoch_db(
            cur_epoch_store,
            new_committee,
            epoch_start_configuration,
            expensive_safety_check_config,
            epoch_last_checkpoint,
        )
        .await?;
    assert_eq!(new_epoch_store.epoch(), new_epoch);
    self.transaction_manager.reconfigure(new_epoch);
    // Publish the new epoch id through the lock as the last step.
    *execution_lock = new_epoch;
    Ok(new_epoch_store)
}
3175
/// Test helper: advances this authority to the next epoch, reusing the current
/// protocol config.
///
/// # Panics
/// Panics if the last checkpoint of the current epoch cannot be read from the
/// checkpoint store.
pub async fn reconfigure_for_testing(&self) {
    // Held in write mode until the end of this function.
    let mut execution_lock = self.execution_lock_for_reconfiguration().await;
    let epoch_store = self.epoch_store_for_testing().clone();
    let protocol_config = epoch_store.protocol_config().clone();
    // Override that always yields a clone of the current protocol config;
    // the returned guard is kept alive until the end of this function.
    let _guard =
        ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
    let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
        self.get_backing_package_store().clone(),
        self.get_object_store().clone(),
        &self.config.expensive_safety_check_config,
        // Last checkpoint sequence number of the current epoch, or the default
        // value when none is recorded.
        self.checkpoint_store
            .get_epoch_last_checkpoint(epoch_store.epoch())
            .unwrap()
            .map(|c| *c.sequence_number())
            .unwrap_or_default(),
    );
    let new_epoch = new_epoch_store.epoch();
    self.transaction_manager.reconfigure(new_epoch);
    self.epoch_store.store(new_epoch_store);
    // The old epoch store is terminated after the new one has been installed.
    epoch_store.epoch_terminated().await;
    *execution_lock = new_epoch;
}
3209
3210 #[instrument(level = "error", skip_all)]
3211 fn check_system_consistency(
3212 &self,
3213 cur_epoch_store: &AuthorityPerEpochStore,
3214 accumulator: Arc<StateAccumulator>,
3215 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3216 epoch_supply_change: i64,
3217 ) -> IotaResult<()> {
3218 info!(
3219 "Performing iota conservation consistency check for epoch {}",
3220 cur_epoch_store.epoch()
3221 );
3222
3223 if cfg!(debug_assertions) {
3224 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3225 }
3226
3227 self.get_reconfig_api()
3228 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3229
3230 if expensive_safety_check_config.enable_state_consistency_check() {
3232 info!(
3233 "Performing state consistency check for epoch {}",
3234 cur_epoch_store.epoch()
3235 );
3236 self.expensive_check_is_consistent_state(
3237 accumulator,
3238 cur_epoch_store,
3239 cfg!(debug_assertions), );
3241 }
3242
3243 if expensive_safety_check_config.enable_secondary_index_checks() {
3244 if let Some(indexes) = self.indexes.clone() {
3245 verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3246 .expect("secondary indexes are inconsistent");
3247 }
3248 }
3249
3250 Ok(())
3251 }
3252
3253 fn expensive_check_is_consistent_state(
3254 &self,
3255 accumulator: Arc<StateAccumulator>,
3256 cur_epoch_store: &AuthorityPerEpochStore,
3257 panic: bool,
3258 ) {
3259 let live_object_set_hash = accumulator.digest_live_object_set();
3260
3261 let root_state_hash: ECMHLiveObjectSetDigest = self
3262 .get_accumulator_store()
3263 .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3264 .expect("Retrieving root state hash cannot fail")
3265 .expect("Root state hash for epoch must exist")
3266 .1
3267 .digest()
3268 .into();
3269
3270 let is_inconsistent = root_state_hash != live_object_set_hash;
3271 if is_inconsistent {
3272 if panic {
3273 panic!(
3274 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3275 );
3276 } else {
3277 error!(
3278 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3279 root_state_hash, live_object_set_hash
3280 );
3281 }
3282 } else {
3283 info!("State consistency check passed");
3284 }
3285
3286 if !panic {
3287 accumulator.set_inconsistent_state(is_inconsistent);
3288 }
3289 }
3290
3291 pub fn current_epoch_for_testing(&self) -> EpochId {
3292 self.epoch_store_for_testing().epoch()
3293 }
3294
3295 #[instrument(level = "error", skip_all)]
3296 pub fn checkpoint_all_dbs(
3297 &self,
3298 checkpoint_path: &Path,
3299 cur_epoch_store: &AuthorityPerEpochStore,
3300 checkpoint_indexes: bool,
3301 ) -> IotaResult {
3302 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3303 let current_epoch = cur_epoch_store.epoch();
3304
3305 if checkpoint_path.exists() {
3306 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3307 return Ok(());
3308 }
3309
3310 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3311 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3312
3313 if checkpoint_path_tmp.exists() {
3314 fs::remove_dir_all(&checkpoint_path_tmp)
3315 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3316 }
3317
3318 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3319 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3320
3321 self.checkpoint_store
3324 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3325
3326 self.get_reconfig_api()
3327 .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3328
3329 self.committee_store
3330 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3331
3332 if checkpoint_indexes {
3333 if let Some(indexes) = self.indexes.as_ref() {
3334 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3335 }
3336 }
3337
3338 fs::rename(checkpoint_path_tmp, checkpoint_path)
3339 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3340 Ok(())
3341 }
3342
3343 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3350 self.epoch_store.load()
3351 }
3352
3353 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3355 self.load_epoch_store_one_call_per_task()
3356 }
3357
3358 pub fn clone_committee_for_testing(&self) -> Committee {
3359 Committee::clone(self.epoch_store_for_testing().committee())
3360 }
3361
3362 #[instrument(level = "trace", skip_all)]
3363 pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3364 self.get_object_store()
3365 .try_get_object(object_id)
3366 .map_err(Into::into)
3367 }
3368
3369 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3371 self.try_get_object(object_id)
3372 .await
3373 .expect("storage access failed")
3374 }
3375
3376 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3377 Ok(self
3378 .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3379 .await?
3380 .expect("framework object should always exist")
3381 .compute_object_reference())
3382 }
3383
3384 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3386 self.get_object_cache_reader()
3387 .try_get_iota_system_state_object_unsafe()
3388 }
3389
3390 #[instrument(level = "trace", skip_all)]
3391 pub fn get_checkpoint_by_sequence_number(
3392 &self,
3393 sequence_number: CheckpointSequenceNumber,
3394 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3395 Ok(self
3396 .checkpoint_store
3397 .get_checkpoint_by_sequence_number(sequence_number)?)
3398 }
3399
3400 #[instrument(level = "trace", skip_all)]
3401 pub fn get_transaction_checkpoint_for_tests(
3402 &self,
3403 digest: &TransactionDigest,
3404 epoch_store: &AuthorityPerEpochStore,
3405 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3406 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3407 let Some(checkpoint) = checkpoint else {
3408 return Ok(None);
3409 };
3410 let checkpoint = self
3411 .checkpoint_store
3412 .get_checkpoint_by_sequence_number(checkpoint)?;
3413 Ok(checkpoint)
3414 }
3415
3416 #[instrument(level = "trace", skip_all)]
3417 pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3418 Ok(
3419 match self
3420 .get_object_cache_reader()
3421 .try_get_latest_object_or_tombstone(*object_id)?
3422 {
3423 Some((_, ObjectOrTombstone::Object(object))) => {
3424 let layout = self.get_object_layout(&object)?;
3425 ObjectRead::Exists(object.compute_object_reference(), object, layout)
3426 }
3427 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3428 None => ObjectRead::NotExists(*object_id),
3429 },
3430 )
3431 }
3432
3433 pub fn get_chain_identifier(&self) -> ChainIdentifier {
3435 self.chain_identifier
3436 }
3437
3438 #[instrument(level = "trace", skip_all)]
3439 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3440 where
3441 T: DeserializeOwned,
3442 {
3443 let o = self.get_object_read(object_id)?.into_object()?;
3444 if let Some(move_object) = o.data.try_as_move() {
3445 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3446 IotaError::ObjectDeserialization {
3447 error: format!("{e}"),
3448 }
3449 })?)
3450 } else {
3451 Err(IotaError::ObjectDeserialization {
3452 error: format!("Provided object : [{object_id}] is not a Move object."),
3453 })
3454 }
3455 }
3456
3457 #[instrument(level = "trace", skip_all)]
3463 pub fn get_past_object_read(
3464 &self,
3465 object_id: &ObjectID,
3466 version: SequenceNumber,
3467 ) -> IotaResult<PastObjectRead> {
3468 let Some(obj_ref) = self
3470 .get_object_cache_reader()
3471 .try_get_latest_object_ref_or_tombstone(*object_id)?
3472 else {
3473 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3474 };
3475
3476 if version > obj_ref.1 {
3477 return Ok(PastObjectRead::VersionTooHigh {
3478 object_id: *object_id,
3479 asked_version: version,
3480 latest_version: obj_ref.1,
3481 });
3482 }
3483
3484 if version < obj_ref.1 {
3485 return Ok(match self.read_object_at_version(object_id, version)? {
3487 Some((object, layout)) => {
3488 let obj_ref = object.compute_object_reference();
3489 PastObjectRead::VersionFound(obj_ref, object, layout)
3490 }
3491
3492 None => PastObjectRead::VersionNotFound(*object_id, version),
3493 });
3494 }
3495
3496 if !obj_ref.2.is_alive() {
3497 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3498 }
3499
3500 match self.read_object_at_version(object_id, obj_ref.1)? {
3501 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3502 None => {
3503 error!(
3504 "Object with in parent_entry is missing from object store, datastore is \
3505 inconsistent",
3506 );
3507 Err(UserInputError::ObjectNotFound {
3508 object_id: *object_id,
3509 version: Some(obj_ref.1),
3510 }
3511 .into())
3512 }
3513 }
3514 }
3515
3516 #[instrument(level = "trace", skip_all)]
3517 fn read_object_at_version(
3518 &self,
3519 object_id: &ObjectID,
3520 version: SequenceNumber,
3521 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3522 let Some(object) = self
3523 .get_object_cache_reader()
3524 .try_get_object_by_key(object_id, version)?
3525 else {
3526 return Ok(None);
3527 };
3528
3529 let layout = self.get_object_layout(&object)?;
3530 Ok(Some((object, layout)))
3531 }
3532
3533 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3534 let layout = object
3535 .data
3536 .try_as_move()
3537 .map(|object| {
3538 into_struct_layout(
3539 self.load_epoch_store_one_call_per_task()
3540 .executor()
3541 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3543 .get_annotated_layout(&object.type_().clone().into())?,
3544 )
3545 })
3546 .transpose()?;
3547 Ok(layout)
3548 }
3549
3550 fn get_owner_at_version(
3551 &self,
3552 object_id: &ObjectID,
3553 version: SequenceNumber,
3554 ) -> IotaResult<Owner> {
3555 self.get_object_store()
3556 .try_get_object_by_key(object_id, version)?
3557 .ok_or_else(|| {
3558 IotaError::from(UserInputError::ObjectNotFound {
3559 object_id: *object_id,
3560 version: Some(version),
3561 })
3562 })
3563 .map(|o| o.owner)
3564 }
3565
3566 #[instrument(level = "trace", skip_all)]
3567 pub fn get_owner_objects(
3568 &self,
3569 owner: IotaAddress,
3570 cursor: Option<ObjectID>,
3572 limit: usize,
3573 filter: Option<IotaObjectDataFilter>,
3574 ) -> IotaResult<Vec<ObjectInfo>> {
3575 if let Some(indexes) = &self.indexes {
3576 indexes.get_owner_objects(owner, cursor, limit, filter)
3577 } else {
3578 Err(IotaError::IndexStoreNotAvailable)
3579 }
3580 }
3581
3582 #[instrument(level = "trace", skip_all)]
3583 pub fn get_owned_coins_iterator_with_cursor(
3584 &self,
3585 owner: IotaAddress,
3586 cursor: (String, ObjectID),
3588 limit: usize,
3589 one_coin_type_only: bool,
3590 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3591 if let Some(indexes) = &self.indexes {
3592 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3593 } else {
3594 Err(IotaError::IndexStoreNotAvailable)
3595 }
3596 }
3597
3598 #[instrument(level = "trace", skip_all)]
3599 pub fn get_owner_objects_iterator(
3600 &self,
3601 owner: IotaAddress,
3602 cursor: Option<ObjectID>,
3604 filter: Option<IotaObjectDataFilter>,
3605 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3606 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3607 if let Some(indexes) = &self.indexes {
3608 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3609 } else {
3610 Err(IotaError::IndexStoreNotAvailable)
3611 }
3612 }
3613
3614 #[instrument(level = "trace", skip_all)]
3615 pub async fn get_move_objects<T>(
3616 &self,
3617 owner: IotaAddress,
3618 type_: MoveObjectType,
3619 ) -> IotaResult<Vec<T>>
3620 where
3621 T: DeserializeOwned,
3622 {
3623 let object_ids = self
3624 .get_owner_objects_iterator(owner, None, None)?
3625 .filter(|o| match &o.type_ {
3626 ObjectType::Struct(s) => &type_ == s,
3627 ObjectType::Package => false,
3628 })
3629 .map(|info| ObjectKey(info.object_id, info.version))
3630 .collect::<Vec<_>>();
3631 let mut move_objects = vec![];
3632
3633 let objects = self
3634 .get_object_store()
3635 .try_multi_get_objects_by_key(&object_ids)?;
3636
3637 for (o, id) in objects.into_iter().zip(object_ids) {
3638 let object = o.ok_or_else(|| {
3639 IotaError::from(UserInputError::ObjectNotFound {
3640 object_id: id.0,
3641 version: Some(id.1),
3642 })
3643 })?;
3644 let move_object = object.data.try_as_move().ok_or_else(|| {
3645 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3646 })?;
3647 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3648 IotaError::ObjectDeserialization {
3649 error: format!("{e}"),
3650 }
3651 })?);
3652 }
3653 Ok(move_objects)
3654 }
3655
3656 #[instrument(level = "trace", skip_all)]
3657 pub fn get_dynamic_fields(
3658 &self,
3659 owner: ObjectID,
3660 cursor: Option<ObjectID>,
3662 limit: usize,
3663 ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3664 Ok(self
3665 .get_dynamic_fields_iterator(owner, cursor)?
3666 .take(limit)
3667 .collect::<Result<Vec<_>, _>>()?)
3668 }
3669
3670 fn get_dynamic_fields_iterator(
3671 &self,
3672 owner: ObjectID,
3673 cursor: Option<ObjectID>,
3675 ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3676 {
3677 if let Some(indexes) = &self.indexes {
3678 indexes.get_dynamic_fields_iterator(owner, cursor)
3679 } else {
3680 Err(IotaError::IndexStoreNotAvailable)
3681 }
3682 }
3683
3684 #[instrument(level = "trace", skip_all)]
3685 pub fn get_dynamic_field_object_id(
3686 &self,
3687 owner: ObjectID,
3688 name_type: TypeTag,
3689 name_bcs_bytes: &[u8],
3690 ) -> IotaResult<Option<ObjectID>> {
3691 if let Some(indexes) = &self.indexes {
3692 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3693 } else {
3694 Err(IotaError::IndexStoreNotAvailable)
3695 }
3696 }
3697
3698 #[instrument(level = "trace", skip_all)]
3699 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3700 Ok(self.get_indexes()?.next_sequence_number())
3701 }
3702
3703 #[instrument(level = "trace", skip_all)]
3704 pub async fn get_executed_transaction_and_effects(
3705 &self,
3706 digest: TransactionDigest,
3707 kv_store: Arc<TransactionKeyValueStore>,
3708 ) -> IotaResult<(Transaction, TransactionEffects)> {
3709 let transaction = kv_store.get_tx(digest).await?;
3710 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3711 Ok((transaction, effects))
3712 }
3713
3714 #[instrument(level = "trace", skip_all)]
3715 pub fn multi_get_checkpoint_by_sequence_number(
3716 &self,
3717 sequence_numbers: &[CheckpointSequenceNumber],
3718 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3719 Ok(self
3720 .checkpoint_store
3721 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3722 }
3723
3724 #[instrument(level = "trace", skip_all)]
3725 pub fn get_transaction_events(
3726 &self,
3727 digest: &TransactionEventsDigest,
3728 ) -> IotaResult<TransactionEvents> {
3729 self.get_transaction_cache_reader()
3730 .try_get_events(digest)?
3731 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3732 }
3733
3734 pub fn get_transaction_input_objects(
3735 &self,
3736 effects: &TransactionEffects,
3737 ) -> anyhow::Result<Vec<Object>> {
3738 let input_object_keys = effects
3739 .modified_at_versions()
3740 .into_iter()
3741 .map(|(object_id, version)| ObjectKey(object_id, version))
3742 .collect::<Vec<_>>();
3743
3744 let input_objects = self
3745 .get_object_store()
3746 .try_multi_get_objects_by_key(&input_object_keys)?
3747 .into_iter()
3748 .enumerate()
3749 .map(|(idx, maybe_object)| {
3750 maybe_object.ok_or_else(|| {
3751 anyhow::anyhow!(
3752 "missing input object key {:?} from tx {}",
3753 input_object_keys[idx],
3754 effects.transaction_digest()
3755 )
3756 })
3757 })
3758 .collect::<anyhow::Result<Vec<_>>>()?;
3759 Ok(input_objects)
3760 }
3761
3762 pub fn get_transaction_output_objects(
3763 &self,
3764 effects: &TransactionEffects,
3765 ) -> anyhow::Result<Vec<Object>> {
3766 let output_object_keys = effects
3767 .all_changed_objects()
3768 .into_iter()
3769 .map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
3770 .collect::<Vec<_>>();
3771
3772 let output_objects = self
3773 .get_object_store()
3774 .try_multi_get_objects_by_key(&output_object_keys)?
3775 .into_iter()
3776 .enumerate()
3777 .map(|(idx, maybe_object)| {
3778 maybe_object.ok_or_else(|| {
3779 anyhow::anyhow!(
3780 "missing output object key {:?} from tx {}",
3781 output_object_keys[idx],
3782 effects.transaction_digest()
3783 )
3784 })
3785 })
3786 .collect::<anyhow::Result<Vec<_>>>()?;
3787 Ok(output_objects)
3788 }
3789
3790 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3791 match &self.indexes {
3792 Some(i) => Ok(i.clone()),
3793 None => Err(IotaError::UnsupportedFeature {
3794 error: "extended object indexing is not enabled on this server".into(),
3795 }),
3796 }
3797 }
3798
3799 pub async fn get_transactions_for_tests(
3800 self: &Arc<Self>,
3801 filter: Option<TransactionFilter>,
3802 cursor: Option<TransactionDigest>,
3803 limit: Option<usize>,
3804 reverse: bool,
3805 ) -> IotaResult<Vec<TransactionDigest>> {
3806 let metrics = KeyValueStoreMetrics::new_for_tests();
3807 let kv_store = Arc::new(TransactionKeyValueStore::new(
3808 "rocksdb",
3809 metrics,
3810 self.clone(),
3811 ));
3812 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3813 .await
3814 }
3815
3816 #[instrument(level = "trace", skip_all)]
3817 pub async fn get_transactions(
3818 &self,
3819 kv_store: &Arc<TransactionKeyValueStore>,
3820 filter: Option<TransactionFilter>,
3821 cursor: Option<TransactionDigest>,
3823 limit: Option<usize>,
3824 reverse: bool,
3825 ) -> IotaResult<Vec<TransactionDigest>> {
3826 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3827 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3828 let iter = checkpoint_contents.iter().map(|c| c.transaction);
3829 if reverse {
3830 let iter = iter
3831 .rev()
3832 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3833 .skip(usize::from(cursor.is_some()));
3834 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3835 } else {
3836 let iter = iter
3837 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3838 .skip(usize::from(cursor.is_some()));
3839 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3840 }
3841 }
3842 self.get_indexes()?
3843 .get_transactions(filter, cursor, limit, reverse)
3844 }
3845
3846 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
3847 &self.checkpoint_store
3848 }
3849
3850 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
3851 self.get_checkpoint_store()
3852 .get_highest_executed_checkpoint_seq_number()?
3853 .ok_or(IotaError::UserInput {
3854 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
3855 })
3856 }
3857
/// Simulator-only test helper that reads the highest pruned checkpoint from
/// the perpetual tables of the testing database.
#[cfg(msim)]
pub fn get_highest_pruned_checkpoint_for_testing(
    &self,
) -> IotaResult<CheckpointSequenceNumber> {
    let database = self.database_for_testing();
    database.perpetual_tables.get_highest_pruned_checkpoint()
}
3866
3867 #[instrument(level = "trace", skip_all)]
3868 pub fn get_checkpoint_summary_by_sequence_number(
3869 &self,
3870 sequence_number: CheckpointSequenceNumber,
3871 ) -> IotaResult<CheckpointSummary> {
3872 let verified_checkpoint = self
3873 .get_checkpoint_store()
3874 .get_checkpoint_by_sequence_number(sequence_number)?;
3875 match verified_checkpoint {
3876 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3877 None => Err(IotaError::UserInput {
3878 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3879 }),
3880 }
3881 }
3882
3883 #[instrument(level = "trace", skip_all)]
3884 pub fn get_checkpoint_summary_by_digest(
3885 &self,
3886 digest: CheckpointDigest,
3887 ) -> IotaResult<CheckpointSummary> {
3888 let verified_checkpoint = self
3889 .get_checkpoint_store()
3890 .get_checkpoint_by_digest(&digest)?;
3891 match verified_checkpoint {
3892 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3893 None => Err(IotaError::UserInput {
3894 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3895 }),
3896 }
3897 }
3898
3899 #[instrument(level = "trace", skip_all)]
3900 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
3901 if is_system_package(package_id) {
3902 return self.find_genesis_txn_digest();
3903 }
3904 Ok(self
3905 .get_object_read(&package_id)?
3906 .into_object()?
3907 .previous_transaction)
3908 }
3909
3910 #[instrument(level = "trace", skip_all)]
3911 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
3912 let summary = self
3913 .get_verified_checkpoint_by_sequence_number(0)?
3914 .into_message();
3915 let content = self.get_checkpoint_contents(summary.content_digest)?;
3916 let genesis_transaction = content.enumerate_transactions(&summary).next();
3917 Ok(genesis_transaction
3918 .ok_or(IotaError::UserInput {
3919 error: UserInputError::GenesisTransactionNotFound,
3920 })?
3921 .1
3922 .transaction)
3923 }
3924
3925 #[instrument(level = "trace", skip_all)]
3926 pub fn get_verified_checkpoint_by_sequence_number(
3927 &self,
3928 sequence_number: CheckpointSequenceNumber,
3929 ) -> IotaResult<VerifiedCheckpoint> {
3930 let verified_checkpoint = self
3931 .get_checkpoint_store()
3932 .get_checkpoint_by_sequence_number(sequence_number)?;
3933 match verified_checkpoint {
3934 Some(verified_checkpoint) => Ok(verified_checkpoint),
3935 None => Err(IotaError::UserInput {
3936 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3937 }),
3938 }
3939 }
3940
3941 #[instrument(level = "trace", skip_all)]
3942 pub fn get_verified_checkpoint_summary_by_digest(
3943 &self,
3944 digest: CheckpointDigest,
3945 ) -> IotaResult<VerifiedCheckpoint> {
3946 let verified_checkpoint = self
3947 .get_checkpoint_store()
3948 .get_checkpoint_by_digest(&digest)?;
3949 match verified_checkpoint {
3950 Some(verified_checkpoint) => Ok(verified_checkpoint),
3951 None => Err(IotaError::UserInput {
3952 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3953 }),
3954 }
3955 }
3956
3957 #[instrument(level = "trace", skip_all)]
3958 pub fn get_checkpoint_contents(
3959 &self,
3960 digest: CheckpointContentsDigest,
3961 ) -> IotaResult<CheckpointContents> {
3962 self.get_checkpoint_store()
3963 .get_checkpoint_contents(&digest)?
3964 .ok_or(IotaError::UserInput {
3965 error: UserInputError::CheckpointContentsNotFound(digest),
3966 })
3967 }
3968
3969 #[instrument(level = "trace", skip_all)]
3970 pub fn get_checkpoint_contents_by_sequence_number(
3971 &self,
3972 sequence_number: CheckpointSequenceNumber,
3973 ) -> IotaResult<CheckpointContents> {
3974 let verified_checkpoint = self
3975 .get_checkpoint_store()
3976 .get_checkpoint_by_sequence_number(sequence_number)?;
3977 match verified_checkpoint {
3978 Some(verified_checkpoint) => {
3979 let content_digest = verified_checkpoint.into_inner().content_digest;
3980 self.get_checkpoint_contents(content_digest)
3981 }
3982 None => Err(IotaError::UserInput {
3983 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3984 }),
3985 }
3986 }
3987
3988 #[instrument(level = "trace", skip_all)]
3989 pub async fn query_events(
3990 &self,
3991 kv_store: &Arc<TransactionKeyValueStore>,
3992 query: EventFilter,
3993 cursor: Option<EventID>,
3995 limit: usize,
3996 descending: bool,
3997 ) -> IotaResult<Vec<IotaEvent>> {
3998 let index_store = self.get_indexes()?;
3999
4000 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4002 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4003 IotaError::TransactionNotFound {
4004 digest: cursor.tx_digest,
4005 },
4006 )?;
4007 (tx_seq, cursor.event_seq as usize)
4008 } else if descending {
4009 (u64::MAX, usize::MAX)
4010 } else {
4011 (0, 0)
4012 };
4013
4014 let limit = limit + 1;
4015 let mut event_keys = match query {
4016 EventFilter::All(filters) => {
4017 if filters.is_empty() {
4018 index_store.all_events(tx_num, event_num, limit, descending)?
4019 } else {
4020 return Err(IotaError::UserInput {
4021 error: UserInputError::Unsupported(
4022 "This query type does not currently support filter combinations"
4023 .to_string(),
4024 ),
4025 });
4026 }
4027 }
4028 EventFilter::Transaction(digest) => {
4029 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4030 }
4031 EventFilter::MoveModule { package, module } => {
4032 let module_id = ModuleId::new(package.into(), module);
4033 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4034 }
4035 EventFilter::MoveEventType(struct_name) => index_store
4036 .events_by_move_event_struct_name(
4037 &struct_name,
4038 tx_num,
4039 event_num,
4040 limit,
4041 descending,
4042 )?,
4043 EventFilter::Sender(sender) => {
4044 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4045 }
4046 EventFilter::TimeRange {
4047 start_time,
4048 end_time,
4049 } => index_store
4050 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4051 EventFilter::MoveEventModule { package, module } => index_store
4052 .events_by_move_event_module(
4053 &ModuleId::new(package.into(), module),
4054 tx_num,
4055 event_num,
4056 limit,
4057 descending,
4058 )?,
4059 EventFilter::Package(_)
4061 | EventFilter::MoveEventField { .. }
4062 | EventFilter::Any(_)
4063 | EventFilter::And(_, _)
4064 | EventFilter::Or(_, _) => {
4065 return Err(IotaError::UserInput {
4066 error: UserInputError::Unsupported(
4067 "This query type is not supported by the full node.".to_string(),
4068 ),
4069 });
4070 }
4071 };
4072
4073 if cursor.is_some() {
4076 if !event_keys.is_empty() {
4077 event_keys.remove(0);
4078 }
4079 } else {
4080 event_keys.truncate(limit - 1);
4081 }
4082
4083 let transaction_digests = event_keys
4085 .iter()
4086 .map(|(_, digest, _, _)| *digest)
4087 .collect::<HashSet<_>>()
4088 .into_iter()
4089 .collect::<Vec<_>>();
4090
4091 let events = kv_store
4092 .multi_get_events_by_tx_digests(&transaction_digests)
4093 .await?;
4094
4095 let events_map: HashMap<_, _> =
4096 transaction_digests.iter().zip(events.into_iter()).collect();
4097
4098 let stored_events = event_keys
4099 .into_iter()
4100 .map(|k| {
4101 (
4102 k,
4103 events_map
4104 .get(&k.1)
4105 .expect("fetched digest is missing")
4106 .clone()
4107 .and_then(|e| e.data.get(k.2).cloned()),
4108 )
4109 })
4110 .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4111 event
4112 .map(|e| (e, tx_digest, event_seq, timestamp))
4113 .ok_or(IotaError::TransactionEventsNotFound { digest })
4114 })
4115 .collect::<Result<Vec<_>, _>>()?;
4116
4117 let epoch_store = self.load_epoch_store_one_call_per_task();
4118 let backing_store = self.get_backing_package_store().as_ref();
4119 let mut layout_resolver = epoch_store
4120 .executor()
4121 .type_layout_resolver(Box::new(backing_store));
4122 let mut events = vec![];
4123 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4124 events.push(IotaEvent::try_from(
4125 e.clone(),
4126 tx_digest,
4127 event_seq as u64,
4128 Some(timestamp),
4129 layout_resolver.get_annotated_layout(&e.type_)?,
4130 )?)
4131 }
4132 Ok(events)
4133 }
4134
4135 pub async fn insert_genesis_object(&self, object: Object) {
4136 self.get_reconfig_api()
4137 .try_insert_genesis_object(object)
4138 .expect("Cannot insert genesis object")
4139 }
4140
4141 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4142 futures::future::join_all(
4143 objects
4144 .iter()
4145 .map(|o| self.insert_genesis_object(o.clone())),
4146 )
4147 .await;
4148 }
4149
4150 #[instrument(level = "trace", skip_all)]
4152 pub fn get_transaction_status(
4153 &self,
4154 transaction_digest: &TransactionDigest,
4155 epoch_store: &Arc<AuthorityPerEpochStore>,
4156 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4157 if let Some(effects) =
4159 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4160 {
4161 if let Some(transaction) = self
4162 .get_transaction_cache_reader()
4163 .try_get_transaction_block(transaction_digest)?
4164 {
4165 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4166 let events = if let Some(digest) = effects.events_digest() {
4167 self.get_transaction_events(digest)?
4168 } else {
4169 TransactionEvents::default()
4170 };
4171 return Ok(Some((
4172 (*transaction).clone().into_message(),
4173 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4174 )));
4175 } else {
4176 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4181 }
4182 }
4183 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4184 self.metrics.tx_already_processed.inc();
4185 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4186 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4187 } else {
4188 Ok(None)
4189 }
4190 }
4191
4192 #[instrument(level = "trace", skip_all)]
4196 pub fn get_signed_effects_and_maybe_resign(
4197 &self,
4198 transaction_digest: &TransactionDigest,
4199 epoch_store: &Arc<AuthorityPerEpochStore>,
4200 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4201 let effects = self
4202 .get_transaction_cache_reader()
4203 .try_get_executed_effects(transaction_digest)?;
4204 match effects {
4205 Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4206 None => Ok(None),
4207 }
4208 }
4209
4210 #[instrument(level = "trace", skip_all)]
4211 pub(crate) fn sign_effects(
4212 &self,
4213 effects: TransactionEffects,
4214 epoch_store: &Arc<AuthorityPerEpochStore>,
4215 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4216 let tx_digest = *effects.transaction_digest();
4217 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4218 Some(sig) if sig.epoch == epoch_store.epoch() => {
4219 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4220 }
4221 _ => {
4222 debug!(
4245 ?tx_digest,
4246 epoch=?epoch_store.epoch(),
4247 "Re-signing the effects with the current epoch"
4248 );
4249
4250 let sig = AuthoritySignInfo::new(
4251 epoch_store.epoch(),
4252 &effects,
4253 Intent::iota_app(IntentScope::TransactionEffects),
4254 self.name,
4255 &*self.secret,
4256 );
4257
4258 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4259
4260 epoch_store.insert_effects_digest_and_signature(
4261 &tx_digest,
4262 effects.digest(),
4263 &sig,
4264 )?;
4265
4266 effects
4267 }
4268 };
4269
4270 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4271 signed_effects,
4272 ))
4273 }
4274
4275 #[instrument(level = "trace", skip_all)]
4277 fn fullnode_only_get_tx_coins_for_indexing(
4278 &self,
4279 inner_temporary_store: &InnerTemporaryStore,
4280 epoch_store: &Arc<AuthorityPerEpochStore>,
4281 ) -> Option<TxCoins> {
4282 if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4283 return None;
4284 }
4285 let written_coin_objects = inner_temporary_store
4286 .written
4287 .iter()
4288 .filter_map(|(k, v)| {
4289 if v.is_coin() {
4290 Some((*k, v.clone()))
4291 } else {
4292 None
4293 }
4294 })
4295 .collect();
4296 let input_coin_objects = inner_temporary_store
4297 .input_objects
4298 .iter()
4299 .filter_map(|(k, v)| {
4300 if v.is_coin() {
4301 Some((*k, v.clone()))
4302 } else {
4303 None
4304 }
4305 })
4306 .collect::<ObjectMap>();
4307 Some((input_coin_objects, written_coin_objects))
4308 }
4309
4310 #[instrument(level = "trace", skip_all)]
4322 pub async fn get_transaction_lock(
4323 &self,
4324 object_ref: &ObjectRef,
4325 epoch_store: &AuthorityPerEpochStore,
4326 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4327 let lock_info = self
4328 .get_object_cache_reader()
4329 .try_get_lock(*object_ref, epoch_store)?;
4330 let lock_info = match lock_info {
4331 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4332 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4333 provided_obj_ref: *object_ref,
4334 current_version: locked_ref.1,
4335 }
4336 .into());
4337 }
4338 ObjectLockStatus::Initialized => {
4339 return Ok(None);
4340 }
4341 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4342 };
4343
4344 epoch_store.get_signed_transaction(&lock_info)
4345 }
4346
4347 pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4348 self.get_object_cache_reader().try_get_objects(objects)
4349 }
4350
4351 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4353 self.try_get_objects(objects)
4354 .await
4355 .expect("storage access failed")
4356 }
4357
4358 pub async fn try_get_object_or_tombstone(
4359 &self,
4360 object_id: ObjectID,
4361 ) -> IotaResult<Option<ObjectRef>> {
4362 self.get_object_cache_reader()
4363 .try_get_latest_object_ref_or_tombstone(object_id)
4364 }
4365
4366 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4368 self.try_get_object_or_tombstone(object_id)
4369 .await
4370 .expect("storage access failed")
4371 }
4372
4373 pub fn set_override_protocol_upgrade_buffer_stake(
4383 &self,
4384 expected_epoch: EpochId,
4385 buffer_stake_bps: u64,
4386 ) -> IotaResult {
4387 let epoch_store = self.load_epoch_store_one_call_per_task();
4388 let actual_epoch = epoch_store.epoch();
4389 if actual_epoch != expected_epoch {
4390 return Err(IotaError::WrongEpoch {
4391 expected_epoch,
4392 actual_epoch,
4393 });
4394 }
4395
4396 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4397 }
4398
4399 pub fn clear_override_protocol_upgrade_buffer_stake(
4400 &self,
4401 expected_epoch: EpochId,
4402 ) -> IotaResult {
4403 let epoch_store = self.load_epoch_store_one_call_per_task();
4404 let actual_epoch = epoch_store.epoch();
4405 if actual_epoch != expected_epoch {
4406 return Err(IotaError::WrongEpoch {
4407 expected_epoch,
4408 actual_epoch,
4409 });
4410 }
4411
4412 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4413 }
4414
4415 pub async fn get_available_system_packages(
4419 &self,
4420 binary_config: &BinaryConfig,
4421 ) -> Vec<ObjectRef> {
4422 let mut results = vec![];
4423
4424 let system_packages = BuiltInFramework::iter_system_packages();
4425
4426 #[cfg(msim)]
4428 let extra_packages = framework_injection::get_extra_packages(self.name);
4429 #[cfg(msim)]
4430 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4431
4432 for system_package in system_packages {
4433 let modules = system_package.modules().to_vec();
4434 #[cfg(msim)]
4436 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4437 .unwrap_or(modules);
4438
4439 let Some(obj_ref) = iota_framework::compare_system_package(
4440 &self.get_object_store(),
4441 &system_package.id,
4442 &modules,
4443 system_package.dependencies.to_vec(),
4444 binary_config,
4445 )
4446 .await
4447 else {
4448 return vec![];
4449 };
4450 results.push(obj_ref);
4451 }
4452
4453 results
4454 }
4455
4456 async fn get_system_package_bytes(
4473 &self,
4474 system_packages: Vec<ObjectRef>,
4475 binary_config: &BinaryConfig,
4476 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4477 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4478 let objects = self.get_objects(&ids).await;
4479
4480 let mut res = Vec::with_capacity(system_packages.len());
4481 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4482 let prev_transaction = match object {
4483 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4484 info!("Framework {} does not need updating", system_package_ref.0);
4486 continue;
4487 }
4488
4489 Some(cur_object) => cur_object.previous_transaction,
4490 None => TransactionDigest::genesis_marker(),
4491 };
4492
4493 #[cfg(msim)]
4494 let SystemPackage {
4495 id: _,
4496 bytes,
4497 dependencies,
4498 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4499 .unwrap_or_else(|| {
4500 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4501 });
4502
4503 #[cfg(not(msim))]
4504 let SystemPackage {
4505 id: _,
4506 bytes,
4507 dependencies,
4508 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4509
4510 let modules: Vec<_> = bytes
4511 .iter()
4512 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4513 .collect();
4514
4515 let new_object = Object::new_system_package(
4516 &modules,
4517 system_package_ref.1,
4518 dependencies.clone(),
4519 prev_transaction,
4520 );
4521
4522 let new_ref = new_object.compute_object_reference();
4523 if new_ref != system_package_ref {
4524 error!(
4525 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
4526 );
4527 return None;
4528 }
4529
4530 res.push((system_package_ref.1, bytes, dependencies));
4531 }
4532
4533 Some(res)
4534 }
4535
4536 fn is_protocol_version_supported_v1(
4540 proposed_protocol_version: ProtocolVersion,
4541 committee: &Committee,
4542 capabilities: Vec<AuthorityCapabilitiesV1>,
4543 mut buffer_stake_bps: u64,
4544 ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4545 if buffer_stake_bps > 10000 {
4546 warn!("clamping buffer_stake_bps to 10000");
4547 buffer_stake_bps = 10000;
4548 }
4549
4550 let mut desired_upgrades: Vec<_> = capabilities
4553 .into_iter()
4554 .filter_map(|mut cap| {
4555 if cap.available_system_packages.is_empty() {
4557 return None;
4558 }
4559
4560 cap.available_system_packages.sort();
4561
4562 info!(
4563 "validator {:?} supports {:?} with system packages: {:?}",
4564 cap.authority.concise(),
4565 cap.supported_protocol_versions,
4566 cap.available_system_packages,
4567 );
4568
4569 cap.supported_protocol_versions
4573 .get_version_digest(proposed_protocol_version)
4574 .map(|digest| (digest, cap.available_system_packages, cap.authority))
4575 })
4576 .collect();
4577
4578 desired_upgrades.sort();
4581 desired_upgrades
4582 .into_iter()
4583 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4584 .into_iter()
4585 .find_map(|((digest, packages), group)| {
4586 assert!(!packages.is_empty());
4588
4589 let mut stake_aggregator: StakeAggregator<(), true> =
4590 StakeAggregator::new(Arc::new(committee.clone()));
4591
4592 for (_, _, authority) in group {
4593 stake_aggregator.insert_generic(authority, ());
4594 }
4595
4596 let total_votes = stake_aggregator.total_votes();
4597 let quorum_threshold = committee.quorum_threshold();
4598 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4599
4600 info!(
4601 protocol_config_digest = ?digest,
4602 ?total_votes,
4603 ?quorum_threshold,
4604 ?buffer_stake_bps,
4605 ?effective_threshold,
4606 ?proposed_protocol_version,
4607 ?packages,
4608 "support for upgrade"
4609 );
4610
4611 let has_support = total_votes >= effective_threshold;
4612 has_support.then_some((proposed_protocol_version, digest, packages))
4613 })
4614 }
4615
4616 fn choose_protocol_version_and_system_packages_v1(
4620 current_protocol_version: ProtocolVersion,
4621 current_protocol_digest: Digest,
4622 committee: &Committee,
4623 capabilities: Vec<AuthorityCapabilitiesV1>,
4624 buffer_stake_bps: u64,
4625 ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4626 let mut next_protocol_version = current_protocol_version;
4627 let mut system_packages = vec![];
4628 let mut protocol_version_digest = current_protocol_digest;
4629
4630 while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4634 next_protocol_version + 1,
4635 committee,
4636 capabilities.clone(),
4637 buffer_stake_bps,
4638 ) {
4639 next_protocol_version = version;
4640 protocol_version_digest = digest;
4641 system_packages = packages;
4642 }
4643
4644 (
4645 next_protocol_version,
4646 protocol_version_digest,
4647 system_packages,
4648 )
4649 }
4650
4651 fn get_validators_supporting_protocol_version(
4656 target_protocol_version: ProtocolVersion,
4657 target_digest: Digest,
4658 active_validators: &[AuthorityPublicKey],
4659 capabilities: &[AuthorityCapabilitiesV1],
4660 ) -> Vec<u64> {
4661 let mut eligible_validators = Vec::new();
4662
4663 for capability in capabilities {
4664 if let Some(digest) = capability
4666 .supported_protocol_versions
4667 .get_version_digest(target_protocol_version)
4668 {
4669 if digest == target_digest {
4670 if let Some(index) = active_validators
4672 .iter()
4673 .position(|name| AuthorityName::from(name) == capability.authority)
4674 {
4675 eligible_validators.push(index as u64);
4676 }
4677 }
4678 }
4679 }
4680
4681 eligible_validators.sort();
4683 eligible_validators
4684 }
4685
4686 fn calculate_eligible_validators_weight(
4691 eligible_validator_indices: &[u64],
4692 active_validators: &[AuthorityPublicKey],
4693 committee: &Committee,
4694 ) -> u64 {
4695 let mut total_weight = 0u64;
4696
4697 for &index in eligible_validator_indices {
4698 let authority_pubkey = &active_validators[index as usize];
4699 if let Some((_, weight)) = committee
4701 .members()
4702 .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4703 {
4704 total_weight += weight;
4705 }
4706 }
4707
4708 total_weight
4709 }
4710
4711 #[instrument(level = "debug", skip_all)]
4712 fn create_authenticator_state_tx(
4713 &self,
4714 epoch_store: &Arc<AuthorityPerEpochStore>,
4715 ) -> Option<EndOfEpochTransactionKind> {
4716 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4717 info!("authenticator state transactions not enabled");
4718 return None;
4719 }
4720
4721 let authenticator_state_exists = epoch_store.authenticator_state_exists();
4722 let tx = if authenticator_state_exists {
4723 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4724 let min_epoch =
4725 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4726 let authenticator_obj_initial_shared_version = epoch_store
4727 .epoch_start_config()
4728 .authenticator_obj_initial_shared_version()
4729 .expect("initial version must exist");
4730
4731 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4732 min_epoch,
4733 authenticator_obj_initial_shared_version,
4734 );
4735
4736 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4737
4738 tx
4739 } else {
4740 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4741 info!("Creating AuthenticatorStateCreate tx");
4742 tx
4743 };
4744 Some(tx)
4745 }
4746
/// Builds the end-of-epoch transaction for `epoch_store`'s epoch, prepares it locally
/// and returns the resulting system state, the optional `SystemEpochInfoEvent`, and
/// the transaction effects.
///
/// The end-of-epoch transaction consists of an optional authenticator state
/// transaction followed by a change-epoch transaction. Which change-epoch variant is
/// used (`v3`, `v2` or the original) depends on flags read from the protocol config.
///
/// # Errors
///
/// * Fails when the system packages chosen for the next epoch cannot be produced
///   locally (`get_system_package_bytes` returned `None`).
/// * Fails when the transaction is already marked as executed.
/// * Propagates errors from locking, shared-object version assignment, input loading,
///   certificate preparation and the state sync store insert.
///
/// # Panics
///
/// Panics if capabilities cannot be read, if the prepared transaction did not write
/// the system state object, if no epoch info event was emitted while the system is
/// not in safe mode, or if the effects status is not OK.
#[instrument(level = "error", skip_all)]
pub async fn create_and_execute_advance_epoch_tx(
    &self,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    gas_cost_summary: &GasCostSummary,
    checkpoint: CheckpointSequenceNumber,
    epoch_start_timestamp_ms: CheckpointTimestamp,
) -> anyhow::Result<(
    IotaSystemState,
    Option<SystemEpochInfoEvent>,
    TransactionEffects,
)> {
    let mut txns = Vec::new();

    if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
        txns.push(tx);
    }

    let next_epoch = epoch_store.epoch() + 1;

    // Choose the next protocol version and system packages from the capabilities
    // recorded in the epoch store.
    let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
    let authority_capabilities = epoch_store
        .get_capabilities_v1()
        .expect("read capabilities from db cannot fail");
    let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
        Self::choose_protocol_version_and_system_packages_v1(
            epoch_store.protocol_version(),
            SupportedProtocolVersionsWithHashes::protocol_config_digest(
                epoch_store.protocol_config(),
            ),
            epoch_store.committee(),
            authority_capabilities.clone(),
            buffer_stake_bps,
        );

    // Note: the binary config is derived from the *current* epoch's protocol config.
    let config = epoch_store.protocol_config();
    let binary_config = to_binary_config(config);
    let Some(next_epoch_system_package_bytes) = self
        .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
        .await
    else {
        error!(
            "upgraded system packages {:?} are not locally available, cannot create \
            ChangeEpochTx. validator binary must be upgraded to the correct version!",
            next_epoch_system_packages
        );
        bail!("missing system packages: cannot form ChangeEpochTx");
    };

    if config.select_committee_from_eligible_validators() {
        let active_validators = epoch_store.epoch_start_state().get_active_validators();

        // Default: every active validator (by index) is eligible.
        let mut eligible_active_validators = (0..active_validators.len() as u64).collect();

        if config.select_committee_supporting_next_epoch_version() {
            // Narrow eligibility to validators that advertise the chosen version and
            // digest.
            eligible_active_validators = Self::get_validators_supporting_protocol_version(
                next_epoch_protocol_version,
                next_epoch_protocol_digest,
                &active_validators,
                &authority_capabilities,
            );

            let eligible_validators_weight = Self::calculate_eligible_validators_weight(
                &eligible_active_validators,
                &active_validators,
                epoch_store.committee(),
            );

            let committee = epoch_store.committee();
            let effective_threshold = committee.effective_threshold(buffer_stake_bps);

            // If the narrowed set does not reach the effective threshold, log an error
            // and fall back to treating all active validators as eligible.
            if eligible_validators_weight < effective_threshold {
                error!(
                    "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
                    This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
                );
                eligible_active_validators = (0..active_validators.len() as u64).collect();
            }
        }

        txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
            next_epoch,
            next_epoch_protocol_version,
            gas_cost_summary.storage_cost,
            gas_cost_summary.computation_cost,
            gas_cost_summary.computation_cost_burned,
            gas_cost_summary.storage_rebate,
            gas_cost_summary.non_refundable_storage_fee,
            epoch_start_timestamp_ms,
            next_epoch_system_package_bytes,
            eligible_active_validators,
        ));
    } else if config.protocol_defined_base_fee()
        && config.max_committee_members_count_as_option().is_some()
    {
        txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
            next_epoch,
            next_epoch_protocol_version,
            gas_cost_summary.storage_cost,
            gas_cost_summary.computation_cost,
            gas_cost_summary.computation_cost_burned,
            gas_cost_summary.storage_rebate,
            gas_cost_summary.non_refundable_storage_fee,
            epoch_start_timestamp_ms,
            next_epoch_system_package_bytes,
        ));
    } else {
        txns.push(EndOfEpochTransactionKind::new_change_epoch(
            next_epoch,
            next_epoch_protocol_version,
            gas_cost_summary.storage_cost,
            gas_cost_summary.computation_cost,
            gas_cost_summary.storage_rebate,
            gas_cost_summary.non_refundable_storage_fee,
            epoch_start_timestamp_ms,
            next_epoch_system_package_bytes,
        ));
    }

    let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);

    let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
        tx.clone(),
        epoch_store.epoch(),
        checkpoint,
    );

    let tx_digest = executable_tx.digest();

    info!(
        ?next_epoch,
        ?next_epoch_protocol_version,
        ?next_epoch_system_packages,
        computation_cost=?gas_cost_summary.computation_cost,
        computation_cost_burned=?gas_cost_summary.computation_cost_burned,
        storage_cost=?gas_cost_summary.storage_cost,
        storage_rebate=?gas_cost_summary.storage_rebate,
        non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
        ?tx_digest,
        "Creating advance epoch transaction"
    );

    fail_point_async!("change_epoch_tx_delay");
    // The transaction lock is taken before the already-executed check below.
    let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

    if self
        .get_transaction_cache_reader()
        .try_is_tx_already_executed(tx_digest)?
    {
        warn!("change epoch tx has already been executed via state sync");
        bail!("change epoch tx has already been executed via state sync",);
    }

    let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;

    epoch_store.assign_shared_object_versions_idempotent(
        self.get_object_cache_reader().as_ref(),
        std::slice::from_ref(&executable_tx),
    )?;

    let input_objects =
        self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;

    let (temporary_store, effects, _execution_error_opt) =
        self.prepare_certificate(&execution_guard, &executable_tx, input_objects, epoch_store)?;
    let system_obj = get_iota_system_state(&temporary_store.written)
        .expect("change epoch tx must write to system object");
    let system_epoch_info_event = temporary_store
        .events
        .data
        .into_iter()
        .find(|event| event.is_system_epoch_info_event())
        .map(SystemEpochInfoEvent::from);
    // The epoch info event may only be missing when the system is in safe mode.
    assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());

    // Store the transaction and its effects in the state sync store.
    self.get_state_sync_store()
        .try_insert_transaction_and_effects(&tx, &effects)
        .map_err(|err| {
            let err: anyhow::Error = err.into();
            err
        })?;

    info!(
        "Effects summary of the change epoch transaction: {:?}",
        effects.summary_for_debug()
    );
    epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
    assert!(effects.status().is_ok());
    Ok((system_obj, system_epoch_info_event, effects))
}
4987
4988 #[instrument(level = "error", skip_all)]
4992 async fn revert_uncommitted_epoch_transactions(
4993 &self,
4994 epoch_store: &AuthorityPerEpochStore,
4995 ) -> IotaResult {
4996 {
4997 let state = epoch_store.get_reconfig_state_write_lock_guard();
4998 if state.should_accept_user_certs() {
4999 epoch_store.close_user_certs(state);
5008 }
5009 }
5011 let pending_certificates = epoch_store.pending_consensus_certificates();
5012 info!(
5013 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5014 pending_certificates.len(),
5015 pending_certificates,
5016 );
5017 for digest in pending_certificates {
5018 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5019 info!(
5020 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5021 digest
5022 );
5023 continue;
5024 }
5025 info!("Reverting {:?} at the end of epoch", digest);
5026 epoch_store.revert_executed_transaction(&digest)?;
5027 self.get_reconfig_api().try_revert_state_update(&digest)?;
5028 }
5029 info!("All uncommitted local transactions reverted");
5030 Ok(())
5031 }
5032
5033 #[instrument(level = "error", skip_all)]
5034 async fn reopen_epoch_db(
5035 &self,
5036 cur_epoch_store: &AuthorityPerEpochStore,
5037 new_committee: Committee,
5038 epoch_start_configuration: EpochStartConfiguration,
5039 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5040 epoch_last_checkpoint: CheckpointSequenceNumber,
5041 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5042 let new_epoch = new_committee.epoch;
5043 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5044 assert_eq!(
5045 epoch_start_configuration.epoch_start_state().epoch(),
5046 new_committee.epoch
5047 );
5048 fail_point!("before-open-new-epoch-store");
5049 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5050 self.name,
5051 new_committee,
5052 epoch_start_configuration,
5053 self.get_backing_package_store().clone(),
5054 self.get_object_store().clone(),
5055 expensive_safety_check_config,
5056 cur_epoch_store.get_chain_identifier(),
5057 epoch_last_checkpoint,
5058 );
5059 self.epoch_store.store(new_epoch_store.clone());
5060 Ok(new_epoch_store)
5061 }
5062
/// Test-only: iterates over the cached live object set exposed by the accumulator
/// store.
#[cfg(test)]
pub(crate) fn iter_live_object_set_for_testing(
    &self,
) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
    let accumulator_store = self.get_accumulator_store();
    accumulator_store.iter_cached_live_object_set_for_testing()
}
5070
/// Test-only: takes the stored execution shutdown sender and sends the shutdown
/// signal through it.
///
/// # Panics
///
/// Panics if the sender was already taken or if the send fails.
#[cfg(test)]
pub(crate) fn shutdown_execution_for_test(&self) {
    let shutdown_sender = self.tx_execution_shutdown.lock().take().unwrap();
    shutdown_sender.send(()).unwrap();
}
5080
5081 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5084 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5085 self.get_object_cache_reader()
5086 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5087 self.get_reconfig_api()
5088 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5089 }
5090}
5091
5092pub struct RandomnessRoundReceiver {
5093 authority_state: Arc<AuthorityState>,
5094 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5095}
5096
5097impl RandomnessRoundReceiver {
5098 pub fn spawn(
5099 authority_state: Arc<AuthorityState>,
5100 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5101 ) -> JoinHandle<()> {
5102 let rrr = RandomnessRoundReceiver {
5103 authority_state,
5104 randomness_rx,
5105 };
5106 spawn_monitored_task!(rrr.run())
5107 }
5108
5109 async fn run(mut self) {
5110 info!("RandomnessRoundReceiver event loop started");
5111
5112 loop {
5113 tokio::select! {
5114 maybe_recv = self.randomness_rx.recv() => {
5115 if let Some((epoch, round, bytes)) = maybe_recv {
5116 self.handle_new_randomness(epoch, round, bytes);
5117 } else {
5118 break;
5119 }
5120 },
5121 }
5122 }
5123
5124 info!("RandomnessRoundReceiver event loop ended");
5125 }
5126
5127 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5128 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5129 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5130 if epoch_store.epoch() != epoch {
5131 warn!(
5132 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5133 epoch_store.epoch()
5134 );
5135 return;
5136 }
5137 let transaction = VerifiedTransaction::new_randomness_state_update(
5138 epoch,
5139 round,
5140 bytes,
5141 epoch_store
5142 .epoch_start_config()
5143 .randomness_obj_initial_shared_version(),
5144 );
5145 debug!(
5146 "created randomness state update transaction with digest: {:?}",
5147 transaction.digest()
5148 );
5149 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5150 let digest = *transaction.digest();
5151
5152 self.authority_state
5157 .get_cache_commit()
5158 .persist_transaction(&transaction);
5159
5160 self.authority_state
5162 .transaction_manager()
5163 .enqueue(vec![transaction], &epoch_store);
5164
5165 let authority_state = self.authority_state.clone();
5166 spawn_monitored_task!(async move {
5167 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5176 let result = tokio::time::timeout(
5177 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5178 authority_state
5179 .get_transaction_cache_reader()
5180 .try_notify_read_executed_effects(&[digest]),
5181 )
5182 .await;
5183 let result = match result {
5184 Ok(result) => result,
5185 Err(_) => {
5186 if cfg!(debug_assertions) {
5187 panic!(
5189 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5190 );
5191 }
5192 warn!(
5193 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5194 );
5195 authority_state
5197 .get_transaction_cache_reader()
5198 .try_notify_read_executed_effects(&[digest])
5199 .await
5200 }
5201 };
5202
5203 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5204 let effects = effects.pop().expect("should return effects");
5205 if *effects.status() != ExecutionStatus::Success {
5206 fatal!(
5207 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5208 );
5209 }
5210 debug!(
5211 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5212 );
5213 });
5214 }
5215}
5216
5217#[async_trait]
5218impl TransactionKeyValueStoreTrait for AuthorityState {
5219 async fn multi_get(
5220 &self,
5221 transaction_keys: &[TransactionDigest],
5222 effects_keys: &[TransactionDigest],
5223 ) -> IotaResult<KVStoreTransactionData> {
5224 let txns = if !transaction_keys.is_empty() {
5225 self.get_transaction_cache_reader()
5226 .try_multi_get_transaction_blocks(transaction_keys)?
5227 .into_iter()
5228 .map(|t| t.map(|t| (*t).clone().into_inner()))
5229 .collect()
5230 } else {
5231 vec![]
5232 };
5233
5234 let fx = if !effects_keys.is_empty() {
5235 self.get_transaction_cache_reader()
5236 .try_multi_get_executed_effects(effects_keys)?
5237 } else {
5238 vec![]
5239 };
5240
5241 Ok((txns, fx))
5242 }
5243
5244 async fn multi_get_checkpoints(
5245 &self,
5246 checkpoint_summaries: &[CheckpointSequenceNumber],
5247 checkpoint_contents: &[CheckpointSequenceNumber],
5248 checkpoint_summaries_by_digest: &[CheckpointDigest],
5249 ) -> IotaResult<(
5250 Vec<Option<CertifiedCheckpointSummary>>,
5251 Vec<Option<CheckpointContents>>,
5252 Vec<Option<CertifiedCheckpointSummary>>,
5253 )> {
5254 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5256 let store = self.get_checkpoint_store();
5257 for seq in checkpoint_summaries {
5258 let checkpoint = store
5259 .get_checkpoint_by_sequence_number(*seq)?
5260 .map(|c| c.into_inner());
5261
5262 summaries.push(checkpoint);
5263 }
5264
5265 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5266 for seq in checkpoint_contents {
5267 let checkpoint = store
5268 .get_checkpoint_by_sequence_number(*seq)?
5269 .and_then(|summary| {
5270 store
5271 .get_checkpoint_contents(&summary.content_digest)
5272 .expect("db read cannot fail")
5273 });
5274 contents.push(checkpoint);
5275 }
5276
5277 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5278 for digest in checkpoint_summaries_by_digest {
5279 let checkpoint = store
5280 .get_checkpoint_by_digest(digest)?
5281 .map(|c| c.into_inner());
5282 summaries_by_digest.push(checkpoint);
5283 }
5284
5285 Ok((summaries, contents, summaries_by_digest))
5286 }
5287
5288 async fn get_transaction_perpetual_checkpoint(
5289 &self,
5290 digest: TransactionDigest,
5291 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5292 self.get_checkpoint_cache()
5293 .try_get_transaction_perpetual_checkpoint(&digest)
5294 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5295 }
5296
5297 async fn get_object(
5298 &self,
5299 object_id: ObjectID,
5300 version: VersionNumber,
5301 ) -> IotaResult<Option<Object>> {
5302 self.get_object_cache_reader()
5303 .try_get_object_by_key(&object_id, version)
5304 }
5305
5306 async fn multi_get_transactions_perpetual_checkpoints(
5307 &self,
5308 digests: &[TransactionDigest],
5309 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5310 let res = self
5311 .get_checkpoint_cache()
5312 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5313
5314 Ok(res
5315 .into_iter()
5316 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5317 .collect())
5318 }
5319
5320 #[instrument(skip(self))]
5321 async fn multi_get_events_by_tx_digests(
5322 &self,
5323 digests: &[TransactionDigest],
5324 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5325 if digests.is_empty() {
5326 return Ok(vec![]);
5327 }
5328 let events_digests: Vec<_> = self
5329 .get_transaction_cache_reader()
5330 .try_multi_get_executed_effects(digests)?
5331 .into_iter()
5332 .map(|t| t.and_then(|t| t.events_digest().cloned()))
5333 .collect();
5334 let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5335 let mut events = self
5336 .get_transaction_cache_reader()
5337 .try_multi_get_events(&non_empty_events)?
5338 .into_iter();
5339 Ok(events_digests
5340 .into_iter()
5341 .map(|ev| ev.and_then(|_| events.next()?))
5342 .collect())
5343 }
5344}
5345
/// Simulator-only (`cfg(msim)`) hooks for overriding system packages, either for all
/// validators or per validator. Overrides live in thread-local storage.
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_types::{
        base_types::{AuthorityName, ObjectID},
        is_system_package,
    };
    use move_binary_format::CompiledModule;

    /// Override configuration keyed by package ID.
    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    /// The compiled modules that make up one package.
    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per validator, which modules (if any) replace a package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a package is overridden.
    enum PackageOverrideConfig {
        /// The same modules for every validator.
        Global(Framework),
        /// Modules chosen by a callback that receives the validator's name.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module with its own version.
    ///
    /// Panics if serialization of any module fails.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut buf = Vec::new();
            module
                .serialize_with_version(module.version, &mut buf)
                .unwrap();
            serialized.push(buf);
        }
        serialized
    }

    /// Registers a global override for `package_id`, replacing any existing entry.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        OVERRIDE.with(|cfg| {
            cfg.borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules));
        });
    }

    /// Registers a per-validator override callback for `package_id`, replacing any
    /// existing entry.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        OVERRIDE.with(|cfg| {
            cfg.borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func));
        });
    }

    /// Returns the serialized override modules for `package_id` as seen by validator
    /// `name`, or `None` if there is no override (or the callback returns `None`).
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            let overrides = cfg.borrow();
            match overrides.get(package_id)? {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|fw| compiled_modules_to_bytes(&fw))
                }
            }
        })
    }

    /// Returns the override modules for `package_id` as seen by validator `name`, or
    /// `None` if there is no override (or the callback returns `None`).
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            let overrides = cfg.borrow();
            match overrides.get(package_id)? {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            }
        })
    }

    /// Builds the overriding `SystemPackage` for `package_id`, if an override exists.
    ///
    /// A system package keeps the dependencies of its built-in counterpart; any other
    /// package depends on all built-in packages.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if !is_system_package(*package_id) {
            BuiltInFramework::all_package_ids()
        } else {
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns a `SystemPackage` for every overridden package ID that is not one of
    /// the built-in packages.
    ///
    /// Panics if such a package has no override bytes for validator `name`.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();
        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .filter(|package| !built_in.contains(*package))
                .copied()
                .collect()
        });

        let mut packages = Vec::with_capacity(extra.len());
        for package in extra {
            packages.push(SystemPackage {
                id: package,
                bytes: get_override_bytes(&package, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        packages
    }
}
5467
/// Serializable record of a single object together with the three components
/// of its object reference. Used as the element type of the object lists in
/// [`NodeStateDump`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// Object ID component of the reference computed for `object`.
    pub id: ObjectID,
    /// Version component of the reference computed for `object`.
    pub version: VersionNumber,
    /// Digest component of the reference computed for `object`.
    pub digest: ObjectDigest,
    /// The object itself.
    pub object: Object,
}
5475
5476impl ObjDumpFormat {
5477 fn new(object: Object) -> Self {
5478 let oref = object.compute_object_reference();
5479 Self {
5480 id: oref.0,
5481 version: oref.1,
5482 digest: oref.2,
5483 object,
5484 }
5485 }
5486}
5487
/// Serializable snapshot of the data surrounding the execution of one
/// transaction: epoch parameters, the computed effects, the expected effects
/// digest, and the objects gathered by [`NodeStateDump::new`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the transaction this dump describes.
    pub tx_digest: TransactionDigest,
    /// The sender-signed message extracted from the executed certificate.
    pub sender_signed_data: SenderSignedData,
    /// Epoch reported by the epoch store when the dump was created.
    pub executed_epoch: u64,
    /// Reference gas price reported by the epoch store.
    pub reference_gas_price: u64,
    /// Protocol version reported by the epoch store, as a `u64`.
    pub protocol_version: u64,
    /// Epoch start timestamp taken from the epoch start configuration.
    pub epoch_start_timestamp_ms: u64,
    /// Effects that were computed for the transaction.
    pub computed_effects: TransactionEffects,
    /// Effects digest that was expected for the transaction.
    pub expected_effects_digest: TransactionEffectsDigest,
    /// Built-in framework packages that were found in the object store.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Mutated and read-only shared input objects listed in the effects.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Objects listed in the temporary store's `loaded_runtime_objects`.
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects at the versions reported by the effects' `modified_at_versions`.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages listed in the temporary store's `runtime_packages_loaded_from_db`.
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// Input objects held by the temporary store.
    pub input_objects: Vec<ObjDumpFormat>,
}
5505
5506impl NodeStateDump {
5507 pub fn new(
5508 tx_digest: &TransactionDigest,
5509 effects: &TransactionEffects,
5510 expected_effects_digest: TransactionEffectsDigest,
5511 object_store: &dyn ObjectStore,
5512 epoch_store: &Arc<AuthorityPerEpochStore>,
5513 inner_temporary_store: &InnerTemporaryStore,
5514 certificate: &VerifiedExecutableTransaction,
5515 ) -> IotaResult<Self> {
5516 let executed_epoch = epoch_store.epoch();
5518 let reference_gas_price = epoch_store.reference_gas_price();
5519 let epoch_start_config = epoch_store.epoch_start_config();
5520 let protocol_version = epoch_store.protocol_version().as_u64();
5521 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5522
5523 let mut relevant_system_packages = Vec::new();
5525 for sys_package_id in BuiltInFramework::all_package_ids() {
5526 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5527 relevant_system_packages.push(ObjDumpFormat::new(w))
5528 }
5529 }
5530
5531 let mut shared_objects = Vec::new();
5533 for kind in effects.input_shared_objects() {
5534 match kind {
5535 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5536 if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5537 shared_objects.push(ObjDumpFormat::new(w))
5538 }
5539 }
5540 InputSharedObject::ReadDeleted(..)
5541 | InputSharedObject::MutateDeleted(..)
5542 | InputSharedObject::Cancelled(..) => (), }
5545 }
5546
5547 let mut loaded_child_objects = Vec::new();
5550 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5551 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5552 loaded_child_objects.push(ObjDumpFormat::new(w))
5553 }
5554 }
5555
5556 let mut modified_at_versions = Vec::new();
5558 for (id, ver) in effects.modified_at_versions() {
5559 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5560 modified_at_versions.push(ObjDumpFormat::new(w))
5561 }
5562 }
5563
5564 let mut runtime_reads = Vec::new();
5568 for obj in inner_temporary_store
5569 .runtime_packages_loaded_from_db
5570 .values()
5571 {
5572 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
5573 }
5574
5575 Ok(Self {
5578 tx_digest: *tx_digest,
5579 executed_epoch,
5580 reference_gas_price,
5581 epoch_start_timestamp_ms,
5582 protocol_version,
5583 relevant_system_packages,
5584 shared_objects,
5585 loaded_child_objects,
5586 modified_at_versions,
5587 runtime_reads,
5588 sender_signed_data: certificate.clone().into_message(),
5589 input_objects: inner_temporary_store
5590 .input_objects
5591 .values()
5592 .map(|o| ObjDumpFormat::new(o.clone()))
5593 .collect(),
5594 computed_effects: effects.clone(),
5595 expected_effects_digest,
5596 })
5597 }
5598
5599 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
5600 let mut objects = Vec::new();
5601 objects.extend(self.relevant_system_packages.clone());
5602 objects.extend(self.shared_objects.clone());
5603 objects.extend(self.loaded_child_objects.clone());
5604 objects.extend(self.modified_at_versions.clone());
5605 objects.extend(self.runtime_reads.clone());
5606 objects.extend(self.input_objects.clone());
5607 objects
5608 }
5609
5610 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
5611 let file_name = format!(
5612 "{}_{}_NODE_DUMP.json",
5613 self.tx_digest,
5614 AuthorityState::unixtime_now_ms()
5615 );
5616 let mut path = path.to_path_buf();
5617 path.push(&file_name);
5618 let mut file = File::create(path.clone())?;
5619 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
5620 Ok(path)
5621 }
5622
5623 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
5624 let file = File::open(path)?;
5625 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
5626 }
5627}