1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::TxLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage as FrameworkSystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::{
48 Address, EndOfEpochTransactionKind, Event, ExecutionStatus, ObjectId, Owner, RandomnessRound,
49 StructTag, TransactionExpiration, TransactionKind, TypeTag,
50 crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion},
51 gas::GasCostSummary,
52};
53use iota_storage::{
54 key_value_store::{
55 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
56 },
57 key_value_store_metrics::KeyValueStoreMetrics,
58};
59#[cfg(msim)]
60use iota_types::committee::CommitteeTrait;
61use iota_types::{
62 account_abstraction::{
63 account::AuthenticatorFunctionRefV1Key,
64 authenticator_function::{
65 AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
66 AuthenticatorFunctionRefV1, extract_auth_fun_refs,
67 },
68 },
69 auth_context::AuthContextData,
70 base_types::{
71 AuthorityName, ConciseableName, ObjectInfo, ObjectRef, ObjectType, SequenceNumber,
72 VersionNumber,
73 },
74 committee::{Committee, EpochId, ProtocolVersion},
75 crypto::{AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, Signer},
76 deny_list_v1::check_coin_deny_list_v1,
77 digests::{ChainIdentifier, Digest, ObjectDigest, TransactionDigest, TransactionEffectsDigest},
78 dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
79 effects::{
80 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
81 TransactionEffectsExt, TransactionEvents, VerifiedSignedTransactionEffects,
82 },
83 error::{ExecutionError, IotaError, IotaResult, UserInputError},
84 event::{EventID, SystemEpochInfoEvent},
85 executable_transaction::VerifiedExecutableTransaction,
86 execution_config_utils::to_binary_config,
87 fp_ensure,
88 gas::IotaGasStatus,
89 gas_coin::NANOS_PER_IOTA,
90 inner_temporary_store::{
91 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
92 WrittenObjects,
93 },
94 iota_sdk_types_conversions::type_tag_core_to_sdk,
95 iota_system_state::{
96 IotaSystemState, IotaSystemStateTrait,
97 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
98 },
99 layout_resolver::{LayoutResolver, into_struct_layout},
100 message_envelope::Message,
101 messages_checkpoint::{
102 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
103 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
104 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
105 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
106 },
107 messages_consensus::AuthorityCapabilitiesV1,
108 messages_grpc::{
109 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
110 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
111 TransactionStatus,
112 },
113 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
114 move_authenticator::{MoveAuthenticator, MoveAuthenticatorExt},
115 object::{
116 MoveObject, MoveObjectExt, OBJECT_START_VERSION, Object, ObjectRead, PastObjectRead,
117 bounded_visitor::BoundedVisitor,
118 },
119 storage::{
120 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
121 },
122 supported_protocol_versions::{
123 ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
124 },
125 traffic_control::{PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams},
126 transaction::*,
127 transaction_executor::{SimulateTransactionResult, VmChecks},
128};
129use itertools::Itertools;
130use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
131use move_core_types::{
132 account_address::AccountAddress, annotated_value::MoveStructLayout, language_storage::ModuleId,
133};
134use parking_lot::Mutex;
135use prometheus_filtered::{
136 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
137 register_histogram_vec_with_registry, register_histogram_with_registry,
138 register_int_counter_vec_with_registry, register_int_counter_with_registry,
139 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
140};
141use serde::{Deserialize, Serialize, de::DeserializeOwned};
142use tap::TapFallible;
143use tokio::{
144 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
145 task::JoinHandle,
146};
147use tracing::{debug, error, info, instrument, trace, warn};
148use typed_store::TypedStoreError;
149
150use self::{
151 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
152};
153#[cfg(msim)]
154pub use crate::checkpoints::checkpoint_executor::utils::{
155 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
156};
157use crate::{
158 authority::{
159 authority_per_epoch_store::{AuthorityPerEpochStore, TxGuard},
160 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
161 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
162 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
163 authority_store_tables::AuthorityPrunerTables,
164 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
165 },
166 authority_client::NetworkAuthorityClient,
167 checkpoint_progress_tracker::CheckpointProgressTracker,
168 checkpoints::CheckpointStore,
169 congestion_tracker::CongestionTracker,
170 consensus_adapter::ConsensusAdapter,
171 epoch::committee_store::CommitteeStore,
172 execution_cache::{
173 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
174 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
175 TransactionCacheRead,
176 },
177 execution_driver::execution_process,
178 global_state_hasher::{GlobalStateHashStore, GlobalStateHasher},
179 grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
180 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
181 metrics::{LatencyObserver, RateTracker},
182 module_cache_metrics::ResolverMetrics,
183 overload_monitor::{
184 AuthorityOverloadInfo, compute_graduated_load_shedding_percentage,
185 overload_monitor_accept_tx,
186 },
187 stake_aggregator::StakeAggregator,
188 subscription_handler::SubscriptionHandler,
189 traffic_controller::{TrafficController, metrics::TrafficControllerMetrics},
190 transaction_input_loader::TransactionInputLoader,
191 transaction_manager::TransactionManager,
192 transaction_outputs::TransactionOutputs,
193 validator_tx_finalizer::ValidatorTxFinalizer,
194 verify_indexes::verify_indexes,
195};
196
197#[cfg(test)]
198#[path = "unit_tests/authority_tests.rs"]
199pub mod authority_tests;
200
201#[cfg(test)]
202#[path = "unit_tests/transaction_tests.rs"]
203pub mod transaction_tests;
204
205#[cfg(test)]
206#[path = "unit_tests/batch_transaction_tests.rs"]
207mod batch_transaction_tests;
208
209#[cfg(test)]
210#[path = "unit_tests/move_integration_tests.rs"]
211pub mod move_integration_tests;
212
213#[cfg(test)]
214#[path = "unit_tests/gas_tests.rs"]
215mod gas_tests;
216
217#[cfg(test)]
218#[path = "unit_tests/batch_verification_tests.rs"]
219mod batch_verification_tests;
220
221#[cfg(test)]
222#[path = "unit_tests/coin_deny_list_tests.rs"]
223mod coin_deny_list_tests;
224
225#[cfg(test)]
226#[path = "unit_tests/auth_unit_test_utils.rs"]
227pub mod auth_unit_test_utils;
228
229pub mod authority_test_utils;
230
231pub mod authority_per_epoch_store;
232pub mod authority_per_epoch_store_pruner;
233
234mod authority_store_migrations;
235pub mod authority_store_pruner;
236pub mod authority_store_tables;
237pub mod authority_store_types;
238pub mod epoch_start_configuration;
239pub mod shared_object_congestion_tracker;
240pub mod shared_object_version_manager;
241pub mod suggested_gas_price_calculator;
242pub mod test_authority_builder;
243pub mod transaction_deferral;
244
245pub(crate) mod authority_store;
246pub mod backpressure;
247pub(crate) mod dropped_tx_status_cache;
248
249pub struct AuthorityMetrics {
251 tx_orders: IntCounter,
252 total_certs: IntCounter,
253 total_cert_attempts: IntCounter,
254 total_effects: IntCounter,
255 pub shared_obj_tx: IntCounter,
256 sponsored_tx: IntCounter,
257 tx_already_processed: IntCounter,
258 num_input_objs: Histogram,
259 num_shared_objects: Histogram,
260 batch_size: Histogram,
261
262 authority_state_handle_transaction_latency: Histogram,
263
264 execute_certificate_latency_single_writer: Histogram,
265 execute_certificate_latency_shared_object: Histogram,
266
267 internal_execution_latency: Histogram,
268 execution_load_input_objects_latency: Histogram,
269 prepare_certificate_latency: Histogram,
270 commit_certificate_latency: Histogram,
271 db_checkpoint_latency: Histogram,
272
273 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
274 pub(crate) transaction_manager_num_missing_objects: IntGauge,
275 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
276 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
277 pub(crate) transaction_manager_num_ready: IntGauge,
278 pub(crate) transaction_manager_object_cache_size: IntGauge,
279 pub(crate) transaction_manager_object_cache_hits: IntCounter,
280 pub(crate) transaction_manager_object_cache_misses: IntCounter,
281 pub(crate) transaction_manager_object_cache_evictions: IntCounter,
282 pub(crate) transaction_manager_package_cache_size: IntGauge,
283 pub(crate) transaction_manager_package_cache_hits: IntCounter,
284 pub(crate) transaction_manager_package_cache_misses: IntCounter,
285 pub(crate) transaction_manager_package_cache_evictions: IntCounter,
286 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
287
288 pub(crate) execution_driver_executed_transactions: IntCounter,
289 pub(crate) execution_driver_dispatch_queue: IntGauge,
290 pub(crate) execution_queueing_delay_s: Histogram,
291 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
292 pub(crate) execution_gas_latency_ratio: Histogram,
293
294 pub(crate) skipped_consensus_txns: IntCounter,
295 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
296
297 pub(crate) authority_overload_status: IntGauge,
298 pub(crate) consensus_queue_load_shedding_percentage: IntGauge,
300 pub(crate) local_post_consensus_load_shedding_percentage: IntGauge,
304
305 pub(crate) transaction_overload_sources: IntCounterVec,
306
307 post_processing_total_events_emitted: IntCounter,
309 post_processing_total_tx_indexed: IntCounter,
310 post_processing_total_tx_had_event_processed: IntCounter,
311 post_processing_total_failures: IntCounter,
312
313 pub consensus_handler_processed: IntCounterVec,
315 pub consensus_handler_transaction_sizes: HistogramVec,
316 pub consensus_handler_num_low_scoring_authorities: IntGauge,
317 pub consensus_handler_scores: IntGaugeVec,
318 pub consensus_handler_deferred_transactions: IntCounter,
319 pub consensus_handler_congested_transactions: IntCounter,
320 pub consensus_handler_cancelled_transactions: IntCounter,
321 pub consensus_handler_validation_dropped_transactions: IntCounter,
325 pub consensus_handler_load_shedding_dropped_transactions: IntCounter,
329 pub consensus_handler_load_shedding_percentage: IntGauge,
335 pub consensus_handler_max_object_costs: IntGaugeVec,
336 pub consensus_committed_subdags: IntCounterVec,
337 pub consensus_committed_messages: IntGaugeVec,
338 pub consensus_committed_user_transactions: IntGaugeVec,
339 pub consensus_handler_leader_round: IntGauge,
340 pub consensus_calculated_throughput: IntGauge,
341 pub consensus_calculated_throughput_profile: IntGauge,
342
343 pub validator_scoreboard_scores: IntGaugeVec,
344 pub invalid_misbehavior_reports_by_authority: IntGaugeVec,
345
346 pub limits_metrics: Arc<LimitsMetrics>,
347
348 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
350
351 pub multisig_sig_count: IntCounter,
353
354 pub execution_queueing_latency: LatencyObserver,
357
358 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
365
366 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
369}
370
371const POSITIVE_INT_BUCKETS: &[f64] = &[
373 1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
374 20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
375 7000000., 10000000.,
376];
377
378const LATENCY_SEC_BUCKETS: &[f64] = &[
379 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
380 10., 20., 30., 60., 90.,
381];
382
383const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
385 0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
386 0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
387];
388
389const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
390 10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
391 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
392];
393
394pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
398 pub fn new(registry: &prometheus_filtered::Registry) -> AuthorityMetrics {
399 let execute_certificate_latency = register_histogram_vec_with_registry!(
400 "authority_state_execute_certificate_latency",
401 "Latency of executing certificates, including waiting for inputs",
402 &["tx_type"],
403 LATENCY_SEC_BUCKETS.to_vec(),
404 registry,
405 )
406 .unwrap();
407
408 let execute_certificate_latency_single_writer =
409 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
410 let execute_certificate_latency_shared_object =
411 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
412
413 Self {
414 tx_orders: register_int_counter_with_registry!(
415 "total_transaction_orders",
416 "Total number of transaction orders",
417 registry,
418 )
419 .unwrap(),
420 total_certs: register_int_counter_with_registry!(
421 "total_transaction_certificates",
422 "Total number of transaction certificates handled",
423 registry,
424 )
425 .unwrap(),
426 total_cert_attempts: register_int_counter_with_registry!(
427 "total_handle_certificate_attempts",
428 "Number of calls to handle_certificate",
429 registry,
430 )
431 .unwrap(),
432 total_effects: register_int_counter_with_registry!(
434 "total_transaction_effects",
435 "Total number of transaction effects produced",
436 registry,
437 )
438 .unwrap(),
439
440 shared_obj_tx: register_int_counter_with_registry!(
441 "num_shared_obj_tx",
442 "Number of transactions involving shared objects",
443 registry,
444 )
445 .unwrap(),
446
447 sponsored_tx: register_int_counter_with_registry!(
448 "num_sponsored_tx",
449 "Number of sponsored transactions",
450 registry,
451 )
452 .unwrap(),
453
454 tx_already_processed: register_int_counter_with_registry!(
455 "num_tx_already_processed",
456 "Number of transaction orders already processed previously",
457 registry,
458 )
459 .unwrap(),
460 num_input_objs: register_histogram_with_registry!(
461 "num_input_objects",
462 "Distribution of number of input TX objects per TX",
463 POSITIVE_INT_BUCKETS.to_vec(),
464 registry,
465 )
466 .unwrap(),
467 num_shared_objects: register_histogram_with_registry!(
468 "num_shared_objects",
469 "Number of shared input objects per TX",
470 POSITIVE_INT_BUCKETS.to_vec(),
471 registry,
472 )
473 .unwrap(),
474 batch_size: register_histogram_with_registry!(
475 "batch_size",
476 "Distribution of size of transaction batch",
477 POSITIVE_INT_BUCKETS.to_vec(),
478 registry,
479 )
480 .unwrap(),
481 authority_state_handle_transaction_latency: register_histogram_with_registry!(
482 "authority_state_handle_transaction_latency",
483 "Latency of handling transactions",
484 LATENCY_SEC_BUCKETS.to_vec(),
485 registry,
486 )
487 .unwrap(),
488 execute_certificate_latency_single_writer,
489 execute_certificate_latency_shared_object,
490 internal_execution_latency: register_histogram_with_registry!(
491 "authority_state_internal_execution_latency",
492 "Latency of actual certificate executions",
493 LATENCY_SEC_BUCKETS.to_vec(),
494 registry,
495 )
496 .unwrap(),
497 execution_load_input_objects_latency: register_histogram_with_registry!(
498 "authority_state_execution_load_input_objects_latency",
499 "Latency of loading input objects for execution",
500 LOW_LATENCY_SEC_BUCKETS.to_vec(),
501 registry,
502 )
503 .unwrap(),
504 prepare_certificate_latency: register_histogram_with_registry!(
505 "authority_state_prepare_certificate_latency",
506 "Latency of executing certificates, before committing the results",
507 LATENCY_SEC_BUCKETS.to_vec(),
508 registry,
509 )
510 .unwrap(),
511 commit_certificate_latency: register_histogram_with_registry!(
512 "authority_state_commit_certificate_latency",
513 "Latency of committing certificate execution results",
514 LATENCY_SEC_BUCKETS.to_vec(),
515 registry,
516 )
517 .unwrap(),
518 db_checkpoint_latency: register_histogram_with_registry!(
519 "db_checkpoint_latency",
520 "Latency of checkpointing dbs",
521 LATENCY_SEC_BUCKETS.to_vec(),
522 registry,
523 ).unwrap(),
524 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
525 "transaction_manager_num_enqueued_certificates",
526 "Current number of certificates enqueued to TransactionManager",
527 &["result"],
528 registry,
529 )
530 .unwrap(),
531 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
532 "transaction_manager_num_missing_objects",
533 "Current number of missing objects in TransactionManager",
534 registry,
535 )
536 .unwrap(),
537 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
538 "transaction_manager_num_pending_certificates",
539 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
540 registry,
541 )
542 .unwrap(),
543 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
544 "transaction_manager_num_executing_certificates",
545 "Number of executing certificates, including queued and actually running certificates",
546 registry,
547 )
548 .unwrap(),
549 transaction_manager_num_ready: register_int_gauge_with_registry!(
550 "transaction_manager_num_ready",
551 "Number of ready transactions in TransactionManager",
552 registry,
553 )
554 .unwrap(),
555 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
556 "transaction_manager_object_cache_size",
557 "Current size of object-availability cache in TransactionManager",
558 registry,
559 )
560 .unwrap(),
561 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
562 "transaction_manager_object_cache_hits",
563 "Number of object-availability cache hits in TransactionManager",
564 registry,
565 )
566 .unwrap(),
567 authority_overload_status: register_int_gauge_with_registry!(
568 "authority_overload_status",
569 "Whether authority is current experiencing overload and enters load shedding mode.",
570 registry)
571 .unwrap(),
572 local_post_consensus_load_shedding_percentage: register_int_gauge_with_registry!(
573 "authority_load_shedding_percentage",
574 "This authority's locally computed load shedding percentage. In the P-COOL flow this is the value broadcast to peers, not necessarily the rate enforced (see consensus_handler_load_shedding_percentage).",
575 registry)
576 .unwrap(),
577 consensus_queue_load_shedding_percentage: register_int_gauge_with_registry!(
578 "consensus_queue_load_shedding_percentage",
579 "Percentage of transactions shed due to consensus queue length. Separate admission-control signal, not an input to authority_load_shedding_percentage.",
580 registry)
581 .unwrap(),
582 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
583 "transaction_manager_object_cache_misses",
584 "Number of object-availability cache misses in TransactionManager",
585 registry,
586 )
587 .unwrap(),
588 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
589 "transaction_manager_object_cache_evictions",
590 "Number of object-availability cache evictions in TransactionManager",
591 registry,
592 )
593 .unwrap(),
594 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
595 "transaction_manager_package_cache_size",
596 "Current size of package-availability cache in TransactionManager",
597 registry,
598 )
599 .unwrap(),
600 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
601 "transaction_manager_package_cache_hits",
602 "Number of package-availability cache hits in TransactionManager",
603 registry,
604 )
605 .unwrap(),
606 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
607 "transaction_manager_package_cache_misses",
608 "Number of package-availability cache misses in TransactionManager",
609 registry,
610 )
611 .unwrap(),
612 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
613 "transaction_manager_package_cache_evictions",
614 "Number of package-availability cache evictions in TransactionManager",
615 registry,
616 )
617 .unwrap(),
618 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
619 "transaction_manager_transaction_queue_age_s",
620 "Time spent in waiting for transaction in the queue",
621 LATENCY_SEC_BUCKETS.to_vec(),
622 registry,
623 )
624 .unwrap(),
625 transaction_overload_sources: register_int_counter_vec_with_registry!(
626 "transaction_overload_sources",
627 "Number of times each source indicates transaction overload.",
628 &["source"],
629 registry)
630 .unwrap(),
631 execution_driver_executed_transactions: register_int_counter_with_registry!(
632 "execution_driver_executed_transactions",
633 "Cumulative number of transaction executed by execution driver",
634 registry,
635 )
636 .unwrap(),
637 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
638 "execution_driver_dispatch_queue",
639 "Number of transaction pending in execution driver dispatch queue",
640 registry,
641 )
642 .unwrap(),
643 execution_queueing_delay_s: register_histogram_with_registry!(
644 "execution_queueing_delay_s",
645 "Queueing delay between a transaction is ready for execution until it starts executing.",
646 LATENCY_SEC_BUCKETS.to_vec(),
647 registry
648 )
649 .unwrap(),
650 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
651 "prepare_cert_gas_latency_ratio",
652 "The ratio of computation gas divided by VM execution latency.",
653 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
654 registry
655 )
656 .unwrap(),
657 execution_gas_latency_ratio: register_histogram_with_registry!(
658 "execution_gas_latency_ratio",
659 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
660 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
661 registry
662 )
663 .unwrap(),
664 skipped_consensus_txns: register_int_counter_with_registry!(
665 "skipped_consensus_txns",
666 "Total number of consensus transactions skipped",
667 registry,
668 )
669 .unwrap(),
670 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
671 "skipped_consensus_txns_cache_hit",
672 "Total number of consensus transactions skipped because of local cache hit",
673 registry,
674 )
675 .unwrap(),
676 post_processing_total_events_emitted: register_int_counter_with_registry!(
677 "post_processing_total_events_emitted",
678 "Total number of events emitted in post processing",
679 registry,
680 )
681 .unwrap(),
682 post_processing_total_tx_indexed: register_int_counter_with_registry!(
683 "post_processing_total_tx_indexed",
684 "Total number of txes indexed in post processing",
685 registry,
686 )
687 .unwrap(),
688 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
689 "post_processing_total_tx_had_event_processed",
690 "Total number of txes finished event processing in post processing",
691 registry,
692 )
693 .unwrap(),
694 post_processing_total_failures: register_int_counter_with_registry!(
695 "post_processing_total_failures",
696 "Total number of failure in post processing",
697 registry,
698 )
699 .unwrap(),
700 consensus_handler_processed: register_int_counter_vec_with_registry!(
701 "consensus_handler_processed",
702 "Number of transactions processed by consensus handler",
703 &["class"],
704 registry
705 ).unwrap(),
706 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
707 "consensus_handler_transaction_sizes",
708 "Sizes of each type of transactions processed by consensus handler",
709 &["class"],
710 POSITIVE_INT_BUCKETS.to_vec(),
711 registry
712 ).unwrap(),
713 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
714 "consensus_handler_num_low_scoring_authorities",
715 "Number of low scoring authorities based on reputation scores from consensus",
716 registry
717 ).unwrap(),
718 consensus_handler_scores: register_int_gauge_vec_with_registry!(
719 "consensus_handler_scores",
720 "scores from consensus for each authority",
721 &["authority"],
722 registry,
723 ).unwrap(),
724 validator_scoreboard_scores: register_int_gauge_vec_with_registry!(
725 "validator_scoreboard_scores",
726 "Per-authority validator scores published by the local Scoreboard after each consensus commit. Range [0, MAX_SCORE].",
727 &["authority"],
728 registry,
729 ).unwrap(),
730 invalid_misbehavior_reports_by_authority: register_int_gauge_vec_with_registry!(
731 "invalid_misbehavior_reports_by_authority",
732 "Cumulative count of invalid misbehavior reports received from each reporting authority in the current epoch. Bumped when a `MisbehaviorReport` consensus transaction fails sender/authority match or payload validation. Snapshot republished after each consensus commit.",
733 &["authority"],
734 registry,
735 ).unwrap(),
736 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
737 "consensus_handler_deferred_transactions",
738 "Number of transactions deferred by consensus handler",
739 registry,
740 ).unwrap(),
741 consensus_handler_congested_transactions: register_int_counter_with_registry!(
742 "consensus_handler_congested_transactions",
743 "Number of transactions deferred by consensus handler due to congestion",
744 registry,
745 ).unwrap(),
746 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
747 "consensus_handler_cancelled_transactions",
748 "Number of transactions cancelled by consensus handler",
749 registry,
750 ).unwrap(),
751 consensus_handler_validation_dropped_transactions: register_int_counter_with_registry!(
752 "consensus_handler_validation_dropped_transactions",
753 "Number of UserTransactionV1 transactions dropped by post-consensus validation",
754 registry,
755 ).unwrap(),
756 consensus_handler_load_shedding_dropped_transactions: register_int_counter_with_registry!(
757 "consensus_handler_load_shedding_dropped_transactions",
758 "Number of user transactions dropped by post-consensus load shedding, based on the quorum load shedding percentage",
759 registry,
760 ).unwrap(),
761 consensus_handler_load_shedding_percentage: register_int_gauge_with_registry!(
762 "consensus_handler_load_shedding_percentage",
763 "Stake-weighted quorum (2f+1) load shedding percentage enforced on user transactions in the most recent consensus commit. 0 when the P-COOL flow is disabled.",
764 registry,
765 ).unwrap(),
766 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
767 "consensus_handler_max_congestion_control_object_costs",
768 "Max object costs for congestion control in the current consensus commit",
769 &["commit_type"],
770 registry,
771 ).unwrap(),
772 consensus_committed_subdags: register_int_counter_vec_with_registry!(
773 "consensus_committed_subdags",
774 "Number of committed subdags, sliced by leader",
775 &["authority"],
776 registry,
777 ).unwrap(),
778 consensus_committed_messages: register_int_gauge_vec_with_registry!(
779 "consensus_committed_messages",
780 "Total number of committed consensus messages, sliced by author",
781 &["authority"],
782 registry,
783 ).unwrap(),
784 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
785 "consensus_committed_user_transactions",
786 "Number of committed user transactions, sliced by submitter",
787 &["authority"],
788 registry,
789 ).unwrap(),
790 consensus_handler_leader_round: register_int_gauge_with_registry!(
791 "consensus_handler_leader_round",
792 "The leader round of the current consensus output being processed in the consensus handler",
793 registry,
794 ).unwrap(),
795 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
796 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
797 multisig_sig_count: register_int_counter_with_registry!(
798 "multisig_sig_count",
799 "Count of multisig signatures",
800 registry,
801 )
802 .unwrap(),
803 consensus_calculated_throughput: register_int_gauge_with_registry!(
804 "consensus_calculated_throughput",
805 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
806 registry,
807 ).unwrap(),
808 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
809 "consensus_calculated_throughput_profile",
810 "The current active calculated throughput profile",
811 registry
812 ).unwrap(),
813 execution_queueing_latency: LatencyObserver::new(),
814 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
815 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
816 }
817 }
818
819 pub fn reset_on_reconfigure(&self) {
823 self.consensus_committed_messages.reset();
824 self.consensus_handler_scores.reset();
825 self.validator_scoreboard_scores.reset();
826 self.invalid_misbehavior_reports_by_authority.reset();
827 self.consensus_committed_user_transactions.reset();
828 }
829}
830
831pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
838
839pub struct AuthorityState {
840 pub name: AuthorityName,
843 pub secret: StableSyncAuthoritySigner,
845
846 input_loader: TransactionInputLoader,
848 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
849
850 epoch_store: ArcSwap<AuthorityPerEpochStore>,
851
852 execution_lock: RwLock<EpochId>,
858
859 pub indexes: Option<Arc<IndexStore>>,
860 pub grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
861
862 pub subscription_handler: Arc<SubscriptionHandler>,
863 pub checkpoint_store: Arc<CheckpointStore>,
864
865 committee_store: Arc<CommitteeStore>,
866
867 transaction_manager: Arc<TransactionManager>,
869
870 #[cfg_attr(not(test), expect(unused))]
872 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
873
874 pub metrics: Arc<AuthorityMetrics>,
875 _pruner: AuthorityStorePruner,
876 authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
877 checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
878
879 db_checkpoint_config: DBCheckpointConfig,
881
882 pub config: NodeConfig,
883
884 pub overload_info: AuthorityOverloadInfo,
886
887 pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
888
889 chain_identifier: ChainIdentifier,
892
893 pub(crate) congestion_tracker: Arc<CongestionTracker>,
894
895 pub traffic_controller: Option<Arc<TrafficController>>,
897}
898
899impl AuthorityState {
908 pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
909 epoch_store.committee().authority_exists(&self.name)
910 }
911
912 pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
913 epoch_store
914 .active_validators()
915 .iter()
916 .any(|a| AuthorityName::from(a) == self.name)
917 }
918
919 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
920 !self.is_committee_validator(epoch_store)
921 }
922
923 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
924 &self.committee_store
925 }
926
927 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
928 self.committee_store.clone()
929 }
930
931 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
932 &self.config.authority_overload_config
933 }
934
935 pub fn get_epoch_state_commitments(
936 &self,
937 epoch: EpochId,
938 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
939 self.checkpoint_store.get_epoch_state_commitments(epoch)
940 }
941
942 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
946 pub(crate) async fn handle_transaction_validation_checks(
947 &self,
948 transaction: &VerifiedTransaction,
949 epoch_store: &Arc<AuthorityPerEpochStore>,
950 ) -> IotaResult<Vec<ObjectRef>> {
951 let protocol_config = epoch_store.protocol_config();
952 let reference_gas_price = epoch_store.reference_gas_price();
953
954 let epoch = epoch_store.epoch();
955
956 let tx_data = transaction.data().transaction_data();
957
958 iota_transaction_checks::deny::check_transaction_for_validation(
962 tx_data,
963 transaction.tx_signatures(),
964 &transaction.input_objects()?,
965 &tx_data.receiving_objects(),
966 &self.config.transaction_deny_config,
967 self.get_backing_package_store().as_ref(),
968 )?;
969
970 let (tx_input_objects, tx_receiving_objects, per_authenticator_inputs) =
975 self.read_objects_for_validation(transaction, epoch)?;
976
977 let move_authenticators = transaction.move_authenticators();
978
979 let (gas_status, tx_checked_input_objects, per_authenticator_checked_inputs) = self
985 .check_transaction_inputs_for_validation(
986 protocol_config,
987 reference_gas_price,
988 tx_data,
989 tx_input_objects,
990 &tx_receiving_objects,
991 &move_authenticators,
992 per_authenticator_inputs,
993 )?;
994
995 let per_authenticator_checked_input_objects = per_authenticator_checked_inputs
998 .iter()
999 .map(|i| &i.0)
1000 .collect();
1001
1002 check_coin_deny_list_v1(
1006 tx_data.sender(),
1007 &tx_checked_input_objects,
1008 &tx_receiving_objects,
1009 &per_authenticator_checked_input_objects,
1010 &self.get_object_store(),
1011 )?;
1012
1013 let (kind, signer, gas_data) = tx_data.execution_parts();
1014
1015 let (sender_authenticator_function_ref, sponsor_authenticator_function_ref) =
1016 extract_auth_fun_refs(signer, gas_data.owner, |address| {
1017 move_authenticators
1018 .iter()
1019 .zip(per_authenticator_checked_inputs.iter())
1020 .find(|(move_authenticator, _)| move_authenticator.address() == address)
1021 .map(|(_, (_, auth_fun_ref))| auth_fun_ref.clone())
1022 });
1023
1024 let pre_consensus_move_authenticators =
1029 pre_consensus_move_authenticators(transaction, protocol_config);
1030 let (move_authenticators, per_authenticator_checked_inputs): (Vec<_>, Vec<_>) =
1031 move_authenticators
1032 .into_iter()
1033 .zip(per_authenticator_checked_inputs)
1034 .filter(|(a, _)| pre_consensus_move_authenticators.contains(a))
1035 .unzip();
1036 let per_authenticator_checked_input_objects: Vec<_> = per_authenticator_checked_inputs
1037 .iter()
1038 .map(|i| &i.0)
1039 .collect();
1040
1041 if !move_authenticators.is_empty() {
1044 let aggregated_authenticator_input_objects =
1045 iota_transaction_checks::aggregate_authenticator_input_objects(
1046 &per_authenticator_checked_input_objects,
1047 )?;
1048
1049 debug_assert_eq!(
1050 move_authenticators.len(),
1051 per_authenticator_checked_inputs.len(),
1052 "Move authenticators amount must match the number of checked authenticator inputs"
1053 );
1054
1055 let move_authenticators = move_authenticators
1056 .into_iter()
1057 .zip(per_authenticator_checked_inputs)
1058 .map(
1059 |(
1060 move_authenticator,
1061 (authenticator_checked_input_objects, authenticator_function_ref),
1062 )| {
1063 (
1064 move_authenticator.to_owned(),
1065 authenticator_function_ref,
1066 authenticator_checked_input_objects,
1067 )
1068 },
1069 )
1070 .collect();
1071
1072 let tx_data_bytes =
1077 bcs::to_bytes(&tx_data).expect("TransactionData serialization cannot fail");
1078
1079 let (sender_auth_digest, sponsor_auth_digest) =
1080 transaction.data().compute_auth_digests()?;
1081
1082 let auth_context_data = AuthContextData {
1083 transaction_data_bytes: tx_data_bytes,
1084 sender_auth_digest,
1085 sponsor_auth_digest,
1086 sender_authenticator_function_ref,
1087 sponsor_authenticator_function_ref,
1088 };
1089
1090 let validation_result = epoch_store.executor().authenticate_transaction(
1092 self.get_backing_store().as_ref(),
1093 protocol_config,
1094 self.metrics.limits_metrics.clone(),
1095 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1096 epoch_store
1097 .epoch_start_config()
1098 .epoch_data()
1099 .epoch_start_timestamp(),
1100 gas_data,
1101 gas_status,
1102 move_authenticators,
1103 aggregated_authenticator_input_objects,
1104 kind,
1105 signer,
1106 transaction.digest().to_owned(),
1107 auth_context_data,
1108 &mut None,
1109 );
1110
1111 if let Err(validation_error) = validation_result {
1112 return Err(IotaError::MoveAuthenticatorExecutionFailure {
1113 error: validation_error.to_string(),
1114 });
1115 }
1116 }
1117
1118 Ok(tx_checked_input_objects.inner().filter_owned_objects())
1119 }
1120
1121 async fn handle_transaction_impl(
1125 &self,
1126 transaction: VerifiedTransaction,
1127 epoch_store: &Arc<AuthorityPerEpochStore>,
1128 ) -> IotaResult<VerifiedSignedTransaction> {
1129 let _execution_lock = self.execution_lock_for_signing()?;
1131
1132 let owned_objects = self
1133 .handle_transaction_validation_checks(&transaction, epoch_store)
1134 .await?;
1135
1136 let epoch = epoch_store.epoch();
1137 let signed_transaction =
1138 VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);
1139
1140 self.get_cache_writer().try_acquire_transaction_locks(
1145 epoch_store,
1146 &owned_objects,
1147 signed_transaction.clone(),
1148 )?;
1149
1150 Ok(signed_transaction)
1151 }
1152
1153 #[instrument(name = "handle_transaction", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = transaction.data().transaction_data().gas_owner().to_string()
1155 ))]
1156 pub async fn handle_transaction(
1157 &self,
1158 epoch_store: &Arc<AuthorityPerEpochStore>,
1159 transaction: VerifiedTransaction,
1160 ) -> IotaResult<HandleTransactionResponse> {
1161 let tx_digest = *transaction.digest();
1162 debug!("handle_transaction");
1163
1164 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1166 return Ok(HandleTransactionResponse { status });
1167 }
1168
1169 let _metrics_guard = self
1170 .metrics
1171 .authority_state_handle_transaction_latency
1172 .start_timer();
1173 self.metrics.tx_orders.inc();
1174
1175 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1176 match signed {
1177 Ok(s) => {
1178 if self.is_committee_validator(epoch_store) {
1179 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1180 let tx = s.clone();
1181 let validator_tx_finalizer = validator_tx_finalizer.clone();
1182 let cache_reader = self.get_transaction_cache_reader().clone();
1183 let epoch_store = epoch_store.clone();
1184 spawn_monitored_task!(epoch_store.within_alive_epoch(
1185 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1186 ));
1187 }
1188 }
1189 Ok(HandleTransactionResponse {
1190 status: TransactionStatus::Signed(s.into_inner().into_sig()),
1191 })
1192 }
1193 Err(err) => Ok(HandleTransactionResponse {
1197 status: self
1198 .get_transaction_status(&tx_digest, epoch_store)?
1199 .ok_or(err)?
1200 .1,
1201 }),
1202 }
1203 }
1204
1205 pub fn check_system_overload_at_signing(&self) -> bool {
1206 self.config
1207 .authority_overload_config
1208 .check_system_overload_at_signing
1209 }
1210
1211 pub fn check_system_overload_at_execution(&self) -> bool {
1212 self.config
1213 .authority_overload_config
1214 .check_system_overload_at_execution
1215 }
1216
1217 pub(crate) fn check_system_overload(
1227 &self,
1228 consensus_adapter: &Arc<ConsensusAdapter>,
1229 tx_data: &SenderSignedData,
1230 do_authority_overload_check: bool,
1231 pcool_flow_enabled: bool,
1232 ) -> IotaResult {
1233 if pcool_flow_enabled {
1234 self.check_consensus_queue_graduated_limits(consensus_adapter, tx_data)
1237 .tap_err(|_| {
1238 self.update_overload_metrics("consensus");
1239 })?;
1240
1241 consensus_adapter.check_consensus_overload().tap_err(|_| {
1248 self.update_overload_metrics("consensus");
1249 })?;
1250 } else {
1251 if do_authority_overload_check {
1252 self.check_authority_overload(tx_data).tap_err(|_| {
1253 self.update_overload_metrics("execution_queue");
1254 })?;
1255 }
1256 self.transaction_manager
1257 .check_execution_overload(self.overload_config(), tx_data)
1258 .tap_err(|_| {
1259 self.update_overload_metrics("execution_pending");
1260 })?;
1261 consensus_adapter.check_consensus_overload().tap_err(|_| {
1262 self.update_overload_metrics("consensus");
1263 })?;
1264
1265 let pending_tx_count = self
1266 .get_cache_commit()
1267 .approximate_pending_transaction_count();
1268 if pending_tx_count
1269 > self
1270 .config
1271 .execution_cache_config
1272 .writeback_cache
1273 .backpressure_threshold_for_rpc()
1274 {
1275 return Err(IotaError::ValidatorOverloadedRetryAfter {
1276 retry_after_secs: 10,
1277 });
1278 }
1279 }
1280
1281 Ok(())
1282 }
1283
1284 fn check_consensus_queue_graduated_limits(
1292 &self,
1293 consensus_adapter: &Arc<ConsensusAdapter>,
1294 tx_data: &SenderSignedData,
1295 ) -> IotaResult {
1296 let num_inflight_txs = consensus_adapter.num_inflight_transactions() as usize;
1297
1298 let shedding_pct = compute_graduated_load_shedding_percentage(
1299 num_inflight_txs,
1300 consensus_adapter.max_pending_transactions(),
1301 consensus_adapter.graduated_load_shedding_soft_limit_pct(),
1302 );
1303
1304 self.metrics
1305 .consensus_queue_load_shedding_percentage
1306 .set(shedding_pct as i64);
1307
1308 if shedding_pct == 0 {
1309 return Ok(());
1310 }
1311
1312 if shedding_pct >= 100 {
1317 return Err(IotaError::TooManyTransactionsPendingConsensus);
1318 }
1319
1320 overload_monitor_accept_tx(shedding_pct, tx_data.digest())
1321 }
1322
1323 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1324 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1325 return Ok(());
1326 }
1327
1328 let load_shedding_percentage = self
1329 .overload_info
1330 .local_load_shedding_percentage
1331 .load(Ordering::Relaxed);
1332 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1333 }
1334
1335 fn update_overload_metrics(&self, source: &str) {
1336 self.metrics
1337 .transaction_overload_sources
1338 .with_label_values(&[source])
1339 .inc();
1340 }
1341
1342 #[instrument(level = "trace", skip_all)]
1347 pub async fn wait_for_certificate_execution(
1348 &self,
1349 certificate: &VerifiedCertificate,
1350 epoch_store: &Arc<AuthorityPerEpochStore>,
1351 ) -> IotaResult<TransactionEffects> {
1352 let _metrics_guard = if certificate.contains_shared_object() {
1353 self.metrics
1354 .execute_certificate_latency_shared_object
1355 .start_timer()
1356 } else {
1357 self.metrics
1358 .execute_certificate_latency_single_writer
1359 .start_timer()
1360 };
1361 trace!("wait_for_certificate_execution");
1362
1363 self.metrics.total_cert_attempts.inc();
1364
1365 if !certificate.contains_shared_object() {
1366 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1371 }
1372
1373 epoch_store
1376 .within_alive_epoch(self.notify_read_effects(certificate))
1377 .await
1378 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1379 .and_then(|r| r)
1380 }
1381
1382 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1397 pub fn try_execute_immediately(
1398 &self,
1399 transaction: &VerifiedExecutableTransaction,
1400 expected_effects_digest: Option<TransactionEffectsDigest>,
1401 epoch_store: &Arc<AuthorityPerEpochStore>,
1402 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1403 let _scope = monitored_scope("Execution::try_execute_immediately");
1404 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1405
1406 let tx_digest = transaction.digest();
1407
1408 let tx_guard = epoch_store.acquire_tx_guard(transaction)?;
1410
1411 if let Some(effects) = self
1414 .get_transaction_cache_reader()
1415 .try_get_executed_effects(tx_digest)?
1416 {
1417 if let Some(expected_effects_digest_inner) = expected_effects_digest {
1418 assert_eq!(
1419 effects.digest(),
1420 expected_effects_digest_inner,
1421 "Unexpected effects digest for transaction {tx_digest}"
1422 );
1423 }
1424 tx_guard.release();
1425 return Ok((effects, None));
1426 }
1427
1428 let (tx_input_objects, per_authenticator_inputs) =
1429 self.read_objects_for_execution(tx_guard.as_lock_guard(), transaction, epoch_store)?;
1430
1431 let expected_effects_digest =
1437 expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1438
1439 self.process_transaction(
1440 tx_guard,
1441 transaction,
1442 tx_input_objects,
1443 per_authenticator_inputs,
1444 expected_effects_digest,
1445 epoch_store,
1446 )
1447 .tap_err(|e| info!(?tx_digest, "process_transaction failed: {e}"))
1448 .tap_ok(
1449 |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_transaction succeeded"),
1450 )
1451 }
1452
1453 pub fn read_objects_for_execution(
1454 &self,
1455 tx_lock: &TxLockGuard,
1456 transaction: &VerifiedExecutableTransaction,
1457 epoch_store: &Arc<AuthorityPerEpochStore>,
1458 ) -> IotaResult<(InputObjects, Vec<(InputObjects, ObjectReadResult)>)> {
1459 let _scope = monitored_scope("Execution::load_input_objects");
1460 let _metrics_guard = self
1461 .metrics
1462 .execution_load_input_objects_latency
1463 .start_timer();
1464
1465 let input_objects = transaction.collect_all_input_object_kind_for_reading()?;
1466
1467 let input_objects = self.input_loader.read_objects_for_execution(
1468 epoch_store,
1469 &transaction.key(),
1470 tx_lock,
1471 &input_objects,
1472 epoch_store.epoch(),
1473 )?;
1474
1475 transaction.split_input_objects_into_groups_for_reading(input_objects)
1476 }
1477
1478 pub fn try_execute_for_test(
1482 &self,
1483 certificate: &VerifiedCertificate,
1484 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1485 let epoch_store = self.epoch_store_for_testing();
1486 let (effects, execution_error_opt) = self.try_execute_immediately(
1487 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1488 None,
1489 &epoch_store,
1490 )?;
1491 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1492 Ok((signed_effects, execution_error_opt))
1493 }
1494
1495 pub fn execute_for_test(
1497 &self,
1498 certificate: &VerifiedCertificate,
1499 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1500 self.try_execute_for_test(certificate)
1501 .expect("try_execute_for_test should not fail")
1502 }
1503
1504 pub async fn notify_read_effects(
1505 &self,
1506 certificate: &VerifiedCertificate,
1507 ) -> IotaResult<TransactionEffects> {
1508 self.get_transaction_cache_reader()
1509 .try_notify_read_executed_effects(&[*certificate.digest()])
1510 .await
1511 .map(|mut r| r.pop().expect("must return correct number of effects"))
1512 }
1513
1514 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1515 self.get_object_cache_reader()
1516 .try_check_owned_objects_are_live(owned_object_refs)
1517 }
1518
1519 pub(crate) fn debug_dump_transaction_state(
1524 &self,
1525 tx_digest: &TransactionDigest,
1526 effects: &TransactionEffects,
1527 expected_effects_digest: TransactionEffectsDigest,
1528 inner_temporary_store: &InnerTemporaryStore,
1529 transaction: &VerifiedExecutableTransaction,
1530 debug_dump_config: &StateDebugDumpConfig,
1531 ) -> IotaResult<PathBuf> {
1532 let dump_dir = debug_dump_config
1535 .dump_file_directory
1536 .as_ref()
1537 .cloned()
1538 .unwrap_or(std::env::temp_dir());
1539 let epoch_store = self.load_epoch_store_one_call_per_task();
1540
1541 NodeStateDump::new(
1542 tx_digest,
1543 effects,
1544 expected_effects_digest,
1545 self.get_object_store().as_ref(),
1546 &epoch_store,
1547 inner_temporary_store,
1548 transaction,
1549 )?
1550 .write_to_file(&dump_dir)
1551 .map_err(|e| IotaError::FileIO(e.to_string()))
1552 }
1553
1554 #[instrument(name = "process_certificate", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = ?transaction.data().transaction_data().gas_owner().to_string()))]
1555 pub(crate) fn process_transaction(
1556 &self,
1557 tx_guard: TxGuard,
1558 transaction: &VerifiedExecutableTransaction,
1559 tx_input_objects: InputObjects,
1560 per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1561 expected_effects_digest: Option<TransactionEffectsDigest>,
1562 epoch_store: &Arc<AuthorityPerEpochStore>,
1563 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1564 let process_transaction_start_time = tokio::time::Instant::now();
1565 let digest = *transaction.digest();
1566
1567 let _scope = monitored_scope("Execution::process_certificate");
1568
1569 fail_point_if!("correlated-crash-process-transaction", || {
1570 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1571 iota_simulator::task::kill_current_node(None);
1572 }
1573 });
1574
1575 let execution_guard = self.execution_lock_for_executable_transaction(transaction);
1576 let execution_guard = match execution_guard {
1582 Ok(execution_guard) => execution_guard,
1583 Err(err) => {
1584 tx_guard.release();
1585 return Err(err);
1586 }
1587 };
1588 if *execution_guard != epoch_store.epoch() {
1592 tx_guard.release();
1593 info!("The epoch of the execution_guard doesn't match the epoch store");
1594 return Err(IotaError::WrongEpoch {
1595 expected_epoch: epoch_store.epoch(),
1596 actual_epoch: *execution_guard,
1597 });
1598 }
1599
1600 let (inner_temporary_store, effects, execution_error_opt) = match self.execute_transaction(
1606 &execution_guard,
1607 transaction,
1608 tx_input_objects,
1609 per_authenticator_inputs,
1610 epoch_store,
1611 ) {
1612 Err(e) => {
1613 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1614 tx_guard.release();
1615 return Err(e);
1616 }
1617 Ok(res) => res,
1618 };
1619
1620 if let Some(expected_effects_digest) = expected_effects_digest {
1621 if effects.digest() != expected_effects_digest {
1622 match self.debug_dump_transaction_state(
1624 &digest,
1625 &effects,
1626 expected_effects_digest,
1627 &inner_temporary_store,
1628 transaction,
1629 &self.config.state_debug_dump_config,
1630 ) {
1631 Ok(out_path) => {
1632 info!(
1633 "Dumped node state for transaction {} to {}",
1634 digest,
1635 out_path.as_path().display().to_string()
1636 );
1637 }
1638 Err(e) => {
1639 error!("Error dumping state for transaction {}: {e}", digest);
1640 }
1641 }
1642 error!(
1643 tx_digest = ?digest,
1644 ?expected_effects_digest,
1645 actual_effects = ?effects,
1646 "fork detected!"
1647 );
1648 panic!(
1649 "Transaction {} is expected to have effects digest {}, but got {}!",
1650 digest,
1651 expected_effects_digest,
1652 effects.digest(),
1653 );
1654 }
1655 }
1656
1657 fail_point!("crash");
1658
1659 self.commit_transaction(
1660 transaction,
1661 inner_temporary_store,
1662 &effects,
1663 tx_guard,
1664 execution_guard,
1665 epoch_store,
1666 )?;
1667
1668 let elapsed = process_transaction_start_time.elapsed().as_micros() as f64;
1669 if elapsed > 0.0 {
1670 self.metrics
1671 .execution_gas_latency_ratio
1672 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1673 };
1674 Ok((effects, execution_error_opt))
1675 }
1676
1677 pub async fn reconfigure_traffic_control(
1678 &self,
1679 params: TrafficControlReconfigParams,
1680 ) -> Result<TrafficControlReconfigParams, IotaError> {
1681 if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1682 traffic_controller.admin_reconfigure(params).await
1683 } else {
1684 Err(IotaError::InvalidAdminRequest(
1685 "Traffic controller is not configured on this node".to_string(),
1686 ))
1687 }
1688 }
1689
1690 #[instrument(level = "trace", skip_all)]
1691 fn commit_transaction(
1692 &self,
1693 transaction: &VerifiedExecutableTransaction,
1694 inner_temporary_store: InnerTemporaryStore,
1695 effects: &TransactionEffects,
1696 tx_guard: TxGuard,
1697 _execution_guard: ExecutionLockReadGuard<'_>,
1698 epoch_store: &Arc<AuthorityPerEpochStore>,
1699 ) -> IotaResult {
1700 let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1701 monitored_scope("Execution::commit_certificate");
1702 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1703
1704 let tx_key = transaction.key();
1705 let tx_digest = transaction.digest();
1706 let input_object_count = inner_temporary_store.input_objects.len();
1707 let shared_object_count = effects.input_shared_objects().len();
1708
1709 let output_keys = inner_temporary_store.get_output_keys(effects);
1710
1711 let _ = self
1713 .post_process_one_tx(transaction, effects, &inner_temporary_store, epoch_store)
1714 .tap_err(|e| {
1715 self.metrics.post_processing_total_failures.inc();
1716 error!(?tx_digest, "tx post processing failed: {e}");
1717 });
1718
1719 epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1723
1724 fail_point!("crash");
1726
1727 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1728 transaction.clone().into_unsigned(),
1729 effects.clone(),
1730 inner_temporary_store,
1731 );
1732 self.get_cache_writer()
1733 .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1734
1735 if transaction.transaction_data().is_end_of_epoch_tx() {
1736 self.get_object_cache_reader()
1739 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1740 }
1741
1742 tx_guard.commit_tx();
1744
1745 self.transaction_manager
1749 .notify_commit(tx_digest, output_keys, epoch_store);
1750
1751 self.update_metrics(transaction, input_object_count, shared_object_count);
1752
1753 Ok(())
1754 }
1755
1756 fn update_metrics(
1757 &self,
1758 transaction: &VerifiedExecutableTransaction,
1759 input_object_count: usize,
1760 shared_object_count: usize,
1761 ) {
1762 if transaction.has_upgraded_multisig() {
1764 self.metrics.multisig_sig_count.inc();
1765 }
1766
1767 self.metrics.total_effects.inc();
1768 self.metrics.total_certs.inc();
1769
1770 if shared_object_count > 0 {
1771 self.metrics.shared_obj_tx.inc();
1772 }
1773
1774 if transaction.is_sponsored_tx() {
1775 self.metrics.sponsored_tx.inc();
1776 }
1777
1778 self.metrics
1779 .num_input_objs
1780 .observe(input_object_count as f64);
1781 self.metrics
1782 .num_shared_objects
1783 .observe(shared_object_count as f64);
1784 self.metrics.batch_size.observe(
1785 transaction
1786 .data()
1787 .intent_message()
1788 .value
1789 .kind()
1790 .num_commands() as f64,
1791 );
1792 }
1793
1794 #[instrument(level = "trace", skip_all)]
1806 fn execute_transaction(
1807 &self,
1808 _execution_guard: &ExecutionLockReadGuard<'_>,
1809 transaction: &VerifiedExecutableTransaction,
1810 tx_input_objects: InputObjects,
1811 per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1812 epoch_store: &Arc<AuthorityPerEpochStore>,
1813 ) -> IotaResult<(
1814 InnerTemporaryStore,
1815 TransactionEffects,
1816 Option<ExecutionError>,
1817 )> {
1818 let _scope = monitored_scope("Execution::execute_certificate");
1819 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1820 let prepare_transaction_start_time = tokio::time::Instant::now();
1821
1822 let protocol_config = epoch_store.protocol_config();
1823
1824 let reference_gas_price = epoch_store.reference_gas_price();
1825
1826 let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
1827 let epoch_start_timestamp = epoch_store
1828 .epoch_start_config()
1829 .epoch_data()
1830 .epoch_start_timestamp();
1831
1832 let backing_store = self.get_backing_store().as_ref();
1833
1834 let tx_digest = *transaction.digest();
1835
1836 let tx_data = transaction.data().transaction_data();
1839 tx_data.validity_check(protocol_config)?;
1840
1841 let (kind, signer, gas_data) = tx_data.execution_parts();
1842
1843 let move_authenticators = transaction.move_authenticators();
1844
1845 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1846 let (inner_temp_store, _, mut effects, execution_error_opt) = if move_authenticators
1847 .is_empty()
1848 {
1849 let (tx_gas_status, tx_checked_input_objects) =
1854 iota_transaction_checks::check_certificate_input(
1855 transaction,
1856 tx_input_objects,
1857 protocol_config,
1858 reference_gas_price,
1859 )?;
1860
1861 let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
1862 self.check_owned_locks(&owned_object_refs)?;
1863 epoch_store.executor().execute_transaction_to_effects(
1864 backing_store,
1865 protocol_config,
1866 self.metrics.limits_metrics.clone(),
1867 self.config
1870 .expensive_safety_check_config
1871 .enable_deep_per_tx_iota_conservation_check(),
1872 self.config.certificate_deny_config.certificate_deny_set(),
1873 &epoch_id,
1874 epoch_start_timestamp,
1875 tx_checked_input_objects,
1876 gas_data,
1877 tx_gas_status,
1878 kind,
1879 signer,
1880 tx_digest,
1881 &mut None,
1882 )
1883 } else {
1884 debug_assert_eq!(
1890 move_authenticators.len(),
1891 per_authenticator_inputs.len(),
1892 "Move authenticators amount must match the number of authenticator inputs"
1893 );
1894
1895 let per_authenticator_inputs = move_authenticators
1896 .iter()
1897 .zip(per_authenticator_inputs)
1898 .map(
1899 |(move_authenticator, (authenticator_input_objects, account_object))| {
1900 let (
1903 auth_account_object_id,
1904 auth_account_object_seq_number,
1905 auth_account_object_digest,
1906 ) = move_authenticator.object_to_authenticate_components()?;
1907
1908 let signer = move_authenticator.address();
1909
1910 let authenticator_function_ref_for_execution = self.check_move_account(
1911 auth_account_object_id,
1912 auth_account_object_seq_number,
1913 auth_account_object_digest,
1914 account_object,
1915 &signer,
1916 )?;
1917
1918 Ok((
1919 authenticator_input_objects,
1920 authenticator_function_ref_for_execution,
1921 ))
1922 },
1923 )
1924 .collect::<IotaResult<Vec<_>>>()?;
1925
1926 let per_authenticator_input_objects = per_authenticator_inputs
1927 .iter()
1928 .map(|(authenticator_input_objects, _)| authenticator_input_objects.clone())
1929 .collect::<Vec<_>>();
1930
1931 let tx_data_bytes =
1933 bcs::to_bytes(tx_data).expect("TransactionData serialization cannot fail");
1934
1935 let (sender_auth_digest, sponsor_auth_digest) =
1936 transaction.data().compute_auth_digests()?;
1937
1938 let authenticator_gas_budget = protocol_config.max_auth_gas();
1943 let (
1944 gas_status,
1945 per_authenticator_checked_input_objects,
1946 authenticator_and_tx_checked_input_objects,
1947 ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
1948 transaction,
1949 tx_input_objects,
1950 per_authenticator_input_objects,
1951 authenticator_gas_budget,
1952 protocol_config,
1953 reference_gas_price,
1954 )?;
1955
1956 debug_assert_eq!(
1957 move_authenticators.len(),
1958 per_authenticator_checked_input_objects.len(),
1959 "Move authenticators amount must match the number of checked authenticator inputs"
1960 );
1961
1962 let move_authenticators = move_authenticators
1963 .into_iter()
1964 .zip(per_authenticator_inputs)
1965 .zip(per_authenticator_checked_input_objects)
1966 .map(
1967 |(
1968 (move_authenticator, (_, authenticator_function_ref_for_execution)),
1969 authenticator_checked_input_objects,
1970 )| {
1971 (
1972 move_authenticator.to_owned(),
1973 authenticator_function_ref_for_execution,
1974 authenticator_checked_input_objects,
1975 )
1976 },
1977 )
1978 .collect::<Vec<_>>();
1979
1980 let owned_object_refs = authenticator_and_tx_checked_input_objects
1981 .inner()
1982 .filter_owned_objects();
1983 self.check_owned_locks(&owned_object_refs)?;
1984
1985 let (sender_authenticator_function_ref, sponsor_authenticator_function_ref) =
1986 extract_auth_fun_refs(signer, gas_data.owner, |address| {
1987 move_authenticators
1988 .iter()
1989 .find(|t| t.0.address() == address)
1990 .map(|t| t.1.authenticator_function_ref.clone())
1991 });
1992
1993 let auth_context_data = AuthContextData {
1994 transaction_data_bytes: tx_data_bytes,
1995 sender_auth_digest,
1996 sponsor_auth_digest,
1997 sender_authenticator_function_ref,
1998 sponsor_authenticator_function_ref,
1999 };
2000
2001 epoch_store
2002 .executor()
2003 .authenticate_then_execute_transaction_to_effects(
2004 backing_store,
2005 protocol_config,
2006 self.metrics.limits_metrics.clone(),
2007 self.config
2008 .expensive_safety_check_config
2009 .enable_deep_per_tx_iota_conservation_check(),
2010 self.config.certificate_deny_config.certificate_deny_set(),
2011 &epoch_id,
2012 epoch_start_timestamp,
2013 gas_data,
2014 gas_status,
2015 move_authenticators,
2016 authenticator_and_tx_checked_input_objects,
2017 kind,
2018 signer,
2019 tx_digest,
2020 auth_context_data,
2021 &mut None,
2022 )
2023 };
2024
2025 fail_point_if!("cp_execution_nondeterminism", || {
2026 #[cfg(msim)]
2027 self.create_fail_state(transaction, epoch_store, &mut effects);
2028 });
2029
2030 let elapsed = prepare_transaction_start_time.elapsed().as_micros() as f64;
2031 if elapsed > 0.0 {
2032 self.metrics
2033 .prepare_cert_gas_latency_ratio
2034 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
2035 }
2036
2037 Ok((inner_temp_store, effects, execution_error_opt.err()))
2038 }
2039
2040 pub fn prepare_transaction_for_benchmark(
2041 &self,
2042 transaction: &VerifiedExecutableTransaction,
2043 input_objects: InputObjects,
2044 epoch_store: &Arc<AuthorityPerEpochStore>,
2045 ) -> IotaResult<(
2046 InnerTemporaryStore,
2047 TransactionEffects,
2048 Option<ExecutionError>,
2049 )> {
2050 let lock = RwLock::new(epoch_store.epoch());
2051 let execution_guard = lock.try_read().unwrap();
2052
2053 self.execute_transaction(
2054 &execution_guard,
2055 transaction,
2056 input_objects,
2057 vec![],
2058 epoch_store,
2059 )
2060 }
2061
2062 #[instrument("dry_exec_tx", level = "trace", skip_all)]
2065 #[allow(clippy::type_complexity)]
2066 pub fn dry_exec_transaction(
2067 &self,
2068 transaction: TransactionData,
2069 transaction_digest: TransactionDigest,
2070 ) -> IotaResult<(
2071 DryRunTransactionBlockResponse,
2072 BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
2073 TransactionEffects,
2074 Option<ObjectId>,
2075 )> {
2076 let epoch_store = self.load_epoch_store_one_call_per_task();
2077 if !self.is_fullnode(&epoch_store) {
2078 return Err(IotaError::UnsupportedFeature {
2079 error: "dry-exec is only supported on fullnodes".to_string(),
2080 });
2081 }
2082
2083 if transaction.kind().is_system() {
2084 return Err(IotaError::UnsupportedFeature {
2085 error: "dry-exec does not support system transactions".to_string(),
2086 });
2087 }
2088
2089 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2090 }
2091
2092 #[allow(clippy::type_complexity)]
2093 pub fn dry_exec_transaction_for_benchmark(
2094 &self,
2095 transaction: TransactionData,
2096 transaction_digest: TransactionDigest,
2097 ) -> IotaResult<(
2098 DryRunTransactionBlockResponse,
2099 BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
2100 TransactionEffects,
2101 Option<ObjectId>,
2102 )> {
2103 let epoch_store = self.load_epoch_store_one_call_per_task();
2104 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2105 }
2106
2107 #[instrument(level = "trace", skip_all)]
2108 #[allow(clippy::type_complexity)]
2109 fn dry_exec_transaction_impl(
2110 &self,
2111 epoch_store: &AuthorityPerEpochStore,
2112 transaction: TransactionData,
2113 transaction_digest: TransactionDigest,
2114 ) -> IotaResult<(
2115 DryRunTransactionBlockResponse,
2116 BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
2117 TransactionEffects,
2118 Option<ObjectId>,
2119 )> {
2120 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2122
2123 let input_object_kinds = transaction.input_objects()?;
2124 let receiving_object_refs = transaction.receiving_objects();
2125
2126 iota_transaction_checks::deny::check_transaction_for_validation(
2127 &transaction,
2128 &[],
2129 &input_object_kinds,
2130 &receiving_object_refs,
2131 &self.config.transaction_deny_config,
2132 self.get_backing_package_store().as_ref(),
2133 )?;
2134
2135 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2136 None,
2138 &input_object_kinds,
2139 &receiving_object_refs,
2140 epoch_store.epoch(),
2141 )?;
2142
2143 let mut transaction = transaction;
2145 let reference_gas_price = epoch_store.reference_gas_price();
2146 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
2147 let sender = transaction.gas_owner();
2148 let gas_object_id = ObjectId::random();
2149 let gas_object = Object::new_move(
2150 MoveObject::new_gas_coin(
2151 OBJECT_START_VERSION,
2152 gas_object_id,
2153 SIMULATION_GAS_COIN_VALUE,
2154 ),
2155 Owner::Address(sender),
2156 TransactionDigest::GENESIS_MARKER,
2157 );
2158 let gas_object_ref = gas_object.object_ref();
2159 transaction.gas_data_mut().objects = vec![gas_object_ref];
2161 (
2162 iota_transaction_checks::check_transaction_input_with_given_gas(
2163 epoch_store.protocol_config(),
2164 reference_gas_price,
2165 &transaction,
2166 input_objects,
2167 receiving_objects,
2168 gas_object,
2169 &self.metrics.bytecode_verifier_metrics,
2170 &self.config.verifier_signing_config,
2171 )?,
2172 Some(gas_object_id),
2173 )
2174 } else {
2175 let authenticator_gas_budget = 0;
2178
2179 (
2180 iota_transaction_checks::check_transaction_input(
2181 epoch_store.protocol_config(),
2182 reference_gas_price,
2183 &transaction,
2184 input_objects,
2185 &receiving_objects,
2186 &self.metrics.bytecode_verifier_metrics,
2187 &self.config.verifier_signing_config,
2188 authenticator_gas_budget,
2189 )?,
2190 None,
2191 )
2192 };
2193
2194 let protocol_config = epoch_store.protocol_config();
2195 let (kind, signer, gas_data) = transaction.execution_parts();
2196
2197 let silent = true;
2198 let executor = iota_execution::executor(protocol_config, silent, None)
2199 .expect("Creating an executor should not fail here");
2200
2201 let expensive_checks = false;
2202 let (inner_temp_store, _, effects, execution_error) = executor
2203 .execute_transaction_to_effects(
2204 self.get_backing_store().as_ref(),
2205 protocol_config,
2206 self.metrics.limits_metrics.clone(),
2207 expensive_checks,
2208 self.config.certificate_deny_config.certificate_deny_set(),
2209 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2210 epoch_store
2211 .epoch_start_config()
2212 .epoch_data()
2213 .epoch_start_timestamp(),
2214 checked_input_objects,
2215 gas_data,
2216 gas_status,
2217 kind,
2218 signer,
2219 transaction_digest,
2220 &mut None,
2221 );
2222 let tx_digest = *effects.transaction_digest();
2223
2224 let module_cache =
2225 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2226
2227 let mut layout_resolver =
2228 epoch_store
2229 .executor()
2230 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2231 &inner_temp_store,
2232 self.get_backing_package_store(),
2233 )));
2234 let object_changes = Vec::new();
2236
2237 let balance_changes = Vec::new();
2239
2240 let written_with_kind = effects
2241 .created()
2242 .into_iter()
2243 .map(|(oref, _)| (oref, WriteKind::Create))
2244 .chain(
2245 effects
2246 .unwrapped()
2247 .into_iter()
2248 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2249 )
2250 .chain(
2251 effects
2252 .mutated()
2253 .into_iter()
2254 .map(|(oref, _)| (oref, WriteKind::Mutate)),
2255 )
2256 .map(|(oref, kind)| {
2257 let obj = inner_temp_store.written.get(&oref.object_id).unwrap();
2258 (oref.object_id, (oref, obj.clone(), kind))
2260 })
2261 .collect();
2262
2263 let execution_error_source = execution_error
2264 .as_ref()
2265 .err()
2266 .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2267
2268 Ok((
2269 DryRunTransactionBlockResponse {
2270 suggested_gas_price: self
2272 .congestion_tracker
2273 .get_prediction_suggested_gas_price(&transaction),
2274 input: IotaTransactionBlockData::try_from_with_module_cache(
2275 transaction,
2276 &module_cache,
2277 tx_digest,
2278 )
2279 .map_err(|e| IotaError::TransactionSerialization {
2280 error: format!(
2281 "Failed to convert transaction to IotaTransactionBlockData: {e}",
2282 ),
2283 })?, effects: effects.clone().try_into()?,
2285 events: IotaTransactionBlockEvents::try_from(
2286 inner_temp_store.events.clone(),
2287 tx_digest,
2288 None,
2289 layout_resolver.as_mut(),
2290 )?,
2291 object_changes,
2292 balance_changes,
2293 execution_error_source,
2294 },
2295 written_with_kind,
2296 effects,
2297 mock_gas,
2298 ))
2299 }
2300
2301 pub fn simulate_transaction(
2302 &self,
2303 mut transaction: TransactionData,
2304 checks: VmChecks,
2305 ) -> IotaResult<SimulateTransactionResult> {
2306 if transaction.kind().is_system() {
2307 return Err(IotaError::UnsupportedFeature {
2308 error: "simulate does not support system transactions".to_string(),
2309 });
2310 }
2311
2312 let epoch_store = self.load_epoch_store_one_call_per_task();
2313 if !self.is_fullnode(&epoch_store) {
2314 return Err(IotaError::UnsupportedFeature {
2315 error: "simulate is only supported on fullnodes".to_string(),
2316 });
2317 }
2318
2319 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2323
2324 let input_object_kinds = transaction.input_objects()?;
2325 let receiving_object_refs = transaction.receiving_objects();
2326
2327 iota_transaction_checks::deny::check_transaction_for_validation(
2330 &transaction,
2331 &[],
2332 &input_object_kinds,
2333 &receiving_object_refs,
2334 &self.config.transaction_deny_config,
2335 self.get_backing_package_store().as_ref(),
2336 )?;
2337
2338 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2340 None,
2342 &input_object_kinds,
2343 &receiving_object_refs,
2344 epoch_store.epoch(),
2345 )?;
2346
2347 let mock_gas_id = if transaction.gas().is_empty() {
2349 let mock_gas_object = Object::new_move(
2350 MoveObject::new_gas_coin(
2351 OBJECT_START_VERSION,
2352 ObjectId::MAX,
2353 SIMULATION_GAS_COIN_VALUE,
2354 ),
2355 Owner::Address(transaction.gas_data().owner),
2356 TransactionDigest::GENESIS_MARKER,
2357 );
2358 let mock_gas_object_ref = mock_gas_object.object_ref();
2359 transaction.gas_data_mut().objects = vec![mock_gas_object_ref];
2360 input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2361 Some(mock_gas_object.id())
2362 } else {
2363 None
2364 };
2365
2366 let protocol_config = epoch_store.protocol_config();
2367
2368 let authenticator_gas_budget = 0;
2371
2372 let (gas_status, checked_input_objects) = if checks.enabled() {
2375 iota_transaction_checks::check_transaction_input(
2376 protocol_config,
2377 epoch_store.reference_gas_price(),
2378 &transaction,
2379 input_objects,
2380 &receiving_objects,
2381 &self.metrics.bytecode_verifier_metrics,
2382 &self.config.verifier_signing_config,
2383 authenticator_gas_budget,
2384 )?
2385 } else {
2386 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2387 protocol_config,
2388 transaction.kind(),
2389 input_objects,
2390 receiving_objects,
2391 )?;
2392 let gas_status = IotaGasStatus::new(
2393 transaction.gas_budget(),
2394 transaction.gas_price(),
2395 epoch_store.reference_gas_price(),
2396 protocol_config,
2397 )?;
2398
2399 (gas_status, checked_input_objects)
2400 };
2401
2402 let executor = iota_execution::executor(
2404 protocol_config,
2405 true, None,
2407 )
2408 .expect("Creating an executor should not fail here");
2409
2410 let (kind, signer, gas_data) = transaction.execution_parts();
2412 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2413 self.get_backing_store().as_ref(),
2414 protocol_config,
2415 self.metrics.limits_metrics.clone(),
2416 false, self.config.certificate_deny_config.certificate_deny_set(),
2418 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2419 epoch_store
2420 .epoch_start_config()
2421 .epoch_data()
2422 .epoch_start_timestamp(),
2423 checked_input_objects,
2424 gas_data,
2425 gas_status,
2426 kind,
2427 signer,
2428 transaction.digest(),
2429 checks.disabled(),
2430 );
2431
2432 Ok(SimulateTransactionResult {
2435 input_objects: inner_temp_store.input_objects,
2436 output_objects: inner_temp_store.written,
2437 events: effects.events_digest().map(|_| inner_temp_store.events),
2438 effects,
2439 execution_result,
2440 suggested_gas_price: self
2441 .congestion_tracker
2442 .get_prediction_suggested_gas_price(&transaction),
2443 mock_gas_id,
2444 })
2445 }
2446
2447 #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2452 pub async fn dev_inspect_transaction_block(
2453 &self,
2454 sender: Address,
2455 transaction_kind: TransactionKind,
2456 gas_price: Option<u64>,
2457 gas_budget: Option<u64>,
2458 gas_sponsor: Option<Address>,
2459 gas_objects: Option<Vec<ObjectRef>>,
2460 show_raw_txn_data_and_effects: Option<bool>,
2461 skip_checks: Option<bool>,
2462 ) -> IotaResult<DevInspectResults> {
2463 let epoch_store = self.load_epoch_store_one_call_per_task();
2464
2465 if !self.is_fullnode(&epoch_store) {
2466 return Err(IotaError::UnsupportedFeature {
2467 error: "dev-inspect is only supported on fullnodes".to_string(),
2468 });
2469 }
2470
2471 if transaction_kind.is_system() {
2472 return Err(IotaError::UnsupportedFeature {
2473 error: "system transactions are not supported".to_string(),
2474 });
2475 }
2476
2477 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2478 let skip_checks = skip_checks.unwrap_or(true);
2479 let reference_gas_price = epoch_store.reference_gas_price();
2480 let protocol_config = epoch_store.protocol_config();
2481 let max_tx_gas = protocol_config.max_tx_gas();
2482
2483 let price = gas_price.unwrap_or(reference_gas_price);
2484 let budget = gas_budget.unwrap_or(max_tx_gas);
2485 let owner = gas_sponsor.unwrap_or(sender);
2486 let payment = gas_objects.unwrap_or_default();
2489 let mut transaction = TransactionData::V1(TransactionDataV1 {
2490 kind: transaction_kind.clone(),
2491 sender,
2492 gas_payment: GasData {
2493 objects: payment,
2494 owner,
2495 price,
2496 budget,
2497 },
2498 expiration: TransactionExpiration::None,
2499 });
2500
2501 let raw_txn_data = if show_raw_txn_data_and_effects {
2502 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2503 error: "Failed to serialize transaction during dev inspect".to_string(),
2504 })?
2505 } else {
2506 vec![]
2507 };
2508
2509 transaction.validity_check_no_gas_check(protocol_config)?;
2510
2511 let input_object_kinds = transaction.input_objects()?;
2512 let receiving_object_refs = transaction.receiving_objects();
2513
2514 iota_transaction_checks::deny::check_transaction_for_validation(
2515 &transaction,
2516 &[],
2517 &input_object_kinds,
2518 &receiving_object_refs,
2519 &self.config.transaction_deny_config,
2520 self.get_backing_package_store().as_ref(),
2521 )?;
2522
2523 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2524 None,
2526 &input_object_kinds,
2527 &receiving_object_refs,
2528 epoch_store.epoch(),
2529 )?;
2530
2531 let (gas_status, checked_input_objects) = if skip_checks {
2532 if transaction.gas().is_empty() {
2537 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2539 SIMULATION_GAS_COIN_VALUE,
2540 transaction.gas_owner(),
2541 );
2542 let gas_object_ref = dummy_gas_object.object_ref();
2543 transaction.gas_data_mut().objects = vec![gas_object_ref];
2544 input_objects.push(ObjectReadResult::new(
2545 InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2546 dummy_gas_object.into(),
2547 ));
2548 }
2549 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2550 protocol_config,
2551 &transaction_kind,
2552 input_objects,
2553 receiving_objects,
2554 )?;
2555 let gas_status = IotaGasStatus::new(
2556 max_tx_gas,
2557 transaction.gas_price(),
2558 reference_gas_price,
2559 protocol_config,
2560 )?;
2561
2562 (gas_status, checked_input_objects)
2563 } else {
2564 if transaction.gas().is_empty() {
2568 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2570 SIMULATION_GAS_COIN_VALUE,
2571 transaction.gas_owner(),
2572 );
2573 let gas_object_ref = dummy_gas_object.object_ref();
2574 transaction.gas_data_mut().objects = vec![gas_object_ref];
2575 iota_transaction_checks::check_transaction_input_with_given_gas(
2576 epoch_store.protocol_config(),
2577 reference_gas_price,
2578 &transaction,
2579 input_objects,
2580 receiving_objects,
2581 dummy_gas_object,
2582 &self.metrics.bytecode_verifier_metrics,
2583 &self.config.verifier_signing_config,
2584 )?
2585 } else {
2586 let authenticator_gas_budget = 0;
2589
2590 iota_transaction_checks::check_transaction_input(
2591 epoch_store.protocol_config(),
2592 reference_gas_price,
2593 &transaction,
2594 input_objects,
2595 &receiving_objects,
2596 &self.metrics.bytecode_verifier_metrics,
2597 &self.config.verifier_signing_config,
2598 authenticator_gas_budget,
2599 )?
2600 }
2601 };
2602
2603 let executor = iota_execution::executor(protocol_config, true, None)
2604 .expect("Creating an executor should not fail here");
2605 let gas_data = transaction.gas_data().clone();
2606 let intent_msg = IntentMessage::new(
2607 Intent {
2608 version: IntentVersion::V0,
2609 scope: IntentScope::TransactionData,
2610 app_id: IntentAppId::Iota,
2611 },
2612 transaction,
2613 );
2614 let transaction_digest = TransactionDigest::new(intent_msg.value.digest().into_inner());
2615 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2616 self.get_backing_store().as_ref(),
2617 protocol_config,
2618 self.metrics.limits_metrics.clone(),
2619 false,
2621 self.config.certificate_deny_config.certificate_deny_set(),
2622 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2623 epoch_store
2624 .epoch_start_config()
2625 .epoch_data()
2626 .epoch_start_timestamp(),
2627 checked_input_objects,
2628 gas_data,
2629 gas_status,
2630 transaction_kind,
2631 sender,
2632 transaction_digest,
2633 skip_checks,
2634 );
2635
2636 let raw_effects = if show_raw_txn_data_and_effects {
2637 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2638 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2639 })?
2640 } else {
2641 vec![]
2642 };
2643
2644 let mut layout_resolver =
2645 epoch_store
2646 .executor()
2647 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2648 &inner_temp_store,
2649 self.get_backing_package_store(),
2650 )));
2651
2652 DevInspectResults::new(
2653 effects,
2654 inner_temp_store.events.clone(),
2655 execution_result,
2656 raw_txn_data,
2657 raw_effects,
2658 layout_resolver.as_mut(),
2659 )
2660 }
2661
2662 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2664 let epoch_store = self.epoch_store_for_testing();
2665 Ok(epoch_store.reference_gas_price())
2666 }
2667
2668 #[instrument(level = "trace", skip_all)]
2669 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2670 self.get_transaction_cache_reader()
2671 .try_is_tx_already_executed(digest)
2672 }
2673
2674 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2676 self.try_is_tx_already_executed(digest)
2677 .expect("storage access failed")
2678 }
2679
2680 #[instrument(level = "debug", skip_all, err)]
2682 fn index_tx(
2683 &self,
2684 indexes: &IndexStore,
2685 digest: &TransactionDigest,
2686 transaction: &VerifiedExecutableTransaction,
2688 effects: &TransactionEffects,
2689 events: &TransactionEvents,
2690 timestamp_ms: u64,
2691 tx_coins: Option<TxCoins>,
2692 written: &WrittenObjects,
2693 inner_temporary_store: &InnerTemporaryStore,
2694 epoch_store: &Arc<AuthorityPerEpochStore>,
2695 ) -> IotaResult<u64> {
2696 let changes = self
2697 .process_object_index(effects, written, inner_temporary_store, epoch_store)
2698 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2699
2700 indexes.index_tx(
2701 transaction.data().intent_message().value.sender(),
2702 transaction
2703 .data()
2704 .intent_message()
2705 .value
2706 .input_objects()?
2707 .iter()
2708 .map(|o| o.object_id()),
2709 effects
2710 .all_changed_objects()
2711 .into_iter()
2712 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2713 transaction
2714 .data()
2715 .intent_message()
2716 .value
2717 .move_calls()
2718 .into_iter()
2719 .map(|(package, module, function)| {
2720 (*package, module.to_owned(), function.to_owned())
2721 }),
2722 events,
2723 changes,
2724 digest,
2725 timestamp_ms,
2726 tx_coins,
2727 )
2728 }
2729
2730 #[cfg(msim)]
2731 fn create_fail_state(
2732 &self,
2733 transaction: &VerifiedExecutableTransaction,
2734 epoch_store: &Arc<AuthorityPerEpochStore>,
2735 effects: &mut TransactionEffects,
2736 ) {
2737 use std::cell::RefCell;
2738
2739 use iota_types::effects::TransactionEffectsAPIForTesting;
2740 thread_local! {
2741 static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2742 }
2743 if !transaction.data().intent_message().value.is_system_tx() {
2744 let committee = epoch_store.committee();
2745 let cur_stake = (**committee).weight(&self.name);
2746 if cur_stake > 0 {
2747 FAIL_STATE.with_borrow_mut(|fail_state| {
2748 if fail_state.0 < committee.validity_threshold() {
2750 fail_state.0 += cur_stake;
2751 fail_state.1.insert(self.name);
2752 }
2753
2754 if fail_state.1.contains(&self.name) {
2755 info!("cp_exec failing tx");
2756 effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2757 }
2758 });
2759 }
2760 }
2761 }
2762
2763 fn process_object_index(
2764 &self,
2765 effects: &TransactionEffects,
2766 written: &WrittenObjects,
2767 inner_temporary_store: &InnerTemporaryStore,
2768 epoch_store: &Arc<AuthorityPerEpochStore>,
2769 ) -> IotaResult<ObjectIndexChanges> {
2770 let mut layout_resolver =
2771 epoch_store
2772 .executor()
2773 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2774 inner_temporary_store,
2775 self.get_backing_package_store(),
2776 )));
2777
2778 let modified_at_version = effects
2779 .modified_at_versions()
2780 .into_iter()
2781 .collect::<HashMap<_, _>>();
2782
2783 let tx_digest = effects.transaction_digest();
2784 let mut deleted_owners = vec![];
2785 let mut deleted_dynamic_fields = vec![];
2786 for object_ref in effects.deleted().into_iter().chain(effects.wrapped()) {
2787 let old_version = modified_at_version.get(&object_ref.object_id).unwrap();
2788 match self.get_owner_at_version(&object_ref.object_id, *old_version).unwrap_or_else(
2791 |e| panic!("tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {} at version {old_version:?}. Err: {e:?}", object_ref.object_id)
2792 ) {
2793 Owner::Address(addr) => deleted_owners.push((addr, object_ref.object_id)),
2794 Owner::Object(object_id) => {
2795 deleted_dynamic_fields.push((object_id, object_ref.object_id))
2796 }
2797 _ => {}
2798 }
2799 }
2800
2801 let mut new_owners = vec![];
2802 let mut new_dynamic_fields = vec![];
2803
2804 for (oref, owner, kind) in effects.all_changed_objects() {
2805 let id = &oref.object_id;
2806 if let WriteKind::Mutate = kind {
2809 let Some(old_version) = modified_at_version.get(id) else {
2810 panic!(
2811 "tx_digest={tx_digest}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2812 );
2813 };
2814 let Some(old_object) = self
2817 .get_object_store()
2818 .try_get_object_by_key(id, *old_version)?
2819 else {
2820 panic!(
2821 "tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {id} at version {old_version:?}"
2822 );
2823 };
2824 if old_object.owner != owner {
2825 match old_object.owner {
2826 Owner::Address(addr) => {
2827 deleted_owners.push((addr, *id));
2828 }
2829 Owner::Object(object_id) => deleted_dynamic_fields.push((object_id, *id)),
2830 _ => {}
2831 }
2832 }
2833 }
2834
2835 match owner {
2836 Owner::Address(addr) => {
2837 let new_object = written.get(id).unwrap_or_else(
2840 || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2841 );
2842 assert_eq!(
2843 new_object.version(),
2844 oref.version,
2845 "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2846 tx_digest,
2847 id,
2848 new_object.version(),
2849 oref.version
2850 );
2851
2852 let type_ = new_object
2853 .type_()
2854 .map(|type_| ObjectType::Struct(type_.clone()))
2855 .unwrap_or(ObjectType::Package);
2856
2857 new_owners.push((
2858 (addr, *id),
2859 ObjectInfo {
2860 object_id: *id,
2861 version: oref.version,
2862 digest: oref.digest,
2863 type_,
2864 owner,
2865 previous_transaction: *effects.transaction_digest(),
2866 },
2867 ));
2868 }
2869 Owner::Object(owner) => {
2870 let new_object = written.get(id).unwrap_or_else(
2871 || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2872 );
2873 assert_eq!(
2874 new_object.version(),
2875 oref.version,
2876 "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2877 tx_digest,
2878 id,
2879 new_object.version(),
2880 oref.version
2881 );
2882
2883 let Some(df_info) = self
2884 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2885 .unwrap_or_else(|e| {
2886 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2887 None
2888 }
2889 )
2890 else {
2891 continue;
2893 };
2894 new_dynamic_fields.push(((owner, *id), df_info))
2895 }
2896 _ => {}
2897 }
2898 }
2899
2900 Ok(ObjectIndexChanges {
2901 deleted_owners,
2902 deleted_dynamic_fields,
2903 new_owners,
2904 new_dynamic_fields,
2905 })
2906 }
2907
2908 fn try_create_dynamic_field_info(
2909 &self,
2910 o: &Object,
2911 written: &WrittenObjects,
2912 resolver: &mut dyn LayoutResolver,
2913 ) -> IotaResult<Option<DynamicFieldInfo>> {
2914 let Some(move_object) = o.data.as_opt_struct().cloned() else {
2916 return Ok(None);
2917 };
2918
2919 if !move_object.struct_tag().is_dynamic_field() {
2921 return Ok(None);
2922 }
2923
2924 let layout = match resolver.get_annotated_layout(move_object.struct_tag()) {
2925 Ok(annotated_layout) => annotated_layout.into_layout(),
2926 Err(e) => {
2927 error!(
2928 "unable to load layout for type `{:?}`: {e}",
2929 move_object.struct_tag()
2930 );
2931 return Ok(None);
2932 }
2933 };
2934
2935 let field =
2936 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2937 IotaError::ObjectDeserialization {
2938 error: e.to_string(),
2939 }
2940 })?;
2941
2942 let type_ = field.kind;
2943 let name_type: TypeTag = type_tag_core_to_sdk(&field.name_layout.into());
2944 let bcs_name = field.name_bytes.to_owned();
2945
2946 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2947 .map_err(|e| {
2948 warn!("{e}");
2949 IotaError::ObjectDeserialization {
2950 error: e.to_string(),
2951 }
2952 })?;
2953
2954 let name = DynamicFieldName {
2955 type_: name_type,
2956 value: IotaMoveValue::from(name_value).to_json_value(),
2957 };
2958
2959 let value_metadata = field.value_metadata().map_err(|e| {
2960 warn!("{e}");
2961 IotaError::ObjectDeserialization {
2962 error: e.to_string(),
2963 }
2964 })?;
2965
2966 Ok(Some(match value_metadata {
2967 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2968 name,
2969 bcs_name,
2970 type_,
2971 object_type: object_type.to_canonical_string(true),
2972 object_id: o.id(),
2973 version: o.version(),
2974 digest: o.digest(),
2975 },
2976
2977 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2978 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2983 (
2984 object.version(),
2985 object.digest(),
2986 object.data.opt_object_type().unwrap().clone(),
2987 )
2988 } else {
2989 let object = self
2991 .get_object_store()
2992 .try_get_object_by_key(&object_id, o.version())?
2993 .ok_or_else(|| UserInputError::ObjectNotFound {
2994 object_id,
2995 version: Some(o.version()),
2996 })?;
2997 let version = object.version();
2998 let digest = object.digest();
2999 let object_type = object.data.opt_object_type().unwrap().clone();
3000 (version, digest, object_type)
3001 };
3002
3003 DynamicFieldInfo {
3004 name,
3005 bcs_name,
3006 type_,
3007 object_type: object_type.to_string(),
3008 object_id,
3009 version,
3010 digest,
3011 }
3012 }
3013 }))
3014 }
3015
3016 #[instrument(level = "trace", skip_all, err)]
3017 fn post_process_one_tx(
3018 &self,
3019 transaction: &VerifiedExecutableTransaction,
3020 effects: &TransactionEffects,
3021 inner_temporary_store: &InnerTemporaryStore,
3022 epoch_store: &Arc<AuthorityPerEpochStore>,
3023 ) -> IotaResult {
3024 if self.indexes.is_none() {
3025 return Ok(());
3026 }
3027
3028 let _scope = monitored_scope("Execution::post_process_one_tx");
3029
3030 let tx_digest = transaction.digest();
3031 let timestamp_ms = Self::unixtime_now_ms();
3032 let events = &inner_temporary_store.events;
3033 let written = &inner_temporary_store.written;
3034 let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
3035 effects,
3036 inner_temporary_store,
3037 epoch_store,
3038 );
3039
3040 if let Some(indexes) = &self.indexes {
3042 let _ = self
3043 .index_tx(
3044 indexes.as_ref(),
3045 tx_digest,
3046 transaction,
3047 effects,
3048 events,
3049 timestamp_ms,
3050 tx_coins,
3051 written,
3052 inner_temporary_store,
3053 epoch_store,
3054 )
3055 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
3056 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
3057 .expect("Indexing tx should not fail");
3058
3059 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
3060 let events = self.make_transaction_block_events(
3061 events.clone(),
3062 *tx_digest,
3063 timestamp_ms,
3064 epoch_store,
3065 inner_temporary_store,
3066 )?;
3067 self.subscription_handler
3069 .process_tx(transaction.data().transaction_data(), &effects, &events)
3070 .tap_ok(|_| {
3071 self.metrics
3072 .post_processing_total_tx_had_event_processed
3073 .inc()
3074 })
3075 .tap_err(|e| {
3076 warn!(
3077 ?tx_digest,
3078 "Post processing - Couldn't process events for tx: {}", e
3079 )
3080 })?;
3081
3082 self.metrics
3083 .post_processing_total_events_emitted
3084 .inc_by(events.data.len() as u64);
3085 };
3086 Ok(())
3087 }
3088
3089 fn make_transaction_block_events(
3090 &self,
3091 transaction_events: TransactionEvents,
3092 digest: TransactionDigest,
3093 timestamp_ms: u64,
3094 epoch_store: &Arc<AuthorityPerEpochStore>,
3095 inner_temporary_store: &InnerTemporaryStore,
3096 ) -> IotaResult<IotaTransactionBlockEvents> {
3097 let mut layout_resolver =
3098 epoch_store
3099 .executor()
3100 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
3101 inner_temporary_store,
3102 self.get_backing_package_store(),
3103 )));
3104 IotaTransactionBlockEvents::try_from(
3105 transaction_events,
3106 digest,
3107 Some(timestamp_ms),
3108 layout_resolver.as_mut(),
3109 )
3110 }
3111
3112 pub fn unixtime_now_ms() -> u64 {
3113 let now = SystemTime::now()
3114 .duration_since(UNIX_EPOCH)
3115 .expect("Time went backwards")
3116 .as_millis();
3117 u64::try_from(now).expect("Travelling in time machine")
3118 }
3119
3120 #[instrument(level = "trace", skip_all)]
3121 pub async fn handle_transaction_info_request(
3122 &self,
3123 request: TransactionInfoRequest,
3124 ) -> IotaResult<TransactionInfoResponse> {
3125 let epoch_store = self.load_epoch_store_one_call_per_task();
3126 let (transaction, status) = self
3127 .get_transaction_status(&request.transaction_digest, &epoch_store)?
3128 .ok_or(IotaError::TransactionNotFound {
3129 digest: request.transaction_digest,
3130 })?;
3131 Ok(TransactionInfoResponse {
3132 transaction,
3133 status,
3134 })
3135 }
3136
3137 #[instrument(level = "trace", skip_all)]
3138 pub async fn handle_object_info_request(
3139 &self,
3140 request: ObjectInfoRequest,
3141 ) -> IotaResult<ObjectInfoResponse> {
3142 let epoch_store = self.load_epoch_store_one_call_per_task();
3143
3144 let requested_object_seq = match request.request_kind {
3145 ObjectInfoRequestKind::LatestObjectInfo => {
3146 self.try_get_object_or_tombstone(request.object_id)?
3147 .ok_or_else(|| {
3148 IotaError::from(UserInputError::ObjectNotFound {
3149 object_id: request.object_id,
3150 version: None,
3151 })
3152 })?
3153 .version
3154 }
3155 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3156 };
3157
3158 let object = self
3159 .get_object_store()
3160 .try_get_object_by_key(&request.object_id, requested_object_seq)?
3161 .ok_or_else(|| {
3162 IotaError::from(UserInputError::ObjectNotFound {
3163 object_id: request.object_id,
3164 version: Some(requested_object_seq),
3165 })
3166 })?;
3167
3168 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3169 (request.generate_layout, object.data.as_opt_struct())
3170 {
3171 Some(into_struct_layout(
3172 epoch_store
3173 .executor()
3174 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3175 .get_annotated_layout(move_obj.struct_tag())?,
3176 )?)
3177 } else {
3178 None
3179 };
3180
3181 let lock = if !object.is_address_owned() {
3182 None
3184 } else {
3185 self.get_transaction_lock(&object.object_ref(), &epoch_store)?
3186 .map(|s| s.into_inner())
3187 };
3188
3189 Ok(ObjectInfoResponse {
3190 object,
3191 layout,
3192 lock_for_debugging: lock,
3193 })
3194 }
3195
3196 #[instrument(level = "trace", skip_all)]
3197 pub fn handle_checkpoint_request(
3198 &self,
3199 request: &CheckpointRequest,
3200 ) -> IotaResult<CheckpointResponse> {
3201 let summary = if request.certified {
3202 let summary = match request.sequence_number {
3203 Some(seq) => self
3204 .checkpoint_store
3205 .get_checkpoint_by_sequence_number(seq)?,
3206 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3207 }
3208 .map(|v| v.into_inner());
3209 summary.map(CheckpointSummaryResponse::Certified)
3210 } else {
3211 let summary = match request.sequence_number {
3212 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3213 None => self
3214 .checkpoint_store
3215 .get_latest_locally_computed_checkpoint()?,
3216 };
3217 summary.map(CheckpointSummaryResponse::Pending)
3218 };
3219 let contents = match &summary {
3220 Some(s) => self
3221 .checkpoint_store
3222 .get_checkpoint_contents(&s.content_digest())?,
3223 None => None,
3224 };
3225 Ok(CheckpointResponse {
3226 checkpoint: summary,
3227 contents,
3228 })
3229 }
3230
3231 fn check_protocol_version(
3232 supported_protocol_versions: SupportedProtocolVersions,
3233 current_version: ProtocolVersion,
3234 ) {
3235 info!("current protocol version is now {:?}", current_version);
3236 info!("supported versions are: {:?}", supported_protocol_versions);
3237 if !supported_protocol_versions.is_version_supported(current_version) {
3238 let msg = format!(
3239 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
3240 );
3241
3242 error!("{}", msg);
3243 eprintln!("{msg}");
3244
3245 #[cfg(not(msim))]
3246 std::process::exit(1);
3247
3248 #[cfg(msim)]
3249 iota_simulator::task::shutdown_current_node();
3250 }
3251 }
3252
3253 #[expect(clippy::disallowed_methods)] #[expect(clippy::too_many_arguments)]
3255 pub async fn new(
3256 name: AuthorityName,
3257 secret: StableSyncAuthoritySigner,
3258 supported_protocol_versions: SupportedProtocolVersions,
3259 store: Arc<AuthorityStore>,
3260 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3261 epoch_store: Arc<AuthorityPerEpochStore>,
3262 committee_store: Arc<CommitteeStore>,
3263 indexes: Option<Arc<IndexStore>>,
3264 grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
3265 checkpoint_store: Arc<CheckpointStore>,
3266 prometheus_registry: &Registry,
3267 genesis_objects: &[Object],
3268 db_checkpoint_config: &DBCheckpointConfig,
3269 config: NodeConfig,
3270 archive_readers: ArchiveReaderBalancer,
3271 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
3272 chain_identifier: ChainIdentifier,
3273 pruner_db: Option<Arc<AuthorityPrunerTables>>,
3274 checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
3275 policy_config: Option<PolicyConfig>,
3276 firewall_config: Option<RemoteFirewallConfig>,
3277 ) -> Arc<Self> {
3278 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3279
3280 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3281 let (tx_ready_transactions, rx_ready_transactions) = unbounded_channel();
3282 let transaction_manager = Arc::new(TransactionManager::new(
3283 execution_cache_trait_pointers.object_cache_reader.clone(),
3284 execution_cache_trait_pointers
3285 .transaction_cache_reader
3286 .clone(),
3287 &epoch_store,
3288 tx_ready_transactions,
3289 metrics.clone(),
3290 ));
3291 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3292
3293 let authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3294 epoch_store.get_parent_path(),
3295 config
3296 .authority_store_pruning_config
3297 .num_latest_epoch_dbs_to_retain,
3298 )
3299 .await;
3300 let _pruner = AuthorityStorePruner::new(
3301 store.perpetual_tables.clone(),
3302 checkpoint_store.clone(),
3303 grpc_indexes_store.clone(),
3304 indexes.clone(),
3305 config.authority_store_pruning_config.clone(),
3306 epoch_store.committee().authority_exists(&name),
3307 epoch_store.epoch_start_state().epoch_duration_ms(),
3308 prometheus_registry,
3309 archive_readers,
3310 pruner_db,
3311 checkpoint_progress_tracker.clone(),
3312 );
3313 let input_loader =
3314 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3315 let epoch = epoch_store.epoch();
3316 let rgp = epoch_store.reference_gas_price();
3317 let traffic_controller_metrics =
3318 Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3319 let traffic_controller = if let Some(policy_config) = policy_config {
3320 Some(Arc::new(
3321 TrafficController::init(
3322 policy_config,
3323 traffic_controller_metrics,
3324 firewall_config.clone(),
3325 )
3326 .await,
3327 ))
3328 } else {
3329 None
3330 };
3331 let state = Arc::new(AuthorityState {
3332 name,
3333 secret,
3334 execution_lock: RwLock::new(epoch),
3335 epoch_store: ArcSwap::new(epoch_store.clone()),
3336 input_loader,
3337 execution_cache_trait_pointers,
3338 indexes,
3339 grpc_indexes_store,
3340 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3341 checkpoint_store,
3342 committee_store,
3343 transaction_manager,
3344 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3345 metrics,
3346 _pruner,
3347 authority_per_epoch_pruner,
3348 checkpoint_progress_tracker,
3349 db_checkpoint_config: db_checkpoint_config.clone(),
3350 config,
3351 overload_info: AuthorityOverloadInfo::default(),
3352 validator_tx_finalizer,
3353 chain_identifier,
3354 congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
3355 traffic_controller,
3356 });
3357
3358 let authority_state = Arc::downgrade(&state);
3360 spawn_monitored_task!(execution_process(
3361 authority_state,
3362 rx_ready_transactions,
3363 rx_execution_shutdown,
3364 ));
3365 spawn_monitored_task!(authority_store_migrations::migrate_events(store));
3366
3367 state
3369 .create_owner_index_if_empty(genesis_objects, &epoch_store)
3370 .expect("Error indexing genesis objects.");
3371
3372 state
3373 }
3374
3375 pub fn epoch_db_pruner(&self) -> &AuthorityPerEpochStorePruner {
3376 &self.authority_per_epoch_pruner
3377 }
3378
3379 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3381 &self.execution_cache_trait_pointers.object_cache_reader
3382 }
3383
3384 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3385 &self.execution_cache_trait_pointers.transaction_cache_reader
3386 }
3387
3388 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3389 &self.execution_cache_trait_pointers.cache_writer
3390 }
3391
3392 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3393 &self.execution_cache_trait_pointers.backing_store
3394 }
3395
3396 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3397 &self.execution_cache_trait_pointers.backing_package_store
3398 }
3399
3400 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3401 &self.execution_cache_trait_pointers.object_store
3402 }
3403
3404 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3405 &self.execution_cache_trait_pointers.reconfig_api
3406 }
3407
3408 pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3409 &self.execution_cache_trait_pointers.global_state_hash_store
3410 }
3411
3412 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3413 &self.execution_cache_trait_pointers.checkpoint_cache
3414 }
3415
3416 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3417 &self.execution_cache_trait_pointers.state_sync_store
3418 }
3419
3420 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3421 &self.execution_cache_trait_pointers.cache_commit
3422 }
3423
3424 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3425 self.execution_cache_trait_pointers
3426 .testing_api
3427 .database_for_testing()
3428 }
3429
3430 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3431 &self,
3432 config: NodeConfig,
3433 metrics: Arc<AuthorityStorePruningMetrics>,
3434 ) -> anyhow::Result<()> {
3435 let archive_readers =
3436 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3437 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3438 &self.database_for_testing().perpetual_tables,
3439 &self.checkpoint_store,
3440 self.grpc_indexes_store.as_deref(),
3441 None,
3442 config.authority_store_pruning_config,
3443 metrics,
3444 archive_readers,
3445 EPOCH_DURATION_MS_FOR_TESTING,
3446 self.checkpoint_progress_tracker.as_ref(),
3447 )
3448 .await
3449 }
3450
3451 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3452 &self.transaction_manager
3453 }
3454
3455 pub fn enqueue_transactions_for_execution(
3458 &self,
3459 transactions: Vec<VerifiedExecutableTransaction>,
3460 epoch_store: &Arc<AuthorityPerEpochStore>,
3461 ) {
3462 self.transaction_manager.enqueue(transactions, epoch_store)
3463 }
3464
3465 pub fn enqueue_certificates_for_execution(
3467 &self,
3468 certs: Vec<VerifiedCertificate>,
3469 epoch_store: &Arc<AuthorityPerEpochStore>,
3470 ) {
3471 self.transaction_manager
3472 .enqueue_certificates(certs, epoch_store)
3473 }
3474
3475 pub fn enqueue_with_expected_effects_digest(
3476 &self,
3477 transactions: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3478 epoch_store: &AuthorityPerEpochStore,
3479 ) {
3480 self.transaction_manager
3481 .enqueue_with_expected_effects_digest(transactions, epoch_store)
3482 }
3483
3484 fn create_owner_index_if_empty(
3485 &self,
3486 genesis_objects: &[Object],
3487 epoch_store: &Arc<AuthorityPerEpochStore>,
3488 ) -> IotaResult {
3489 let Some(index_store) = &self.indexes else {
3490 return Ok(());
3491 };
3492 if !index_store.is_empty() {
3493 return Ok(());
3494 }
3495
3496 let mut new_owners = vec![];
3497 let mut new_dynamic_fields = vec![];
3498 let mut layout_resolver = epoch_store
3499 .executor()
3500 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3501 for o in genesis_objects.iter() {
3502 match o.owner {
3503 Owner::Address(addr) => {
3504 new_owners.push(((addr, o.id()), ObjectInfo::new(&o.object_ref(), o)))
3505 }
3506 Owner::Object(object_id) => {
3507 let id = o.id();
3508 let info = match self.try_create_dynamic_field_info(
3509 o,
3510 &BTreeMap::new(),
3511 layout_resolver.as_mut(),
3512 ) {
3513 Ok(Some(info)) => info,
3514 Ok(None) => continue,
3515 Err(IotaError::UserInput {
3516 error:
3517 UserInputError::ObjectNotFound {
3518 object_id: not_found_id,
3519 version,
3520 },
3521 }) => {
3522 warn!(
3523 ?not_found_id,
3524 ?version,
3525 object_owner=?object_id,
3526 field=?id,
3527 "Skipping dynamic field: referenced genesis object not found"
3528 );
3529 continue;
3530 }
3531 Err(e) => return Err(e),
3532 };
3533 new_dynamic_fields.push(((object_id, id), info));
3534 }
3535 _ => {}
3536 }
3537 }
3538
3539 index_store.insert_genesis_objects(ObjectIndexChanges {
3540 deleted_owners: vec![],
3541 deleted_dynamic_fields: vec![],
3542 new_owners,
3543 new_dynamic_fields,
3544 })
3545 }
3546
3547 pub fn execution_lock_for_executable_transaction(
3551 &self,
3552 transaction: &VerifiedExecutableTransaction,
3553 ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3554 let lock = self
3555 .execution_lock
3556 .try_read()
3557 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3558 if *lock == transaction.auth_sig().epoch() {
3559 Ok(lock)
3560 } else {
3561 Err(IotaError::WrongEpoch {
3562 expected_epoch: *lock,
3563 actual_epoch: transaction.auth_sig().epoch(),
3564 })
3565 }
3566 }
3567
3568 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3574 self.execution_lock
3575 .try_read()
3576 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3577 }
3578
3579 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3580 self.execution_lock.write().await
3581 }
3582
3583 #[instrument(level = "error", skip_all)]
3584 pub async fn reconfigure(
3585 &self,
3586 cur_epoch_store: &AuthorityPerEpochStore,
3587 supported_protocol_versions: SupportedProtocolVersions,
3588 new_committee: Committee,
3589 epoch_start_configuration: EpochStartConfiguration,
3590 state_hasher: Arc<GlobalStateHasher>,
3591 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3592 epoch_supply_change: i64,
3593 epoch_last_checkpoint: CheckpointSequenceNumber,
3594 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3595 Self::check_protocol_version(
3596 supported_protocol_versions,
3597 epoch_start_configuration
3598 .epoch_start_state()
3599 .protocol_version(),
3600 );
3601 self.metrics.reset_on_reconfigure();
3602 self.committee_store.insert_new_committee(&new_committee)?;
3603
3604 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3606
3607 cur_epoch_store.epoch_terminated().await;
3609
3610 let highest_locally_built_checkpoint_seq = self
3611 .checkpoint_store
3612 .get_latest_locally_computed_checkpoint()?
3613 .map(|c| *c.sequence_number())
3614 .unwrap_or(0);
3615
3616 assert!(
3617 epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3618 "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3619 );
3620 if highest_locally_built_checkpoint_seq == epoch_last_checkpoint
3621 || self.is_fullnode(cur_epoch_store)
3622 {
3623 let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3627 if num_shared_version_assignments > 10 {
3634 debug_fatal!(
3636 "all shared_version_assignments should have been removed \
3637 (num_shared_version_assignments: {num_shared_version_assignments})"
3638 );
3639 }
3640 }
3641
3642 self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3648 .await?;
3649 self.get_reconfig_api()
3650 .clear_state_end_of_epoch(&execution_lock);
3651 self.check_system_consistency(
3652 cur_epoch_store,
3653 state_hasher,
3654 expensive_safety_check_config,
3655 epoch_supply_change,
3656 )?;
3657 self.get_reconfig_api()
3658 .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3659 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3660 if self
3661 .db_checkpoint_config
3662 .perform_db_checkpoints_at_epoch_end
3663 {
3664 let checkpoint_indexes = self
3665 .db_checkpoint_config
3666 .perform_index_db_checkpoints_at_epoch_end
3667 .unwrap_or(false);
3668 let current_epoch = cur_epoch_store.epoch();
3669 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3670 self.checkpoint_all_dbs(
3671 &epoch_checkpoint_path,
3672 cur_epoch_store,
3673 checkpoint_indexes,
3674 )?;
3675 }
3676 }
3677
3678 let new_epoch = new_committee.epoch;
3679 let new_epoch_store = self
3680 .reopen_epoch_db(
3681 cur_epoch_store,
3682 new_committee,
3683 epoch_start_configuration,
3684 expensive_safety_check_config,
3685 epoch_last_checkpoint,
3686 )
3687 .await?;
3688 assert_eq!(new_epoch_store.epoch(), new_epoch);
3689 self.transaction_manager.reconfigure(new_epoch);
3690 *execution_lock = new_epoch;
3691 Ok(new_epoch_store)
3695 }
3696
3697 pub async fn reconfigure_for_testing(&self) {
3702 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3703 let epoch_store = self.epoch_store_for_testing().clone();
3704 let protocol_config = epoch_store.protocol_config().clone();
3705 let _guard =
3713 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3714 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3715 self.get_backing_package_store().clone(),
3716 &self.config.expensive_safety_check_config,
3717 self.checkpoint_store
3718 .get_epoch_last_checkpoint(epoch_store.epoch())
3719 .unwrap()
3720 .map(|c| *c.sequence_number())
3721 .unwrap_or_default(),
3722 );
3723 let new_epoch = new_epoch_store.epoch();
3724 self.transaction_manager.reconfigure(new_epoch);
3725 self.epoch_store.store(new_epoch_store);
3726 epoch_store.epoch_terminated().await;
3727 *execution_lock = new_epoch;
3728 }
3729
3730 #[instrument(level = "error", skip_all)]
3731 fn check_system_consistency(
3732 &self,
3733 cur_epoch_store: &AuthorityPerEpochStore,
3734 state_hasher: Arc<GlobalStateHasher>,
3735 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3736 epoch_supply_change: i64,
3737 ) -> IotaResult<()> {
3738 info!(
3739 "Performing iota conservation consistency check for epoch {}",
3740 cur_epoch_store.epoch()
3741 );
3742
3743 if cfg!(debug_assertions) {
3744 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3745 }
3746
3747 self.get_reconfig_api()
3748 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3749
3750 if expensive_safety_check_config.enable_state_consistency_check() {
3752 info!(
3753 "Performing state consistency check for epoch {}",
3754 cur_epoch_store.epoch()
3755 );
3756 self.expensive_check_is_consistent_state(
3757 state_hasher,
3758 cur_epoch_store,
3759 cfg!(debug_assertions), );
3761 }
3762
3763 if expensive_safety_check_config.enable_secondary_index_checks() {
3764 if let Some(indexes) = self.indexes.clone() {
3765 verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
3766 .expect("secondary indexes are inconsistent");
3767 }
3768 }
3769
3770 Ok(())
3771 }
3772
3773 fn expensive_check_is_consistent_state(
3774 &self,
3775 state_hasher: Arc<GlobalStateHasher>,
3776 cur_epoch_store: &AuthorityPerEpochStore,
3777 panic: bool,
3778 ) {
3779 let live_object_set_hash = state_hasher.digest_live_object_set();
3780
3781 let root_state_hash: ECMHLiveObjectSetDigest = self
3782 .get_global_state_hash_store()
3783 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
3784 .expect("Retrieving root state hash cannot fail")
3785 .expect("Root state hash for epoch must exist")
3786 .1
3787 .digest()
3788 .into();
3789
3790 let is_inconsistent = root_state_hash != live_object_set_hash;
3791 if is_inconsistent {
3792 if panic {
3793 panic!(
3794 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3795 );
3796 } else {
3797 error!(
3798 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3799 root_state_hash, live_object_set_hash
3800 );
3801 }
3802 } else {
3803 info!("State consistency check passed");
3804 }
3805
3806 if !panic {
3807 state_hasher.set_inconsistent_state(is_inconsistent);
3808 }
3809 }
3810
3811 pub fn current_epoch_for_testing(&self) -> EpochId {
3812 self.epoch_store_for_testing().epoch()
3813 }
3814
3815 #[instrument(level = "error", skip_all)]
3816 pub fn checkpoint_all_dbs(
3817 &self,
3818 checkpoint_path: &Path,
3819 cur_epoch_store: &AuthorityPerEpochStore,
3820 checkpoint_indexes: bool,
3821 ) -> IotaResult {
3822 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3823 let current_epoch = cur_epoch_store.epoch();
3824
3825 if checkpoint_path.exists() {
3826 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3827 return Ok(());
3828 }
3829
3830 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3831 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3832
3833 if checkpoint_path_tmp.exists() {
3834 fs::remove_dir_all(&checkpoint_path_tmp)
3835 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3836 }
3837
3838 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3839 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3840
3841 self.checkpoint_store
3844 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3845
3846 self.get_reconfig_api()
3847 .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3848
3849 self.committee_store
3850 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3851
3852 if checkpoint_indexes {
3853 if let Some(indexes) = self.indexes.as_ref() {
3854 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3855 }
3856 if let Some(grpc_indexes_store) = self.grpc_indexes_store.as_ref() {
3857 grpc_indexes_store.checkpoint_db(&checkpoint_path_tmp.join(GRPC_INDEXES_DIR))?;
3858 }
3859 }
3860
3861 fs::rename(checkpoint_path_tmp, checkpoint_path)
3862 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3863 Ok(())
3864 }
3865
3866 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3873 self.epoch_store.load()
3874 }
3875
3876 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3878 self.load_epoch_store_one_call_per_task()
3879 }
3880
3881 pub fn clone_committee_for_testing(&self) -> Committee {
3882 Committee::clone(self.epoch_store_for_testing().committee())
3883 }
3884
3885 #[instrument(level = "trace", skip_all)]
3886 pub fn try_get_object(&self, object_id: &ObjectId) -> IotaResult<Option<Object>> {
3887 self.get_object_store()
3888 .try_get_object(object_id)
3889 .map_err(Into::into)
3890 }
3891
3892 pub fn get_object(&self, object_id: &ObjectId) -> Option<Object> {
3894 self.try_get_object(object_id)
3895 .expect("storage access failed")
3896 }
3897
3898 pub fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3899 Ok(self
3900 .try_get_object(&ObjectId::SYSTEM)?
3901 .expect("system package should always exist")
3902 .object_ref())
3903 }
3904
3905 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3907 self.get_object_cache_reader()
3908 .try_get_iota_system_state_object_unsafe()
3909 }
3910
3911 #[instrument(level = "trace", skip_all)]
3912 pub fn get_checkpoint_by_sequence_number(
3913 &self,
3914 sequence_number: CheckpointSequenceNumber,
3915 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3916 Ok(self
3917 .checkpoint_store
3918 .get_checkpoint_by_sequence_number(sequence_number)?)
3919 }
3920
3921 pub async fn wait_for_checkpoint_inclusion(
3928 &self,
3929 digests: &[TransactionDigest],
3930 timeout: Duration,
3931 ) -> IotaResult<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>> {
3932 let epoch_store = self.load_epoch_store_one_call_per_task();
3933
3934 let mut checkpoint_timestamp_cache = HashMap::<CheckpointSequenceNumber, u64>::new();
3937
3938 let results = epoch_store
3939 .wait_for_transactions_in_checkpoint_with_timeout(digests, timeout, |seq| {
3940 *checkpoint_timestamp_cache.entry(seq).or_insert_with(|| {
3941 self.get_checkpoint_by_sequence_number(seq)
3942 .ok()
3943 .flatten()
3944 .map(|c| c.timestamp_ms)
3945 .unwrap_or(0)
3946 })
3947 })
3948 .await?;
3949
3950 Ok(digests
3951 .iter()
3952 .copied()
3953 .zip(results)
3954 .filter_map(|(digest, opt)| opt.map(|seq_and_ts| (digest, seq_and_ts)))
3955 .collect())
3956 }
3957
3958 #[instrument(level = "trace", skip_all)]
3959 pub fn get_transaction_checkpoint_for_tests(
3960 &self,
3961 digest: &TransactionDigest,
3962 epoch_store: &AuthorityPerEpochStore,
3963 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3964 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3965 let Some(checkpoint) = checkpoint else {
3966 return Ok(None);
3967 };
3968 let checkpoint = self
3969 .checkpoint_store
3970 .get_checkpoint_by_sequence_number(checkpoint)?;
3971 Ok(checkpoint)
3972 }
3973
3974 #[instrument(level = "trace", skip_all)]
3975 pub fn get_object_read(&self, object_id: &ObjectId) -> IotaResult<ObjectRead> {
3976 Ok(
3977 match self
3978 .get_object_cache_reader()
3979 .try_get_latest_object_or_tombstone(*object_id)?
3980 {
3981 Some((_, ObjectOrTombstone::Object(object))) => {
3982 let layout = self.get_object_layout(&object)?;
3983 ObjectRead::Exists(object.object_ref(), object, layout)
3984 }
3985 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3986 None => ObjectRead::NotExists(*object_id),
3987 },
3988 )
3989 }
3990
3991 pub fn get_chain_identifier(&self) -> ChainIdentifier {
3993 self.chain_identifier
3994 }
3995
3996 #[instrument(level = "trace", skip_all)]
3997 pub fn get_move_object<T>(&self, object_id: &ObjectId) -> IotaResult<T>
3998 where
3999 T: DeserializeOwned,
4000 {
4001 let o = self.get_object_read(object_id)?.into_object()?;
4002 if let Some(move_object) = o.data.as_opt_struct() {
4003 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
4004 IotaError::ObjectDeserialization {
4005 error: format!("{e}"),
4006 }
4007 })?)
4008 } else {
4009 Err(IotaError::ObjectDeserialization {
4010 error: format!("Provided object : [{object_id}] is not a Move object."),
4011 })
4012 }
4013 }
4014
4015 #[instrument(level = "trace", skip_all)]
4021 pub fn get_past_object_read(
4022 &self,
4023 object_id: &ObjectId,
4024 version: SequenceNumber,
4025 ) -> IotaResult<PastObjectRead> {
4026 let Some(obj_ref) = self
4028 .get_object_cache_reader()
4029 .try_get_latest_object_ref_or_tombstone(*object_id)?
4030 else {
4031 return Ok(PastObjectRead::ObjectNotExists(*object_id));
4032 };
4033
4034 if version > obj_ref.version {
4035 return Ok(PastObjectRead::VersionTooHigh {
4036 object_id: *object_id,
4037 asked_version: version,
4038 latest_version: obj_ref.version,
4039 });
4040 }
4041
4042 if version < obj_ref.version {
4043 return Ok(match self.read_object_at_version(object_id, version)? {
4045 Some((object, layout)) => {
4046 let obj_ref = object.object_ref();
4047 PastObjectRead::VersionFound(obj_ref, object, layout)
4048 }
4049
4050 None => PastObjectRead::VersionNotFound(*object_id, version),
4051 });
4052 }
4053
4054 if !obj_ref.digest.is_alive() {
4055 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
4056 }
4057
4058 match self.read_object_at_version(object_id, obj_ref.version)? {
4059 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
4060 None => {
4061 error!(
4062 "Object with in parent_entry is missing from object store, datastore is \
4063 inconsistent",
4064 );
4065 Err(UserInputError::ObjectNotFound {
4066 object_id: *object_id,
4067 version: Some(obj_ref.version),
4068 }
4069 .into())
4070 }
4071 }
4072 }
4073
4074 #[instrument(level = "trace", skip_all)]
4075 fn read_object_at_version(
4076 &self,
4077 object_id: &ObjectId,
4078 version: SequenceNumber,
4079 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
4080 let Some(object) = self
4081 .get_object_cache_reader()
4082 .try_get_object_by_key(object_id, version)?
4083 else {
4084 return Ok(None);
4085 };
4086
4087 let layout = self.get_object_layout(&object)?;
4088 Ok(Some((object, layout)))
4089 }
4090
4091 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
4092 let layout = object
4093 .data
4094 .as_opt_struct()
4095 .map(|object| {
4096 into_struct_layout(
4097 self.load_epoch_store_one_call_per_task()
4098 .executor()
4099 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
4101 .get_annotated_layout(object.struct_tag())?,
4102 )
4103 })
4104 .transpose()?;
4105 Ok(layout)
4106 }
4107
4108 fn get_owner_at_version(
4109 &self,
4110 object_id: &ObjectId,
4111 version: SequenceNumber,
4112 ) -> IotaResult<Owner> {
4113 self.get_object_store()
4114 .try_get_object_by_key(object_id, version)?
4115 .ok_or_else(|| {
4116 IotaError::from(UserInputError::ObjectNotFound {
4117 object_id: *object_id,
4118 version: Some(version),
4119 })
4120 })
4121 .map(|o| o.owner)
4122 }
4123
4124 #[instrument(level = "trace", skip_all)]
4125 pub fn get_owner_objects(
4126 &self,
4127 owner: Address,
4128 cursor: Option<ObjectId>,
4130 limit: usize,
4131 filter: Option<IotaObjectDataFilter>,
4132 ) -> IotaResult<Vec<ObjectInfo>> {
4133 if let Some(indexes) = &self.indexes {
4134 indexes.get_owner_objects(owner, cursor, limit, filter)
4135 } else {
4136 Err(IotaError::IndexStoreNotAvailable)
4137 }
4138 }
4139
4140 #[instrument(level = "trace", skip_all)]
4141 pub fn get_owned_coins_iterator_with_cursor(
4142 &self,
4143 owner: Address,
4144 cursor: (String, ObjectId),
4146 limit: usize,
4147 one_coin_type_only: bool,
4148 ) -> IotaResult<impl Iterator<Item = (String, ObjectId, CoinInfo)> + '_> {
4149 if let Some(indexes) = &self.indexes {
4150 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4151 } else {
4152 Err(IotaError::IndexStoreNotAvailable)
4153 }
4154 }
4155
4156 #[instrument(level = "trace", skip_all)]
4157 pub fn get_owner_objects_iterator(
4158 &self,
4159 owner: Address,
4160 cursor: Option<ObjectId>,
4162 filter: Option<IotaObjectDataFilter>,
4163 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
4164 let cursor_u = cursor.unwrap_or(ObjectId::ZERO);
4165 if let Some(indexes) = &self.indexes {
4166 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4167 } else {
4168 Err(IotaError::IndexStoreNotAvailable)
4169 }
4170 }
4171
4172 #[instrument(level = "trace", skip_all)]
4173 pub fn get_move_objects<T>(&self, owner: Address, tag: StructTag) -> IotaResult<Vec<T>>
4174 where
4175 T: DeserializeOwned,
4176 {
4177 let object_ids = self
4178 .get_owner_objects_iterator(owner, None, None)?
4179 .filter(|o| match &o.type_ {
4180 ObjectType::Struct(s) => *s == tag,
4181 ObjectType::Package => false,
4182 })
4183 .map(|info| ObjectKey(info.object_id, info.version))
4184 .collect::<Vec<_>>();
4185 let mut move_objects = vec![];
4186
4187 let objects = self
4188 .get_object_store()
4189 .try_multi_get_objects_by_key(&object_ids)?;
4190
4191 for (o, id) in objects.into_iter().zip(object_ids) {
4192 let object = o.ok_or_else(|| {
4193 IotaError::from(UserInputError::ObjectNotFound {
4194 object_id: id.0,
4195 version: Some(id.1),
4196 })
4197 })?;
4198 let move_object = object.data.as_opt_struct().ok_or_else(|| {
4199 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4200 })?;
4201 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4202 IotaError::ObjectDeserialization {
4203 error: format!("{e}"),
4204 }
4205 })?);
4206 }
4207 Ok(move_objects)
4208 }
4209
4210 #[instrument(level = "trace", skip_all)]
4211 pub fn get_dynamic_fields(
4212 &self,
4213 owner: ObjectId,
4214 cursor: Option<ObjectId>,
4216 limit: usize,
4217 ) -> IotaResult<Vec<(ObjectId, DynamicFieldInfo)>> {
4218 Ok(self
4219 .get_dynamic_fields_iterator(owner, cursor)?
4220 .take(limit)
4221 .collect::<Result<Vec<_>, _>>()?)
4222 }
4223
4224 fn get_dynamic_fields_iterator(
4225 &self,
4226 owner: ObjectId,
4227 cursor: Option<ObjectId>,
4229 ) -> IotaResult<impl Iterator<Item = Result<(ObjectId, DynamicFieldInfo), TypedStoreError>> + '_>
4230 {
4231 if let Some(indexes) = &self.indexes {
4232 indexes.get_dynamic_fields_iterator(owner, cursor)
4233 } else {
4234 Err(IotaError::IndexStoreNotAvailable)
4235 }
4236 }
4237
4238 #[instrument(level = "trace", skip_all)]
4239 pub fn get_dynamic_field_object_id(
4240 &self,
4241 owner: ObjectId,
4242 name_type: TypeTag,
4243 name_bcs_bytes: &[u8],
4244 ) -> IotaResult<Option<ObjectId>> {
4245 if let Some(indexes) = &self.indexes {
4246 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4247 } else {
4248 Err(IotaError::IndexStoreNotAvailable)
4249 }
4250 }
4251
4252 #[instrument(level = "trace", skip_all)]
4253 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
4254 Ok(self.get_indexes()?.next_sequence_number())
4255 }
4256
4257 #[instrument(level = "trace", skip_all)]
4258 pub async fn get_executed_transaction_and_effects(
4259 &self,
4260 digest: TransactionDigest,
4261 kv_store: Arc<TransactionKeyValueStore>,
4262 ) -> IotaResult<(Transaction, TransactionEffects)> {
4263 let transaction = kv_store.get_tx(digest).await?;
4264 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4265 Ok((transaction, effects))
4266 }
4267
4268 #[instrument(level = "trace", skip_all)]
4269 pub fn multi_get_checkpoint_by_sequence_number(
4270 &self,
4271 sequence_numbers: &[CheckpointSequenceNumber],
4272 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
4273 Ok(self
4274 .checkpoint_store
4275 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4276 }
4277
4278 #[instrument(level = "trace", skip_all)]
4279 pub fn get_transaction_events(
4280 &self,
4281 digest: &TransactionDigest,
4282 ) -> IotaResult<TransactionEvents> {
4283 self.get_transaction_cache_reader()
4284 .try_get_events(digest)?
4285 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
4286 }
4287
4288 pub fn get_transaction_input_objects(
4289 &self,
4290 effects: &TransactionEffects,
4291 ) -> anyhow::Result<Vec<Object>> {
4292 iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4293 .map_err(Into::into)
4294 }
4295
4296 pub fn get_transaction_output_objects(
4297 &self,
4298 effects: &TransactionEffects,
4299 ) -> anyhow::Result<Vec<Object>> {
4300 iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4301 .map_err(Into::into)
4302 }
4303
4304 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
4305 match &self.indexes {
4306 Some(i) => Ok(i.clone()),
4307 None => Err(IotaError::UnsupportedFeature {
4308 error: "extended object indexing is not enabled on this server".into(),
4309 }),
4310 }
4311 }
4312
4313 pub async fn get_transactions_for_tests(
4314 self: &Arc<Self>,
4315 filter: Option<TransactionFilter>,
4316 cursor: Option<TransactionDigest>,
4317 limit: Option<usize>,
4318 reverse: bool,
4319 ) -> IotaResult<Vec<TransactionDigest>> {
4320 let metrics = KeyValueStoreMetrics::new_for_tests();
4321 let kv_store = Arc::new(TransactionKeyValueStore::new(
4322 "rocksdb",
4323 metrics,
4324 self.clone(),
4325 ));
4326 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4327 .await
4328 }
4329
4330 #[instrument(level = "trace", skip_all)]
4331 pub async fn get_transactions(
4332 &self,
4333 kv_store: &Arc<TransactionKeyValueStore>,
4334 filter: Option<TransactionFilter>,
4335 cursor: Option<TransactionDigest>,
4337 limit: Option<usize>,
4338 reverse: bool,
4339 ) -> IotaResult<Vec<TransactionDigest>> {
4340 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4341 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4342 let iter = checkpoint_contents.iter().map(|c| c.transaction);
4343 if reverse {
4344 let iter = iter
4345 .rev()
4346 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4347 .skip(usize::from(cursor.is_some()));
4348 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4349 } else {
4350 let iter = iter
4351 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4352 .skip(usize::from(cursor.is_some()));
4353 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4354 }
4355 }
4356 self.get_indexes()?
4357 .get_transactions(filter, cursor, limit, reverse)
4358 }
4359
4360 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4361 &self.checkpoint_store
4362 }
4363
4364 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4365 self.get_checkpoint_store()
4366 .get_highest_executed_checkpoint_seq_number()?
4367 .ok_or(IotaError::UserInput {
4368 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4369 })
4370 }
4371
4372 #[cfg(msim)]
4373 pub fn get_highest_pruned_checkpoint_for_testing(
4374 &self,
4375 ) -> IotaResult<CheckpointSequenceNumber> {
4376 self.database_for_testing()
4377 .perpetual_tables
4378 .get_highest_pruned_checkpoint()
4379 .map(|c| c.unwrap_or(0))
4380 .map_err(Into::into)
4381 }
4382
4383 #[instrument(level = "trace", skip_all)]
4384 pub fn get_checkpoint_summary_by_sequence_number(
4385 &self,
4386 sequence_number: CheckpointSequenceNumber,
4387 ) -> IotaResult<CheckpointSummary> {
4388 let verified_checkpoint = self
4389 .get_checkpoint_store()
4390 .get_checkpoint_by_sequence_number(sequence_number)?;
4391 match verified_checkpoint {
4392 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4393 None => Err(IotaError::UserInput {
4394 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4395 }),
4396 }
4397 }
4398
4399 #[instrument(level = "trace", skip_all)]
4400 pub fn get_checkpoint_summary_by_digest(
4401 &self,
4402 digest: CheckpointDigest,
4403 ) -> IotaResult<CheckpointSummary> {
4404 let verified_checkpoint = self
4405 .get_checkpoint_store()
4406 .get_checkpoint_by_digest(&digest)?;
4407 match verified_checkpoint {
4408 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4409 None => Err(IotaError::UserInput {
4410 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4411 }),
4412 }
4413 }
4414
4415 #[instrument(level = "trace", skip_all)]
4416 pub fn find_publish_txn_digest(&self, package_id: ObjectId) -> IotaResult<TransactionDigest> {
4417 if package_id.is_system_package() {
4418 return self.find_genesis_txn_digest();
4419 }
4420 Ok(self
4421 .get_object_read(&package_id)?
4422 .into_object()?
4423 .previous_transaction)
4424 }
4425
4426 #[instrument(level = "trace", skip_all)]
4427 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4428 let summary = self
4429 .get_verified_checkpoint_by_sequence_number(0)?
4430 .into_message();
4431 let content = self.get_checkpoint_contents(summary.content_digest)?;
4432 let genesis_transaction = content.enumerate_transactions(&summary).next();
4433 Ok(genesis_transaction
4434 .ok_or(IotaError::UserInput {
4435 error: UserInputError::GenesisTransactionNotFound,
4436 })?
4437 .1
4438 .transaction)
4439 }
4440
4441 #[instrument(level = "trace", skip_all)]
4442 pub fn get_verified_checkpoint_by_sequence_number(
4443 &self,
4444 sequence_number: CheckpointSequenceNumber,
4445 ) -> IotaResult<VerifiedCheckpoint> {
4446 let verified_checkpoint = self
4447 .get_checkpoint_store()
4448 .get_checkpoint_by_sequence_number(sequence_number)?;
4449 match verified_checkpoint {
4450 Some(verified_checkpoint) => Ok(verified_checkpoint),
4451 None => Err(IotaError::UserInput {
4452 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4453 }),
4454 }
4455 }
4456
4457 #[instrument(level = "trace", skip_all)]
4458 pub fn get_verified_checkpoint_summary_by_digest(
4459 &self,
4460 digest: CheckpointDigest,
4461 ) -> IotaResult<VerifiedCheckpoint> {
4462 let verified_checkpoint = self
4463 .get_checkpoint_store()
4464 .get_checkpoint_by_digest(&digest)?;
4465 match verified_checkpoint {
4466 Some(verified_checkpoint) => Ok(verified_checkpoint),
4467 None => Err(IotaError::UserInput {
4468 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4469 }),
4470 }
4471 }
4472
4473 #[instrument(level = "trace", skip_all)]
4474 pub fn get_checkpoint_contents(
4475 &self,
4476 digest: CheckpointContentsDigest,
4477 ) -> IotaResult<CheckpointContents> {
4478 self.get_checkpoint_store()
4479 .get_checkpoint_contents(&digest)?
4480 .ok_or(IotaError::UserInput {
4481 error: UserInputError::CheckpointContentsNotFound(digest),
4482 })
4483 }
4484
4485 #[instrument(level = "trace", skip_all)]
4486 pub fn get_checkpoint_contents_by_sequence_number(
4487 &self,
4488 sequence_number: CheckpointSequenceNumber,
4489 ) -> IotaResult<CheckpointContents> {
4490 let verified_checkpoint = self
4491 .get_checkpoint_store()
4492 .get_checkpoint_by_sequence_number(sequence_number)?;
4493 match verified_checkpoint {
4494 Some(verified_checkpoint) => {
4495 let content_digest = verified_checkpoint.into_inner().content_digest;
4496 self.get_checkpoint_contents(content_digest)
4497 }
4498 None => Err(IotaError::UserInput {
4499 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4500 }),
4501 }
4502 }
4503
4504 #[instrument(level = "trace", skip_all)]
4505 pub async fn query_events(
4506 &self,
4507 kv_store: &Arc<TransactionKeyValueStore>,
4508 query: EventFilter,
4509 cursor: Option<EventID>,
4511 limit: usize,
4512 descending: bool,
4513 ) -> IotaResult<Vec<IotaEvent>> {
4514 let index_store = self.get_indexes()?;
4515
4516 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4518 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4519 IotaError::TransactionNotFound {
4520 digest: cursor.tx_digest,
4521 },
4522 )?;
4523 (tx_seq, cursor.event_seq as usize)
4524 } else if descending {
4525 (u64::MAX, usize::MAX)
4526 } else {
4527 (0, 0)
4528 };
4529
4530 let limit = limit + 1;
4531 let mut event_keys = match query {
4532 EventFilter::All(filters) => {
4533 if filters.is_empty() {
4534 index_store.all_events(tx_num, event_num, limit, descending)?
4535 } else {
4536 return Err(IotaError::UserInput {
4537 error: UserInputError::Unsupported(
4538 "This query type does not currently support filter combinations"
4539 .to_string(),
4540 ),
4541 });
4542 }
4543 }
4544 EventFilter::Transaction(digest) => {
4545 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4546 }
4547 EventFilter::MoveModule { package, module } => {
4548 let module_id = ModuleId::new(
4549 AccountAddress::new(package.into_bytes()),
4550 move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4551 );
4552 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4553 }
4554 EventFilter::MoveEventType(struct_name) => index_store
4555 .events_by_move_event_struct_name(
4556 &struct_name,
4557 tx_num,
4558 event_num,
4559 limit,
4560 descending,
4561 )?,
4562 EventFilter::Sender(sender) => {
4563 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4564 }
4565 EventFilter::TimeRange {
4566 start_time,
4567 end_time,
4568 } => index_store
4569 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4570 EventFilter::MoveEventModule { package, module } => index_store
4571 .events_by_move_event_module(
4572 &ModuleId::new(
4573 AccountAddress::new(package.into_bytes()),
4574 move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4575 ),
4576 tx_num,
4577 event_num,
4578 limit,
4579 descending,
4580 )?,
4581 EventFilter::Package(_)
4583 | EventFilter::MoveEventField { .. }
4584 | EventFilter::Any(_)
4585 | EventFilter::And(_, _)
4586 | EventFilter::Or(_, _) => {
4587 return Err(IotaError::UserInput {
4588 error: UserInputError::Unsupported(
4589 "This query type is not supported by the full node.".to_string(),
4590 ),
4591 });
4592 }
4593 };
4594
4595 if cursor.is_some() {
4598 if !event_keys.is_empty() {
4599 event_keys.remove(0);
4600 }
4601 } else {
4602 event_keys.truncate(limit - 1);
4603 }
4604
4605 let transaction_digests = event_keys
4607 .iter()
4608 .map(|(_, digest, _, _)| *digest)
4609 .collect::<HashSet<_>>()
4610 .into_iter()
4611 .collect::<Vec<_>>();
4612
4613 let events = kv_store
4614 .multi_get_events_by_tx_digests(&transaction_digests)
4615 .await?;
4616
4617 let events_map: HashMap<_, _> =
4618 transaction_digests.iter().zip(events.into_iter()).collect();
4619
4620 let stored_events = event_keys
4621 .into_iter()
4622 .map(|k| {
4623 (
4624 k,
4625 events_map
4626 .get(&k.1)
4627 .expect("fetched digest is missing")
4628 .clone()
4629 .and_then(|e| e.get(k.2).cloned()),
4630 )
4631 })
4632 .map(
4633 |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4634 event
4635 .map(|e| (e, tx_digest, event_seq, timestamp))
4636 .ok_or(IotaError::TransactionEventsNotFound { digest: tx_digest })
4637 },
4638 )
4639 .collect::<Result<Vec<_>, _>>()?;
4640
4641 let epoch_store = self.load_epoch_store_one_call_per_task();
4642 let backing_store = self.get_backing_package_store().as_ref();
4643 let mut layout_resolver = epoch_store
4644 .executor()
4645 .type_layout_resolver(Box::new(backing_store));
4646 let mut events = vec![];
4647 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4648 events.push(IotaEvent::try_from(
4649 e.clone(),
4650 tx_digest,
4651 event_seq as u64,
4652 Some(timestamp),
4653 layout_resolver.get_annotated_layout(&e.type_)?,
4654 )?)
4655 }
4656 Ok(events)
4657 }
4658
4659 pub fn insert_genesis_object(&self, object: Object) {
4660 self.get_reconfig_api()
4661 .try_insert_genesis_object(object)
4662 .expect("Cannot insert genesis object")
4663 }
4664
4665 pub fn insert_genesis_objects(&self, objects: &[Object]) {
4666 for o in objects {
4667 self.insert_genesis_object(o.clone());
4668 }
4669 }
4670
4671 #[instrument(level = "trace", skip_all)]
4673 pub fn get_transaction_status(
4674 &self,
4675 transaction_digest: &TransactionDigest,
4676 epoch_store: &Arc<AuthorityPerEpochStore>,
4677 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4678 if let Some(effects) =
4680 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4681 {
4682 if let Some(transaction) = self
4683 .get_transaction_cache_reader()
4684 .try_get_transaction_block(transaction_digest)?
4685 {
4686 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4687 let events = if effects.events_digest().is_some() {
4688 self.get_transaction_events(effects.transaction_digest())?
4689 } else {
4690 TransactionEvents::default()
4691 };
4692 return Ok(Some((
4693 (*transaction).clone().into_message(),
4694 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4695 )));
4696 } else {
4697 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4702 }
4703 }
4704 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4705 self.metrics.tx_already_processed.inc();
4706 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4707 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4708 } else {
4709 Ok(None)
4710 }
4711 }
4712
4713 #[instrument(level = "trace", skip_all)]
4717 pub fn get_signed_effects_and_maybe_resign(
4718 &self,
4719 transaction_digest: &TransactionDigest,
4720 epoch_store: &Arc<AuthorityPerEpochStore>,
4721 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4722 let effects = self
4723 .get_transaction_cache_reader()
4724 .try_get_executed_effects(transaction_digest)?;
4725 match effects {
4726 Some(effects) => {
4727 if effects.epoch() != epoch_store.epoch() {
4750 debug!(
4751 tx_digest=?transaction_digest,
4752 effects_epoch=?effects.epoch(),
4753 epoch=?epoch_store.epoch(),
4754 "Re-signing the effects with the current epoch"
4755 );
4756 }
4757 Ok(Some(self.sign_effects(effects, epoch_store)?))
4758 }
4759 None => Ok(None),
4760 }
4761 }
4762
4763 #[instrument(level = "trace", skip_all)]
4764 pub(crate) fn sign_effects(
4765 &self,
4766 effects: TransactionEffects,
4767 epoch_store: &Arc<AuthorityPerEpochStore>,
4768 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4769 let tx_digest = *effects.transaction_digest();
4770 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4771 Some(sig) => {
4772 debug_assert!(sig.epoch == epoch_store.epoch());
4773 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4774 }
4775 _ => {
4776 let sig = AuthoritySignInfo::new(
4777 epoch_store.epoch(),
4778 &effects,
4779 Intent::iota_app(IntentScope::TransactionEffects),
4780 self.name,
4781 &*self.secret,
4782 );
4783
4784 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4785
4786 epoch_store.insert_effects_digest_and_signature(
4787 &tx_digest,
4788 effects.digest(),
4789 &sig,
4790 )?;
4791
4792 effects
4793 }
4794 };
4795
4796 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4797 signed_effects,
4798 ))
4799 }
4800
4801 #[instrument(level = "trace", skip_all)]
4803 fn fullnode_only_get_tx_coins_for_indexing(
4804 &self,
4805 effects: &TransactionEffects,
4806 inner_temporary_store: &InnerTemporaryStore,
4807 epoch_store: &Arc<AuthorityPerEpochStore>,
4808 ) -> Option<TxCoins> {
4809 if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4810 return None;
4811 }
4812 let written_coin_objects = inner_temporary_store
4813 .written
4814 .iter()
4815 .filter_map(|(k, v)| {
4816 if v.is_coin() {
4817 Some((*k, v.clone()))
4818 } else {
4819 None
4820 }
4821 })
4822 .collect();
4823 let mut input_coin_objects = inner_temporary_store
4824 .input_objects
4825 .iter()
4826 .filter_map(|(k, v)| {
4827 if v.is_coin() {
4828 Some((*k, v.clone()))
4829 } else {
4830 None
4831 }
4832 })
4833 .collect::<ObjectMap>();
4834
4835 for (object_id, version) in effects.modified_at_versions() {
4840 if inner_temporary_store
4841 .loaded_runtime_objects
4842 .contains_key(&object_id)
4843 {
4844 if let Some(object) = self
4845 .get_object_store()
4846 .get_object_by_key(&object_id, version)
4847 {
4848 if object.is_coin() {
4849 input_coin_objects.insert(object_id, object);
4850 }
4851 }
4852 }
4853 }
4854
4855 Some((input_coin_objects, written_coin_objects))
4856 }
4857
4858 #[instrument(level = "trace", skip_all)]
4870 pub fn get_transaction_lock(
4871 &self,
4872 object_ref: &ObjectRef,
4873 epoch_store: &AuthorityPerEpochStore,
4874 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4875 let lock_info = self
4876 .get_object_cache_reader()
4877 .try_get_lock(*object_ref, epoch_store)?;
4878 let lock_info = match lock_info {
4879 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4880 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4881 provided_obj_ref: *object_ref,
4882 current_version: locked_ref.version,
4883 }
4884 .into());
4885 }
4886 ObjectLockStatus::Initialized => {
4887 return Ok(None);
4888 }
4889 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4890 };
4891
4892 epoch_store.get_signed_transaction(&lock_info)
4893 }
4894
4895 pub fn try_get_objects(&self, objects: &[ObjectId]) -> IotaResult<Vec<Option<Object>>> {
4896 self.get_object_cache_reader().try_get_objects(objects)
4897 }
4898
4899 pub fn get_objects(&self, objects: &[ObjectId]) -> Vec<Option<Object>> {
4901 self.try_get_objects(objects)
4902 .expect("storage access failed")
4903 }
4904
4905 pub fn try_get_object_or_tombstone(
4906 &self,
4907 object_id: ObjectId,
4908 ) -> IotaResult<Option<ObjectRef>> {
4909 self.get_object_cache_reader()
4910 .try_get_latest_object_ref_or_tombstone(object_id)
4911 }
4912
4913 pub fn get_object_or_tombstone(&self, object_id: ObjectId) -> Option<ObjectRef> {
4915 self.try_get_object_or_tombstone(object_id)
4916 .expect("storage access failed")
4917 }
4918
4919 pub fn set_override_protocol_upgrade_buffer_stake(
4929 &self,
4930 expected_epoch: EpochId,
4931 buffer_stake_bps: u64,
4932 ) -> IotaResult {
4933 let epoch_store = self.load_epoch_store_one_call_per_task();
4934 let actual_epoch = epoch_store.epoch();
4935 if actual_epoch != expected_epoch {
4936 return Err(IotaError::WrongEpoch {
4937 expected_epoch,
4938 actual_epoch,
4939 });
4940 }
4941
4942 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4943 }
4944
4945 pub fn clear_override_protocol_upgrade_buffer_stake(
4946 &self,
4947 expected_epoch: EpochId,
4948 ) -> IotaResult {
4949 let epoch_store = self.load_epoch_store_one_call_per_task();
4950 let actual_epoch = epoch_store.epoch();
4951 if actual_epoch != expected_epoch {
4952 return Err(IotaError::WrongEpoch {
4953 expected_epoch,
4954 actual_epoch,
4955 });
4956 }
4957
4958 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4959 }
4960
4961 pub async fn get_available_system_packages(
4965 &self,
4966 binary_config: &BinaryConfig,
4967 ) -> Vec<ObjectRef> {
4968 let mut results = vec![];
4969
4970 let system_packages = BuiltInFramework::iter_system_packages();
4971
4972 #[cfg(msim)]
4974 let extra_packages = framework_injection::get_extra_packages(self.name);
4975 #[cfg(msim)]
4976 let system_packages = {
4977 let mut packages: Vec<_> = system_packages.collect();
4978 packages.extend(extra_packages.iter());
4979 packages
4980 };
4981
4982 for system_package in system_packages {
4983 let modules = system_package.modules().to_vec();
4984 #[cfg(msim)]
4986 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4987 .unwrap_or(modules);
4988
4989 let Some(obj_ref) = iota_framework::compare_system_package(
4990 &self.get_object_store(),
4991 &system_package.id,
4992 &modules,
4993 system_package.dependencies.to_vec(),
4994 binary_config,
4995 )
4996 .await
4997 else {
4998 return vec![];
4999 };
5000 results.push(obj_ref);
5001 }
5002
5003 results
5004 }
5005
5006 async fn get_system_package_bytes(
5023 &self,
5024 system_packages: Vec<ObjectRef>,
5025 binary_config: &BinaryConfig,
5026 ) -> Option<Vec<SystemPackage>> {
5027 let ids: Vec<_> = system_packages
5028 .iter()
5029 .map(|object_ref| object_ref.object_id)
5030 .collect();
5031 let objects = self.get_objects(&ids);
5032
5033 let mut res = Vec::with_capacity(system_packages.len());
5034 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
5035 let prev_transaction = match object {
5036 Some(cur_object) if cur_object.object_ref() == system_package_ref => {
5037 info!(
5039 "Framework {} does not need updating",
5040 system_package_ref.object_id
5041 );
5042 continue;
5043 }
5044
5045 Some(cur_object) => cur_object.previous_transaction,
5046 None => TransactionDigest::GENESIS_MARKER,
5047 };
5048
5049 #[cfg(msim)]
5050 let FrameworkSystemPackage {
5051 id: _,
5052 bytes,
5053 dependencies,
5054 } = framework_injection::get_override_system_package(
5055 &system_package_ref.object_id,
5056 self.name,
5057 )
5058 .unwrap_or_else(|| {
5059 BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone()
5060 });
5061
5062 #[cfg(not(msim))]
5063 let FrameworkSystemPackage {
5064 id: _,
5065 bytes,
5066 dependencies,
5067 } = BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone();
5068
5069 let modules: Vec<_> = bytes
5070 .iter()
5071 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
5072 .collect();
5073
5074 let new_object = Object::new_system_package(
5075 &modules,
5076 system_package_ref.version,
5077 dependencies.clone(),
5078 prev_transaction,
5079 );
5080
5081 let new_ref = new_object.object_ref();
5082 if new_ref != system_package_ref {
5083 error!(
5084 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
5085 );
5086 return None;
5087 }
5088
5089 res.push(SystemPackage {
5090 version: system_package_ref.version,
5091 modules: bytes,
5092 dependencies,
5093 });
5094 }
5095
5096 Some(res)
5097 }
5098
5099 fn is_protocol_version_supported_v1(
5103 proposed_protocol_version: ProtocolVersion,
5104 committee: &Committee,
5105 capabilities: Vec<AuthorityCapabilitiesV1>,
5106 mut buffer_stake_bps: u64,
5107 ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
5108 if buffer_stake_bps > 10000 {
5109 warn!("clamping buffer_stake_bps to 10000");
5110 buffer_stake_bps = 10000;
5111 }
5112
5113 let mut desired_upgrades: Vec<_> = capabilities
5116 .into_iter()
5117 .filter_map(|mut cap| {
5118 if cap.available_system_packages.is_empty() {
5120 return None;
5121 }
5122
5123 cap.available_system_packages.sort();
5124
5125 info!(
5126 "validator {:?} supports {:?} with system packages: {:?}",
5127 cap.authority.concise(),
5128 cap.supported_protocol_versions,
5129 cap.available_system_packages,
5130 );
5131
5132 cap.supported_protocol_versions
5136 .get_version_digest(proposed_protocol_version)
5137 .map(|digest| (digest, cap.available_system_packages, cap.authority))
5138 })
5139 .collect();
5140
5141 desired_upgrades.sort();
5144 desired_upgrades
5145 .into_iter()
5146 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5147 .into_iter()
5148 .find_map(|((digest, packages), group)| {
5149 assert!(!packages.is_empty());
5151
5152 let mut stake_aggregator: StakeAggregator<(), true> =
5153 StakeAggregator::new(Arc::new(committee.clone()));
5154
5155 for (_, _, authority) in group {
5156 stake_aggregator.insert_generic(authority, ());
5157 }
5158
5159 let total_votes = stake_aggregator.total_votes();
5160 let quorum_threshold = committee.quorum_threshold();
5161 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5162
5163 info!(
5164 protocol_config_digest = ?digest,
5165 ?total_votes,
5166 ?quorum_threshold,
5167 ?buffer_stake_bps,
5168 ?effective_threshold,
5169 ?proposed_protocol_version,
5170 ?packages,
5171 "support for upgrade"
5172 );
5173
5174 let has_support = total_votes >= effective_threshold;
5175 has_support.then_some((proposed_protocol_version, digest, packages))
5176 })
5177 }
5178
5179 fn choose_protocol_version_and_system_packages_v1(
5183 current_protocol_version: ProtocolVersion,
5184 current_protocol_digest: Digest,
5185 committee: &Committee,
5186 capabilities: Vec<AuthorityCapabilitiesV1>,
5187 buffer_stake_bps: u64,
5188 ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
5189 let mut next_protocol_version = current_protocol_version;
5190 let mut system_packages = vec![];
5191 let mut protocol_version_digest = current_protocol_digest;
5192
5193 while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
5197 next_protocol_version + 1,
5198 committee,
5199 capabilities.clone(),
5200 buffer_stake_bps,
5201 ) {
5202 next_protocol_version = version;
5203 protocol_version_digest = digest;
5204 system_packages = packages;
5205 }
5206
5207 (
5208 next_protocol_version,
5209 protocol_version_digest,
5210 system_packages,
5211 )
5212 }
5213
5214 fn get_validators_supporting_protocol_version(
5219 target_protocol_version: ProtocolVersion,
5220 target_digest: Digest,
5221 active_validators: &[AuthorityPublicKey],
5222 capabilities: &[AuthorityCapabilitiesV1],
5223 ) -> Vec<u64> {
5224 let mut eligible_validators = Vec::new();
5225
5226 for capability in capabilities {
5227 if let Some(digest) = capability
5229 .supported_protocol_versions
5230 .get_version_digest(target_protocol_version)
5231 {
5232 if digest == target_digest {
5233 if let Some(index) = active_validators
5235 .iter()
5236 .position(|name| AuthorityName::from(name) == capability.authority)
5237 {
5238 eligible_validators.push(index as u64);
5239 }
5240 }
5241 }
5242 }
5243
5244 eligible_validators.sort();
5246 eligible_validators
5247 }
5248
5249 fn calculate_eligible_validators_weight(
5254 eligible_validator_indices: &[u64],
5255 active_validators: &[AuthorityPublicKey],
5256 committee: &Committee,
5257 ) -> u64 {
5258 let mut total_weight = 0u64;
5259
5260 for &index in eligible_validator_indices {
5261 let authority_pubkey = &active_validators[index as usize];
5262 if let Some((_, weight)) = committee
5264 .members()
5265 .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
5266 {
5267 total_weight += weight;
5268 }
5269 }
5270
5271 total_weight
5272 }
5273
5274 #[instrument(level = "error", skip_all)]
5287 pub async fn create_and_execute_advance_epoch_tx(
5288 &self,
5289 epoch_store: &Arc<AuthorityPerEpochStore>,
5290 gas_cost_summary: &GasCostSummary,
5291 checkpoint: CheckpointSequenceNumber,
5292 epoch_start_timestamp_ms: CheckpointTimestamp,
5293 scores: Vec<u64>,
5294 ) -> anyhow::Result<(
5295 IotaSystemState,
5296 Option<SystemEpochInfoEvent>,
5297 TransactionEffects,
5298 )> {
5299 let mut txns = Vec::new();
5300
5301 let next_epoch = epoch_store.epoch() + 1;
5302
5303 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
5304 let authority_capabilities = epoch_store
5305 .get_capabilities_v1()
5306 .expect("read capabilities from db cannot fail");
5307 let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
5308 Self::choose_protocol_version_and_system_packages_v1(
5309 epoch_store.protocol_version(),
5310 SupportedProtocolVersionsWithHashes::protocol_config_digest(
5311 epoch_store.protocol_config(),
5312 ),
5313 epoch_store.committee(),
5314 authority_capabilities.clone(),
5315 buffer_stake_bps,
5316 );
5317
5318 let config = epoch_store.protocol_config();
5322 let binary_config = to_binary_config(config);
5323 let Some(next_epoch_system_package_bytes) = self
5324 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
5325 .await
5326 else {
5327 error!(
5328 "upgraded system packages {:?} are not locally available, cannot create \
5329 ChangeEpochTx. validator binary must be upgraded to the correct version!",
5330 next_epoch_system_packages
5331 );
5332 bail!("missing system packages: cannot form ChangeEpochTx");
5342 };
5343
5344 if config.select_committee_from_eligible_validators() {
5347 let active_validators = epoch_store.epoch_start_state().get_active_validators();
5349
5350 let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
5351
5352 if config.select_committee_supporting_next_epoch_version() {
5356 eligible_active_validators = Self::get_validators_supporting_protocol_version(
5357 next_epoch_protocol_version,
5358 next_epoch_protocol_digest,
5359 &active_validators,
5360 &authority_capabilities,
5361 );
5362
5363 let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5365 &eligible_active_validators,
5366 &active_validators,
5367 epoch_store.committee(),
5368 );
5369
5370 let committee = epoch_store.committee();
5374 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5375
5376 if eligible_validators_weight < effective_threshold {
5377 error!(
5378 "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5379 This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5380 );
5381 eligible_active_validators = (0..active_validators.len() as u64).collect();
5384 }
5385 }
5386
5387 if config.pass_validator_scores_to_advance_epoch() {
5390 txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5391 next_epoch,
5392 next_epoch_protocol_version.as_u64(),
5393 gas_cost_summary.storage_cost,
5394 gas_cost_summary.computation_cost,
5395 gas_cost_summary.computation_cost_burned,
5396 gas_cost_summary.storage_rebate,
5397 gas_cost_summary.non_refundable_storage_fee,
5398 epoch_start_timestamp_ms,
5399 next_epoch_system_package_bytes,
5400 eligible_active_validators,
5401 scores,
5402 config.adjust_rewards_by_score(),
5403 ));
5404 } else {
5405 txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5406 next_epoch,
5407 next_epoch_protocol_version.as_u64(),
5408 gas_cost_summary.storage_cost,
5409 gas_cost_summary.computation_cost,
5410 gas_cost_summary.computation_cost_burned,
5411 gas_cost_summary.storage_rebate,
5412 gas_cost_summary.non_refundable_storage_fee,
5413 epoch_start_timestamp_ms,
5414 next_epoch_system_package_bytes,
5415 eligible_active_validators,
5416 ));
5417 }
5418 } else if config.protocol_defined_base_fee()
5419 && config.max_committee_members_count_as_option().is_some()
5420 {
5421 txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5422 next_epoch,
5423 next_epoch_protocol_version.as_u64(),
5424 gas_cost_summary.storage_cost,
5425 gas_cost_summary.computation_cost,
5426 gas_cost_summary.computation_cost_burned,
5427 gas_cost_summary.storage_rebate,
5428 gas_cost_summary.non_refundable_storage_fee,
5429 epoch_start_timestamp_ms,
5430 next_epoch_system_package_bytes,
5431 ));
5432 } else {
5433 txns.push(EndOfEpochTransactionKind::new_change_epoch(
5434 next_epoch,
5435 next_epoch_protocol_version.as_u64(),
5436 gas_cost_summary.storage_cost,
5437 gas_cost_summary.computation_cost,
5438 gas_cost_summary.storage_rebate,
5439 gas_cost_summary.non_refundable_storage_fee,
5440 epoch_start_timestamp_ms,
5441 next_epoch_system_package_bytes,
5442 ));
5443 }
5444
5445 let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5446
5447 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5448 tx.clone(),
5449 epoch_store.epoch(),
5450 checkpoint,
5451 );
5452
5453 let tx_digest = executable_tx.digest();
5454
5455 info!(
5456 ?next_epoch,
5457 ?next_epoch_protocol_version,
5458 ?next_epoch_system_packages,
5459 computation_cost=?gas_cost_summary.computation_cost,
5460 computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5461 storage_cost=?gas_cost_summary.storage_cost,
5462 storage_rebate=?gas_cost_summary.storage_rebate,
5463 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5464 ?tx_digest,
5465 "Creating advance epoch transaction"
5466 );
5467
5468 fail_point_async!("change_epoch_tx_delay");
5469 let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5470
5471 if self
5475 .get_transaction_cache_reader()
5476 .try_is_tx_already_executed(tx_digest)?
5477 {
5478 warn!("change epoch tx has already been executed via state sync");
5479 bail!("change epoch tx has already been executed via state sync",);
5480 }
5481
5482 let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5483
5484 epoch_store.assign_shared_object_versions_idempotent(
5488 self.get_object_cache_reader().as_ref(),
5489 std::slice::from_ref(&executable_tx),
5490 )?;
5491
5492 let (input_objects, _) =
5493 self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5494
5495 let (temporary_store, effects, _execution_error_opt) = self.execute_transaction(
5496 &execution_guard,
5497 &executable_tx,
5498 input_objects,
5499 vec![],
5500 epoch_store,
5501 )?;
5502 let system_obj = get_iota_system_state(&temporary_store.written)
5503 .expect("change epoch tx must write to system object");
5504 let system_epoch_info_event = temporary_store
5506 .events
5507 .0
5508 .into_iter()
5509 .find(|event| event.is_system_epoch_info_event())
5510 .map(SystemEpochInfoEvent::from);
5511 assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5514
5515 self.get_state_sync_store()
5519 .try_insert_transaction_and_effects(&tx, &effects)
5520 .map_err(|err| {
5521 let err: anyhow::Error = err.into();
5522 err
5523 })?;
5524
5525 info!(
5526 "Effects summary of the change epoch transaction: {:?}",
5527 effects.summary_for_debug()
5528 );
5529 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5530 assert!(effects.status().is_success());
5532 Ok((system_obj, system_epoch_info_event, effects))
5533 }
5534
5535 #[instrument(level = "error", skip_all)]
5539 async fn revert_uncommitted_epoch_transactions(
5540 &self,
5541 epoch_store: &AuthorityPerEpochStore,
5542 ) -> IotaResult {
5543 {
5544 let state = epoch_store.get_reconfig_state_write_lock_guard();
5545 if state.should_accept_user_certs() {
5546 epoch_store.close_user_certs(state);
5555 }
5556 }
5558
5559 if !epoch_store.protocol_config().enable_pcool_flow() {
5562 let pending_certificates = epoch_store.pending_consensus_certificates();
5563 info!(
5564 "Reverting {} locally executed transactions that was not included in the epoch: \
5565 {:?}",
5566 pending_certificates.len(),
5567 pending_certificates,
5568 );
5569 for digest in pending_certificates {
5570 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5571 info!(
5572 "Not reverting pending consensus transaction {:?} - it was included in \
5573 checkpoint",
5574 digest
5575 );
5576 continue;
5577 }
5578 info!("Reverting {:?} at the end of epoch", digest);
5579 epoch_store.revert_executed_transaction(&digest)?;
5580 self.get_reconfig_api().try_revert_state_update(&digest)?;
5581 }
5582 info!("All uncommitted local transactions reverted");
5583 } else {
5584 info!("P-COOL mode: skipping revert of uncommitted epoch transactions");
5585 }
5586
5587 Ok(())
5588 }
5589
5590 #[instrument(level = "error", skip_all)]
5591 async fn reopen_epoch_db(
5592 &self,
5593 cur_epoch_store: &AuthorityPerEpochStore,
5594 new_committee: Committee,
5595 epoch_start_configuration: EpochStartConfiguration,
5596 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5597 epoch_last_checkpoint: CheckpointSequenceNumber,
5598 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5599 let new_epoch = new_committee.epoch;
5600 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5601 assert_eq!(
5602 epoch_start_configuration.epoch_start_state().epoch(),
5603 new_committee.epoch
5604 );
5605 fail_point!("before-open-new-epoch-store");
5606 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5607 self.name,
5608 new_committee,
5609 epoch_start_configuration,
5610 self.get_backing_package_store().clone(),
5611 expensive_safety_check_config,
5612 epoch_last_checkpoint,
5613 )?;
5614 self.epoch_store.store(new_epoch_store.clone());
5615 Ok(new_epoch_store)
5616 }
5617
5618 fn check_move_account(
5621 &self,
5622 auth_account_object_id: ObjectId,
5623 auth_account_object_seq_number: Option<SequenceNumber>,
5624 auth_account_object_digest: Option<ObjectDigest>,
5625 account_object: ObjectReadResult,
5626 signer: &Address,
5627 ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5628 let account_object = match account_object.object {
5629 ObjectReadResultKind::Object(object) => Ok(object),
5630 ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5631 Err(UserInputError::AccountObjectDeleted {
5632 account_id: account_object.id(),
5633 account_version: version,
5634 transaction_digest: digest,
5635 })
5636 }
5637 ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5640 Err(UserInputError::AccountObjectInCanceledTransaction {
5641 account_id: account_object.id(),
5642 account_version: version,
5643 })
5644 }
5645 }?;
5646
5647 let account_object_addr = Address::from(auth_account_object_id);
5648
5649 fp_ensure!(
5650 signer == &account_object_addr,
5651 UserInputError::IncorrectUserSignature {
5652 error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5653 }
5654 .into()
5655 );
5656
5657 fp_ensure!(
5658 account_object.is_shared() || account_object.is_immutable(),
5659 UserInputError::AccountObjectNotSupported {
5660 object_id: auth_account_object_id
5661 }
5662 .into()
5663 );
5664
5665 let auth_account_object_seq_number =
5666 if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5667 let account_object_version = account_object.version();
5668
5669 fp_ensure!(
5670 account_object_version == auth_account_object_seq_number,
5671 UserInputError::AccountObjectVersionMismatch {
5672 object_id: auth_account_object_id,
5673 expected_version: auth_account_object_seq_number,
5674 actual_version: account_object_version,
5675 }
5676 .into()
5677 );
5678
5679 auth_account_object_seq_number
5680 } else {
5681 account_object.version()
5682 };
5683
5684 if let Some(auth_account_object_digest) = auth_account_object_digest {
5685 let expected_digest = account_object.digest();
5686 fp_ensure!(
5687 expected_digest == auth_account_object_digest,
5688 UserInputError::InvalidAccountObjectDigest {
5689 object_id: auth_account_object_id,
5690 expected_digest,
5691 actual_digest: auth_account_object_digest,
5692 }
5693 .into()
5694 );
5695 }
5696
5697 let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5698 auth_account_object_id,
5699 &AuthenticatorFunctionRefV1Key::tag().into(),
5700 &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5701 )
5702 .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5703 account_object_id: auth_account_object_id,
5704 })?;
5705
5706 let authenticator_function_ref_field = self
5707 .get_object_cache_reader()
5708 .try_find_object_lt_or_eq_version(
5709 authenticator_function_ref_field_id,
5710 auth_account_object_seq_number,
5711 )?;
5712
5713 if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5714 let field_move_object = authenticator_function_ref_field_obj
5715 .data
5716 .as_opt_struct()
5717 .expect("dynamic field should never be a package object");
5718
5719 let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5720 field_move_object.to_rust().map_err(|_| {
5721 UserInputError::InvalidAuthenticatorFunctionRefField {
5722 account_object_id: auth_account_object_id,
5723 }
5724 })?;
5725
5726 Ok(AuthenticatorFunctionRefForExecution::new_v1(
5727 field.value,
5728 authenticator_function_ref_field_obj.object_ref(),
5729 authenticator_function_ref_field_obj.owner,
5730 authenticator_function_ref_field_obj.storage_rebate,
5731 authenticator_function_ref_field_obj.previous_transaction,
5732 ))
5733 } else {
5734 Err(UserInputError::MoveAuthenticatorNotFound {
5735 authenticator_function_ref_id: authenticator_function_ref_field_id,
5736 account_object_id: auth_account_object_id,
5737 account_object_version: auth_account_object_seq_number,
5738 }
5739 .into())
5740 }
5741 }
5742
5743 #[allow(clippy::type_complexity)]
5744 fn read_objects_for_validation(
5745 &self,
5746 transaction: &VerifiedTransaction,
5747 epoch: u64,
5748 ) -> IotaResult<(
5749 InputObjects,
5750 ReceivingObjects,
5751 Vec<(InputObjects, ObjectReadResult)>,
5752 )> {
5753 let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5754 Some(transaction.digest()),
5755 &transaction.collect_all_input_object_kind_for_reading()?,
5756 &transaction.data().transaction_data().receiving_objects(),
5757 epoch,
5758 )?;
5759
5760 transaction
5761 .split_input_objects_into_groups_for_reading(input_objects)
5762 .map(|(tx_input_objects, per_authenticator_inputs)| {
5763 (
5764 tx_input_objects,
5765 tx_receiving_objects,
5766 per_authenticator_inputs,
5767 )
5768 })
5769 }
5770
5771 #[allow(clippy::type_complexity)]
5772 fn check_transaction_inputs_for_validation(
5773 &self,
5774 protocol_config: &ProtocolConfig,
5775 reference_gas_price: u64,
5776 tx_data: &TransactionData,
5777 tx_input_objects: InputObjects,
5778 tx_receiving_objects: &ReceivingObjects,
5779 move_authenticators: &Vec<&MoveAuthenticator>,
5780 per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
5781 ) -> IotaResult<(
5782 IotaGasStatus,
5783 CheckedInputObjects,
5784 Vec<(CheckedInputObjects, AuthenticatorFunctionRef)>,
5785 )> {
5786 let authenticator_gas_budget = if move_authenticators.is_empty() {
5787 0
5788 } else {
5789 protocol_config.max_auth_gas()
5792 };
5793
5794 debug_assert_eq!(
5795 move_authenticators.len(),
5796 per_authenticator_inputs.len(),
5797 "Move authenticators amount must match the number of authenticator inputs"
5798 );
5799
5800 let per_authenticator_checked_inputs = move_authenticators
5801 .iter()
5802 .zip(per_authenticator_inputs)
5803 .map(
5804 |(move_authenticator, (authenticator_input_objects, account_object))| {
5805 let (
5807 auth_account_object_id,
5808 auth_account_object_seq_number,
5809 auth_account_object_digest,
5810 ) = move_authenticator.object_to_authenticate_components()?;
5811
5812 let signer = move_authenticator.address();
5813
5814 let AuthenticatorFunctionRefForExecution {
5816 authenticator_function_ref,
5817 ..
5818 } = self.check_move_account(
5819 auth_account_object_id,
5820 auth_account_object_seq_number,
5821 auth_account_object_digest,
5822 account_object,
5823 &signer,
5824 )?;
5825
5826 let authenticator_checked_input_objects =
5828 iota_transaction_checks::check_move_authenticator_input_for_validation(
5829 authenticator_input_objects,
5830 )?;
5831
5832 Ok((
5833 authenticator_checked_input_objects,
5834 authenticator_function_ref,
5835 ))
5836 },
5837 )
5838 .collect::<IotaResult<Vec<_>>>()?;
5839
5840 let (gas_status, tx_checked_input_objects) =
5842 iota_transaction_checks::check_transaction_input(
5843 protocol_config,
5844 reference_gas_price,
5845 tx_data,
5846 tx_input_objects,
5847 tx_receiving_objects,
5848 &self.metrics.bytecode_verifier_metrics,
5849 &self.config.verifier_signing_config,
5850 authenticator_gas_budget,
5851 )?;
5852
5853 Ok((
5854 gas_status,
5855 tx_checked_input_objects,
5856 per_authenticator_checked_inputs,
5857 ))
5858 }
5859
5860 #[cfg(test)]
5861 pub(crate) fn iter_live_object_set_for_testing(
5862 &self,
5863 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5864 self.get_global_state_hash_store()
5865 .iter_cached_live_object_set_for_testing()
5866 }
5867
5868 #[cfg(test)]
5869 pub(crate) fn shutdown_execution_for_test(&self) {
5870 self.tx_execution_shutdown
5871 .lock()
5872 .take()
5873 .unwrap()
5874 .send(())
5875 .unwrap();
5876 }
5877
5878 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5881 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5882 self.get_object_cache_reader()
5883 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5884 self.get_reconfig_api()
5885 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5886 }
5887}
5888
5889pub struct RandomnessRoundReceiver {
5890 authority_state: Arc<AuthorityState>,
5891 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5892}
5893
5894impl RandomnessRoundReceiver {
5895 pub fn spawn(
5896 authority_state: Arc<AuthorityState>,
5897 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5898 ) -> JoinHandle<()> {
5899 let rrr = RandomnessRoundReceiver {
5900 authority_state,
5901 randomness_rx,
5902 };
5903 spawn_monitored_task!(rrr.run())
5904 }
5905
5906 async fn run(mut self) {
5907 info!("RandomnessRoundReceiver event loop started");
5908
5909 loop {
5910 tokio::select! {
5911 maybe_recv = self.randomness_rx.recv() => {
5912 if let Some((epoch, round, bytes)) = maybe_recv {
5913 self.handle_new_randomness(epoch, round, bytes).await;
5914 } else {
5915 break;
5916 }
5917 },
5918 }
5919 }
5920
5921 info!("RandomnessRoundReceiver event loop ended");
5922 }
5923
5924 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5925 async fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5926 fail_point_async!("randomness-delay");
5927
5928 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5929 if epoch_store.epoch() != epoch {
5930 warn!(
5931 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5932 epoch_store.epoch()
5933 );
5934 return;
5935 }
5936 let transaction = VerifiedTransaction::new_randomness_state_update(
5937 epoch,
5938 round,
5939 bytes,
5940 epoch_store
5941 .epoch_start_config()
5942 .randomness_obj_initial_shared_version(),
5943 );
5944 debug!(
5945 "created randomness state update transaction with digest: {:?}",
5946 transaction.digest()
5947 );
5948 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5949 let digest = *transaction.digest();
5950
5951 self.authority_state
5956 .get_cache_commit()
5957 .persist_transaction(&transaction);
5958
5959 self.authority_state
5961 .transaction_manager()
5962 .enqueue(vec![transaction], &epoch_store);
5963
5964 let authority_state = self.authority_state.clone();
5965 spawn_monitored_task!(async move {
5966 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5975 let result = tokio::time::timeout(
5976 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5977 authority_state
5978 .get_transaction_cache_reader()
5979 .try_notify_read_executed_effects(&[digest]),
5980 )
5981 .await;
5982 let result = match result {
5983 Ok(result) => result,
5984 Err(_) => {
5985 if cfg!(debug_assertions) {
5986 panic!(
5988 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5989 );
5990 }
5991 warn!(
5992 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5993 );
5994 authority_state
5996 .get_transaction_cache_reader()
5997 .try_notify_read_executed_effects(&[digest])
5998 .await
5999 }
6000 };
6001
6002 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
6003 let effects = effects.pop().expect("should return effects");
6004 if *effects.status() != ExecutionStatus::Success {
6005 fatal!(
6006 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
6007 );
6008 }
6009 debug!(
6010 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
6011 );
6012 });
6013 }
6014}
6015
6016#[async_trait]
6017impl TransactionKeyValueStoreTrait for AuthorityState {
6018 async fn multi_get(
6019 &self,
6020 transaction_keys: &[TransactionDigest],
6021 effects_keys: &[TransactionDigest],
6022 ) -> IotaResult<KVStoreTransactionData> {
6023 let txns = if !transaction_keys.is_empty() {
6024 self.get_transaction_cache_reader()
6025 .try_multi_get_transaction_blocks(transaction_keys)?
6026 .into_iter()
6027 .map(|t| t.map(|t| (*t).clone().into_inner()))
6028 .collect()
6029 } else {
6030 vec![]
6031 };
6032
6033 let fx = if !effects_keys.is_empty() {
6034 self.get_transaction_cache_reader()
6035 .try_multi_get_executed_effects(effects_keys)?
6036 } else {
6037 vec![]
6038 };
6039
6040 Ok((txns, fx))
6041 }
6042
6043 async fn multi_get_checkpoints(
6044 &self,
6045 checkpoint_summaries: &[CheckpointSequenceNumber],
6046 checkpoint_contents: &[CheckpointSequenceNumber],
6047 checkpoint_summaries_by_digest: &[CheckpointDigest],
6048 ) -> IotaResult<(
6049 Vec<Option<CertifiedCheckpointSummary>>,
6050 Vec<Option<CheckpointContents>>,
6051 Vec<Option<CertifiedCheckpointSummary>>,
6052 )> {
6053 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
6055 let store = self.get_checkpoint_store();
6056 for seq in checkpoint_summaries {
6057 let checkpoint = store
6058 .get_checkpoint_by_sequence_number(*seq)?
6059 .map(|c| c.into_inner());
6060
6061 summaries.push(checkpoint);
6062 }
6063
6064 let mut contents = Vec::with_capacity(checkpoint_contents.len());
6065 for seq in checkpoint_contents {
6066 let checkpoint = store
6067 .get_checkpoint_by_sequence_number(*seq)?
6068 .and_then(|summary| {
6069 store
6070 .get_checkpoint_contents(&summary.content_digest)
6071 .expect("db read cannot fail")
6072 });
6073 contents.push(checkpoint);
6074 }
6075
6076 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
6077 for digest in checkpoint_summaries_by_digest {
6078 let checkpoint = store
6079 .get_checkpoint_by_digest(digest)?
6080 .map(|c| c.into_inner());
6081 summaries_by_digest.push(checkpoint);
6082 }
6083
6084 Ok((summaries, contents, summaries_by_digest))
6085 }
6086
6087 async fn get_transaction_perpetual_checkpoint(
6088 &self,
6089 digest: TransactionDigest,
6090 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
6091 self.get_checkpoint_cache()
6092 .try_get_transaction_perpetual_checkpoint(&digest)
6093 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
6094 }
6095
6096 async fn get_object(
6097 &self,
6098 object_id: ObjectId,
6099 version: VersionNumber,
6100 ) -> IotaResult<Option<Object>> {
6101 self.get_object_cache_reader()
6102 .try_get_object_by_key(&object_id, version)
6103 }
6104
6105 #[instrument(skip_all)]
6106 async fn multi_get_objects(
6107 &self,
6108 object_keys: &[ObjectKey],
6109 ) -> IotaResult<Vec<Option<Object>>> {
6110 Ok(self
6111 .get_object_cache_reader()
6112 .multi_get_objects_by_key(object_keys))
6113 }
6114
6115 async fn multi_get_transactions_perpetual_checkpoints(
6116 &self,
6117 digests: &[TransactionDigest],
6118 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
6119 let res = self
6120 .get_checkpoint_cache()
6121 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
6122
6123 Ok(res
6124 .into_iter()
6125 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6126 .collect())
6127 }
6128
6129 #[instrument(skip(self, digests), fields(digests = digests.iter().map(|d| d.to_string()).collect::<Vec<String>>().join(", ")))]
6130 async fn multi_get_events_by_tx_digests(
6131 &self,
6132 digests: &[TransactionDigest],
6133 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
6134 if digests.is_empty() {
6135 return Ok(vec![]);
6136 }
6137
6138 Ok(self
6139 .get_transaction_cache_reader()
6140 .multi_get_events(digests))
6141 }
6142}
6143
6144#[cfg(msim)]
6145pub mod framework_injection {
6146 use std::{
6147 cell::RefCell,
6148 collections::{BTreeMap, BTreeSet},
6149 };
6150
6151 use iota_framework::{BuiltInFramework, SystemPackage};
6152 use iota_sdk_types::ObjectId;
6153 use iota_types::base_types::AuthorityName;
6154 use move_binary_format::CompiledModule;
6155
6156 type FrameworkOverrideConfig = BTreeMap<ObjectId, PackageOverrideConfig>;
6157
6158 thread_local! {
6160 static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
6161 }
6162
6163 type Framework = Vec<CompiledModule>;
6164
6165 pub type PackageUpgradeCallback =
6166 Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
6167
6168 enum PackageOverrideConfig {
6169 Global(Framework),
6170 PerValidator(PackageUpgradeCallback),
6171 }
6172
6173 fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
6174 modules
6175 .iter()
6176 .map(|m| {
6177 let mut buf = Vec::new();
6178 m.serialize_with_version(m.version, &mut buf).unwrap();
6179 buf
6180 })
6181 .collect()
6182 }
6183
6184 pub fn set_override(package_id: ObjectId, modules: Vec<CompiledModule>) {
6185 OVERRIDE.with(|bs| {
6186 bs.borrow_mut()
6187 .insert(package_id, PackageOverrideConfig::Global(modules))
6188 });
6189 }
6190
6191 pub fn set_override_cb(package_id: ObjectId, func: PackageUpgradeCallback) {
6192 OVERRIDE.with(|bs| {
6193 bs.borrow_mut()
6194 .insert(package_id, PackageOverrideConfig::PerValidator(func))
6195 });
6196 }
6197
6198 pub fn get_override_bytes(package_id: &ObjectId, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
6199 OVERRIDE.with(|cfg| {
6200 cfg.borrow().get(package_id).and_then(|entry| match entry {
6201 PackageOverrideConfig::Global(framework) => {
6202 Some(compiled_modules_to_bytes(framework))
6203 }
6204 PackageOverrideConfig::PerValidator(func) => {
6205 func(name).map(|fw| compiled_modules_to_bytes(&fw))
6206 }
6207 })
6208 })
6209 }
6210
6211 pub fn get_override_modules(
6212 package_id: &ObjectId,
6213 name: AuthorityName,
6214 ) -> Option<Vec<CompiledModule>> {
6215 OVERRIDE.with(|cfg| {
6216 cfg.borrow().get(package_id).and_then(|entry| match entry {
6217 PackageOverrideConfig::Global(framework) => Some(framework.clone()),
6218 PackageOverrideConfig::PerValidator(func) => func(name),
6219 })
6220 })
6221 }
6222
6223 pub fn get_override_system_package(
6224 package_id: &ObjectId,
6225 name: AuthorityName,
6226 ) -> Option<SystemPackage> {
6227 let bytes = get_override_bytes(package_id, name)?;
6228 let dependencies = if package_id.is_system_package() {
6229 BuiltInFramework::get_package_by_id(package_id)
6230 .dependencies
6231 .to_vec()
6232 } else {
6233 BuiltInFramework::all_package_ids()
6236 };
6237 Some(SystemPackage {
6238 id: *package_id,
6239 bytes,
6240 dependencies,
6241 })
6242 }
6243
6244 pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
6245 let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
6246 let extra: Vec<ObjectId> = OVERRIDE.with(|cfg| {
6247 cfg.borrow()
6248 .keys()
6249 .filter_map(|package| (!built_in.contains(package)).then_some(*package))
6250 .collect()
6251 });
6252
6253 extra
6254 .into_iter()
6255 .map(|package| SystemPackage {
6256 id: package,
6257 bytes: get_override_bytes(&package, name).unwrap(),
6258 dependencies: BuiltInFramework::all_package_ids(),
6259 })
6260 .collect()
6261 }
6262}
6263
6264#[derive(Debug, Serialize, Deserialize, Clone)]
6265pub struct ObjDumpFormat {
6266 pub id: ObjectId,
6267 pub version: VersionNumber,
6268 pub digest: ObjectDigest,
6269 pub object: Object,
6270}
6271
6272impl ObjDumpFormat {
6273 fn new(object: Object) -> Self {
6274 let oref = object.object_ref();
6275 Self {
6276 id: oref.object_id,
6277 version: oref.version,
6278 digest: oref.digest,
6279 object,
6280 }
6281 }
6282}
6283
6284#[derive(Debug, Serialize, Deserialize, Clone)]
6285pub struct NodeStateDump {
6286 pub tx_digest: TransactionDigest,
6287 pub sender_signed_data: SenderSignedData,
6288 pub executed_epoch: u64,
6289 pub reference_gas_price: u64,
6290 pub protocol_version: u64,
6291 pub epoch_start_timestamp_ms: u64,
6292 pub computed_effects: TransactionEffects,
6293 pub expected_effects_digest: TransactionEffectsDigest,
6294 pub relevant_system_packages: Vec<ObjDumpFormat>,
6295 pub shared_objects: Vec<ObjDumpFormat>,
6296 pub loaded_child_objects: Vec<ObjDumpFormat>,
6297 pub modified_at_versions: Vec<ObjDumpFormat>,
6298 pub runtime_reads: Vec<ObjDumpFormat>,
6299 pub input_objects: Vec<ObjDumpFormat>,
6300}
6301
6302impl NodeStateDump {
6303 pub fn new(
6304 tx_digest: &TransactionDigest,
6305 effects: &TransactionEffects,
6306 expected_effects_digest: TransactionEffectsDigest,
6307 object_store: &dyn ObjectStore,
6308 epoch_store: &Arc<AuthorityPerEpochStore>,
6309 inner_temporary_store: &InnerTemporaryStore,
6310 transaction: &VerifiedExecutableTransaction,
6311 ) -> IotaResult<Self> {
6312 let executed_epoch = epoch_store.epoch();
6314 let reference_gas_price = epoch_store.reference_gas_price();
6315 let epoch_start_config = epoch_store.epoch_start_config();
6316 let protocol_version = epoch_store.protocol_version().as_u64();
6317 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6318
6319 let mut relevant_system_packages = Vec::new();
6321 for sys_package_id in BuiltInFramework::all_package_ids() {
6322 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
6323 relevant_system_packages.push(ObjDumpFormat::new(w))
6324 }
6325 }
6326
6327 let mut shared_objects = Vec::new();
6329 for kind in effects.input_shared_objects() {
6330 match kind {
6331 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
6332 if let Some(w) =
6333 object_store.try_get_object_by_key(&obj_ref.object_id, obj_ref.version)?
6334 {
6335 shared_objects.push(ObjDumpFormat::new(w))
6336 }
6337 }
6338 InputSharedObject::ReadDeleted(..)
6339 | InputSharedObject::MutateDeleted(..)
6340 | InputSharedObject::Cancelled(..) => (), }
6343 }
6344
6345 let mut loaded_child_objects = Vec::new();
6348 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6349 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
6350 loaded_child_objects.push(ObjDumpFormat::new(w))
6351 }
6352 }
6353
6354 let mut modified_at_versions = Vec::new();
6356 for (id, ver) in effects.modified_at_versions() {
6357 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
6358 modified_at_versions.push(ObjDumpFormat::new(w))
6359 }
6360 }
6361
6362 let mut runtime_reads = Vec::new();
6366 for obj in inner_temporary_store
6367 .runtime_packages_loaded_from_db
6368 .values()
6369 {
6370 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6371 }
6372
6373 Ok(Self {
6376 tx_digest: *tx_digest,
6377 executed_epoch,
6378 reference_gas_price,
6379 epoch_start_timestamp_ms,
6380 protocol_version,
6381 relevant_system_packages,
6382 shared_objects,
6383 loaded_child_objects,
6384 modified_at_versions,
6385 runtime_reads,
6386 sender_signed_data: transaction.clone().into_message(),
6387 input_objects: inner_temporary_store
6388 .input_objects
6389 .values()
6390 .map(|o| ObjDumpFormat::new(o.clone()))
6391 .collect(),
6392 computed_effects: effects.clone(),
6393 expected_effects_digest,
6394 })
6395 }
6396
6397 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6398 let mut objects = Vec::new();
6399 objects.extend(self.relevant_system_packages.clone());
6400 objects.extend(self.shared_objects.clone());
6401 objects.extend(self.loaded_child_objects.clone());
6402 objects.extend(self.modified_at_versions.clone());
6403 objects.extend(self.runtime_reads.clone());
6404 objects.extend(self.input_objects.clone());
6405 objects
6406 }
6407
6408 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6409 let file_name = format!(
6410 "{}_{}_NODE_DUMP.json",
6411 self.tx_digest,
6412 AuthorityState::unixtime_now_ms()
6413 );
6414 let mut path = path.to_path_buf();
6415 path.push(&file_name);
6416 let mut file = File::create(path.clone())?;
6417 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6418 Ok(path)
6419 }
6420
6421 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6422 let file = File::open(path)?;
6423 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6424 }
6425}
6426
6427fn pre_consensus_move_authenticators<'a>(
6439 tx: &'a VerifiedTransaction,
6440 protocol_config: &ProtocolConfig,
6441) -> Vec<&'a MoveAuthenticator> {
6442 if protocol_config.pre_consensus_sponsor_only_move_authentication() {
6443 if tx.transaction_data().is_sponsored_tx() {
6444 if let Some(sponsor_move_authenticator) = tx.sponsor_move_authenticator() {
6445 vec![sponsor_move_authenticator]
6446 } else {
6447 vec![]
6448 }
6449 } else {
6450 tx.move_authenticators()
6451 }
6452 } else {
6453 tx.move_authenticators()
6454 }
6455}