1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage as FrameworkSystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::{
48 ExecutionStatus, ObjectId, Owner, StructTag, TransactionExpiration, TransactionKind, TypeTag,
49 crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion},
50};
51use iota_storage::{
52 key_value_store::{
53 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
54 },
55 key_value_store_metrics::KeyValueStoreMetrics,
56};
57#[cfg(msim)]
58use iota_types::committee::CommitteeTrait;
59use iota_types::{
60 account_abstraction::{
61 account::AuthenticatorFunctionRefV1Key,
62 authenticator_function::{
63 AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
64 AuthenticatorFunctionRefV1, extract_auth_fun_refs,
65 },
66 },
67 auth_context::AuthContextData,
68 base_types::{
69 AuthorityName, ConciseableName, IotaAddress, ObjectInfo, ObjectRef, ObjectType,
70 SequenceNumber, VersionNumber,
71 },
72 committee::{Committee, EpochId, ProtocolVersion},
73 crypto::{AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer},
74 deny_list_v1::check_coin_deny_list_v1_during_signing,
75 digests::{ChainIdentifier, Digest, ObjectDigest, TransactionDigest, TransactionEffectsDigest},
76 dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
77 effects::{
78 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
79 TransactionEffectsExt, TransactionEvents, VerifiedSignedTransactionEffects,
80 },
81 error::{ExecutionError, IotaError, IotaResult, UserInputError},
82 event::{Event, EventID, SystemEpochInfoEvent},
83 executable_transaction::VerifiedExecutableTransaction,
84 execution_config_utils::to_binary_config,
85 fp_ensure,
86 gas::{GasCostSummary, IotaGasStatus},
87 gas_coin::NANOS_PER_IOTA,
88 inner_temporary_store::{
89 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
90 WrittenObjects,
91 },
92 iota_sdk_types_conversions::type_tag_core_to_sdk,
93 iota_system_state::{
94 IotaSystemState, IotaSystemStateTrait,
95 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
96 },
97 layout_resolver::{LayoutResolver, into_struct_layout},
98 message_envelope::Message,
99 messages_checkpoint::{
100 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
101 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
102 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
103 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
104 },
105 messages_consensus::AuthorityCapabilitiesV1,
106 messages_grpc::{
107 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
108 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
109 TransactionStatus,
110 },
111 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
112 move_authenticator::MoveAuthenticator,
113 object::{
114 MoveObject, MoveObjectExt, OBJECT_START_VERSION, Object, ObjectRead, PastObjectRead,
115 bounded_visitor::BoundedVisitor,
116 },
117 storage::{
118 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
119 },
120 supported_protocol_versions::{
121 ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
122 },
123 traffic_control::{PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams},
124 transaction::*,
125 transaction_executor::{SimulateTransactionResult, VmChecks},
126};
127use itertools::Itertools;
128use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
129use move_core_types::{
130 account_address::AccountAddress, annotated_value::MoveStructLayout, language_storage::ModuleId,
131};
132use parking_lot::Mutex;
133use prometheus::{
134 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
135 register_histogram_vec_with_registry, register_histogram_with_registry,
136 register_int_counter_vec_with_registry, register_int_counter_with_registry,
137 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
138};
139use serde::{Deserialize, Serialize, de::DeserializeOwned};
140use tap::TapFallible;
141use tokio::{
142 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
143 task::JoinHandle,
144};
145use tracing::{debug, error, info, instrument, trace, warn};
146use typed_store::TypedStoreError;
147
148use self::{
149 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
150};
151#[cfg(msim)]
152pub use crate::checkpoints::checkpoint_executor::utils::{
153 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
154};
155use crate::{
156 authority::{
157 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
158 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
159 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
160 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
161 authority_store_tables::AuthorityPrunerTables,
162 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
163 },
164 authority_client::NetworkAuthorityClient,
165 checkpoint_progress_tracker::CheckpointProgressTracker,
166 checkpoints::CheckpointStore,
167 congestion_tracker::CongestionTracker,
168 consensus_adapter::ConsensusAdapter,
169 epoch::committee_store::CommitteeStore,
170 execution_cache::{
171 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
172 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
173 TransactionCacheRead,
174 },
175 execution_driver::execution_process,
176 global_state_hasher::{GlobalStateHashStore, GlobalStateHasher},
177 grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
178 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
179 metrics::{LatencyObserver, RateTracker},
180 module_cache_metrics::ResolverMetrics,
181 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
182 stake_aggregator::StakeAggregator,
183 subscription_handler::SubscriptionHandler,
184 traffic_controller::{TrafficController, metrics::TrafficControllerMetrics},
185 transaction_input_loader::TransactionInputLoader,
186 transaction_manager::TransactionManager,
187 transaction_outputs::TransactionOutputs,
188 validator_tx_finalizer::ValidatorTxFinalizer,
189 verify_indexes::verify_indexes,
190};
191
192#[cfg(test)]
193#[path = "unit_tests/authority_tests.rs"]
194pub mod authority_tests;
195
196#[cfg(test)]
197#[path = "unit_tests/transaction_tests.rs"]
198pub mod transaction_tests;
199
200#[cfg(test)]
201#[path = "unit_tests/batch_transaction_tests.rs"]
202mod batch_transaction_tests;
203
204#[cfg(test)]
205#[path = "unit_tests/move_integration_tests.rs"]
206pub mod move_integration_tests;
207
208#[cfg(test)]
209#[path = "unit_tests/gas_tests.rs"]
210mod gas_tests;
211
212#[cfg(test)]
213#[path = "unit_tests/batch_verification_tests.rs"]
214mod batch_verification_tests;
215
216#[cfg(test)]
217#[path = "unit_tests/coin_deny_list_tests.rs"]
218mod coin_deny_list_tests;
219
220#[cfg(test)]
221#[path = "unit_tests/auth_unit_test_utils.rs"]
222pub mod auth_unit_test_utils;
223
224pub mod authority_test_utils;
225
226pub mod authority_per_epoch_store;
227pub mod authority_per_epoch_store_pruner;
228
229mod authority_store_migrations;
230pub mod authority_store_pruner;
231pub mod authority_store_tables;
232pub mod authority_store_types;
233pub mod epoch_start_configuration;
234pub mod shared_object_congestion_tracker;
235pub mod shared_object_version_manager;
236pub mod suggested_gas_price_calculator;
237pub mod test_authority_builder;
238pub mod transaction_deferral;
239
240pub(crate) mod authority_store;
241pub mod backpressure;
242
243pub struct AuthorityMetrics {
245 tx_orders: IntCounter,
246 total_certs: IntCounter,
247 total_cert_attempts: IntCounter,
248 total_effects: IntCounter,
249 pub shared_obj_tx: IntCounter,
250 sponsored_tx: IntCounter,
251 tx_already_processed: IntCounter,
252 num_input_objs: Histogram,
253 num_shared_objects: Histogram,
254 batch_size: Histogram,
255
256 authority_state_handle_transaction_latency: Histogram,
257
258 execute_certificate_latency_single_writer: Histogram,
259 execute_certificate_latency_shared_object: Histogram,
260
261 internal_execution_latency: Histogram,
262 execution_load_input_objects_latency: Histogram,
263 prepare_certificate_latency: Histogram,
264 commit_certificate_latency: Histogram,
265 db_checkpoint_latency: Histogram,
266
267 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
268 pub(crate) transaction_manager_num_missing_objects: IntGauge,
269 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
270 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
271 pub(crate) transaction_manager_num_ready: IntGauge,
272 pub(crate) transaction_manager_object_cache_size: IntGauge,
273 pub(crate) transaction_manager_object_cache_hits: IntCounter,
274 pub(crate) transaction_manager_object_cache_misses: IntCounter,
275 pub(crate) transaction_manager_object_cache_evictions: IntCounter,
276 pub(crate) transaction_manager_package_cache_size: IntGauge,
277 pub(crate) transaction_manager_package_cache_hits: IntCounter,
278 pub(crate) transaction_manager_package_cache_misses: IntCounter,
279 pub(crate) transaction_manager_package_cache_evictions: IntCounter,
280 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
281
282 pub(crate) execution_driver_executed_transactions: IntCounter,
283 pub(crate) execution_driver_dispatch_queue: IntGauge,
284 pub(crate) execution_queueing_delay_s: Histogram,
285 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
286 pub(crate) execution_gas_latency_ratio: Histogram,
287
288 pub(crate) skipped_consensus_txns: IntCounter,
289 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
290
291 pub(crate) authority_overload_status: IntGauge,
292 pub(crate) authority_load_shedding_percentage: IntGauge,
293
294 pub(crate) transaction_overload_sources: IntCounterVec,
295
296 post_processing_total_events_emitted: IntCounter,
298 post_processing_total_tx_indexed: IntCounter,
299 post_processing_total_tx_had_event_processed: IntCounter,
300 post_processing_total_failures: IntCounter,
301
302 pub consensus_handler_processed: IntCounterVec,
304 pub consensus_handler_transaction_sizes: HistogramVec,
305 pub consensus_handler_num_low_scoring_authorities: IntGauge,
306 pub consensus_handler_scores: IntGaugeVec,
307 pub consensus_handler_deferred_transactions: IntCounter,
308 pub consensus_handler_congested_transactions: IntCounter,
309 pub consensus_handler_cancelled_transactions: IntCounter,
310 pub consensus_handler_max_object_costs: IntGaugeVec,
311 pub consensus_committed_subdags: IntCounterVec,
312 pub consensus_committed_messages: IntGaugeVec,
313 pub consensus_committed_user_transactions: IntGaugeVec,
314 pub consensus_handler_leader_round: IntGauge,
315 pub consensus_calculated_throughput: IntGauge,
316 pub consensus_calculated_throughput_profile: IntGauge,
317
318 pub validator_scoreboard_scores: IntGaugeVec,
319 pub invalid_misbehavior_reports_by_authority: IntGaugeVec,
320
321 pub limits_metrics: Arc<LimitsMetrics>,
322
323 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
325
326 pub multisig_sig_count: IntCounter,
328
329 pub execution_queueing_latency: LatencyObserver,
332
333 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
340
341 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
344}
345
346const POSITIVE_INT_BUCKETS: &[f64] = &[
348 1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
349 20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
350 7000000., 10000000.,
351];
352
353const LATENCY_SEC_BUCKETS: &[f64] = &[
354 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
355 10., 20., 30., 60., 90.,
356];
357
358const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
360 0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
361 0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
362];
363
364const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
365 10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
366 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
367];
368
369pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
373 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
374 let execute_certificate_latency = register_histogram_vec_with_registry!(
375 "authority_state_execute_certificate_latency",
376 "Latency of executing certificates, including waiting for inputs",
377 &["tx_type"],
378 LATENCY_SEC_BUCKETS.to_vec(),
379 registry,
380 )
381 .unwrap();
382
383 let execute_certificate_latency_single_writer =
384 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
385 let execute_certificate_latency_shared_object =
386 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
387
388 Self {
389 tx_orders: register_int_counter_with_registry!(
390 "total_transaction_orders",
391 "Total number of transaction orders",
392 registry,
393 )
394 .unwrap(),
395 total_certs: register_int_counter_with_registry!(
396 "total_transaction_certificates",
397 "Total number of transaction certificates handled",
398 registry,
399 )
400 .unwrap(),
401 total_cert_attempts: register_int_counter_with_registry!(
402 "total_handle_certificate_attempts",
403 "Number of calls to handle_certificate",
404 registry,
405 )
406 .unwrap(),
407 total_effects: register_int_counter_with_registry!(
409 "total_transaction_effects",
410 "Total number of transaction effects produced",
411 registry,
412 )
413 .unwrap(),
414
415 shared_obj_tx: register_int_counter_with_registry!(
416 "num_shared_obj_tx",
417 "Number of transactions involving shared objects",
418 registry,
419 )
420 .unwrap(),
421
422 sponsored_tx: register_int_counter_with_registry!(
423 "num_sponsored_tx",
424 "Number of sponsored transactions",
425 registry,
426 )
427 .unwrap(),
428
429 tx_already_processed: register_int_counter_with_registry!(
430 "num_tx_already_processed",
431 "Number of transaction orders already processed previously",
432 registry,
433 )
434 .unwrap(),
435 num_input_objs: register_histogram_with_registry!(
436 "num_input_objects",
437 "Distribution of number of input TX objects per TX",
438 POSITIVE_INT_BUCKETS.to_vec(),
439 registry,
440 )
441 .unwrap(),
442 num_shared_objects: register_histogram_with_registry!(
443 "num_shared_objects",
444 "Number of shared input objects per TX",
445 POSITIVE_INT_BUCKETS.to_vec(),
446 registry,
447 )
448 .unwrap(),
449 batch_size: register_histogram_with_registry!(
450 "batch_size",
451 "Distribution of size of transaction batch",
452 POSITIVE_INT_BUCKETS.to_vec(),
453 registry,
454 )
455 .unwrap(),
456 authority_state_handle_transaction_latency: register_histogram_with_registry!(
457 "authority_state_handle_transaction_latency",
458 "Latency of handling transactions",
459 LATENCY_SEC_BUCKETS.to_vec(),
460 registry,
461 )
462 .unwrap(),
463 execute_certificate_latency_single_writer,
464 execute_certificate_latency_shared_object,
465 internal_execution_latency: register_histogram_with_registry!(
466 "authority_state_internal_execution_latency",
467 "Latency of actual certificate executions",
468 LATENCY_SEC_BUCKETS.to_vec(),
469 registry,
470 )
471 .unwrap(),
472 execution_load_input_objects_latency: register_histogram_with_registry!(
473 "authority_state_execution_load_input_objects_latency",
474 "Latency of loading input objects for execution",
475 LOW_LATENCY_SEC_BUCKETS.to_vec(),
476 registry,
477 )
478 .unwrap(),
479 prepare_certificate_latency: register_histogram_with_registry!(
480 "authority_state_prepare_certificate_latency",
481 "Latency of executing certificates, before committing the results",
482 LATENCY_SEC_BUCKETS.to_vec(),
483 registry,
484 )
485 .unwrap(),
486 commit_certificate_latency: register_histogram_with_registry!(
487 "authority_state_commit_certificate_latency",
488 "Latency of committing certificate execution results",
489 LATENCY_SEC_BUCKETS.to_vec(),
490 registry,
491 )
492 .unwrap(),
493 db_checkpoint_latency: register_histogram_with_registry!(
494 "db_checkpoint_latency",
495 "Latency of checkpointing dbs",
496 LATENCY_SEC_BUCKETS.to_vec(),
497 registry,
498 ).unwrap(),
499 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
500 "transaction_manager_num_enqueued_certificates",
501 "Current number of certificates enqueued to TransactionManager",
502 &["result"],
503 registry,
504 )
505 .unwrap(),
506 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
507 "transaction_manager_num_missing_objects",
508 "Current number of missing objects in TransactionManager",
509 registry,
510 )
511 .unwrap(),
512 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
513 "transaction_manager_num_pending_certificates",
514 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
515 registry,
516 )
517 .unwrap(),
518 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
519 "transaction_manager_num_executing_certificates",
520 "Number of executing certificates, including queued and actually running certificates",
521 registry,
522 )
523 .unwrap(),
524 transaction_manager_num_ready: register_int_gauge_with_registry!(
525 "transaction_manager_num_ready",
526 "Number of ready transactions in TransactionManager",
527 registry,
528 )
529 .unwrap(),
530 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
531 "transaction_manager_object_cache_size",
532 "Current size of object-availability cache in TransactionManager",
533 registry,
534 )
535 .unwrap(),
536 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
537 "transaction_manager_object_cache_hits",
538 "Number of object-availability cache hits in TransactionManager",
539 registry,
540 )
541 .unwrap(),
542 authority_overload_status: register_int_gauge_with_registry!(
543 "authority_overload_status",
544 "Whether authority is current experiencing overload and enters load shedding mode.",
545 registry)
546 .unwrap(),
547 authority_load_shedding_percentage: register_int_gauge_with_registry!(
548 "authority_load_shedding_percentage",
549 "The percentage of transactions is shed when the authority is in load shedding mode.",
550 registry)
551 .unwrap(),
552 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
553 "transaction_manager_object_cache_misses",
554 "Number of object-availability cache misses in TransactionManager",
555 registry,
556 )
557 .unwrap(),
558 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
559 "transaction_manager_object_cache_evictions",
560 "Number of object-availability cache evictions in TransactionManager",
561 registry,
562 )
563 .unwrap(),
564 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
565 "transaction_manager_package_cache_size",
566 "Current size of package-availability cache in TransactionManager",
567 registry,
568 )
569 .unwrap(),
570 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
571 "transaction_manager_package_cache_hits",
572 "Number of package-availability cache hits in TransactionManager",
573 registry,
574 )
575 .unwrap(),
576 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
577 "transaction_manager_package_cache_misses",
578 "Number of package-availability cache misses in TransactionManager",
579 registry,
580 )
581 .unwrap(),
582 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
583 "transaction_manager_package_cache_evictions",
584 "Number of package-availability cache evictions in TransactionManager",
585 registry,
586 )
587 .unwrap(),
588 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
589 "transaction_manager_transaction_queue_age_s",
590 "Time spent in waiting for transaction in the queue",
591 LATENCY_SEC_BUCKETS.to_vec(),
592 registry,
593 )
594 .unwrap(),
595 transaction_overload_sources: register_int_counter_vec_with_registry!(
596 "transaction_overload_sources",
597 "Number of times each source indicates transaction overload.",
598 &["source"],
599 registry)
600 .unwrap(),
601 execution_driver_executed_transactions: register_int_counter_with_registry!(
602 "execution_driver_executed_transactions",
603 "Cumulative number of transaction executed by execution driver",
604 registry,
605 )
606 .unwrap(),
607 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
608 "execution_driver_dispatch_queue",
609 "Number of transaction pending in execution driver dispatch queue",
610 registry,
611 )
612 .unwrap(),
613 execution_queueing_delay_s: register_histogram_with_registry!(
614 "execution_queueing_delay_s",
615 "Queueing delay between a transaction is ready for execution until it starts executing.",
616 LATENCY_SEC_BUCKETS.to_vec(),
617 registry
618 )
619 .unwrap(),
620 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
621 "prepare_cert_gas_latency_ratio",
622 "The ratio of computation gas divided by VM execution latency.",
623 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
624 registry
625 )
626 .unwrap(),
627 execution_gas_latency_ratio: register_histogram_with_registry!(
628 "execution_gas_latency_ratio",
629 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
630 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
631 registry
632 )
633 .unwrap(),
634 skipped_consensus_txns: register_int_counter_with_registry!(
635 "skipped_consensus_txns",
636 "Total number of consensus transactions skipped",
637 registry,
638 )
639 .unwrap(),
640 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
641 "skipped_consensus_txns_cache_hit",
642 "Total number of consensus transactions skipped because of local cache hit",
643 registry,
644 )
645 .unwrap(),
646 post_processing_total_events_emitted: register_int_counter_with_registry!(
647 "post_processing_total_events_emitted",
648 "Total number of events emitted in post processing",
649 registry,
650 )
651 .unwrap(),
652 post_processing_total_tx_indexed: register_int_counter_with_registry!(
653 "post_processing_total_tx_indexed",
654 "Total number of txes indexed in post processing",
655 registry,
656 )
657 .unwrap(),
658 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
659 "post_processing_total_tx_had_event_processed",
660 "Total number of txes finished event processing in post processing",
661 registry,
662 )
663 .unwrap(),
664 post_processing_total_failures: register_int_counter_with_registry!(
665 "post_processing_total_failures",
666 "Total number of failure in post processing",
667 registry,
668 )
669 .unwrap(),
670 consensus_handler_processed: register_int_counter_vec_with_registry!(
671 "consensus_handler_processed",
672 "Number of transactions processed by consensus handler",
673 &["class"],
674 registry
675 ).unwrap(),
676 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
677 "consensus_handler_transaction_sizes",
678 "Sizes of each type of transactions processed by consensus handler",
679 &["class"],
680 POSITIVE_INT_BUCKETS.to_vec(),
681 registry
682 ).unwrap(),
683 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
684 "consensus_handler_num_low_scoring_authorities",
685 "Number of low scoring authorities based on reputation scores from consensus",
686 registry
687 ).unwrap(),
688 consensus_handler_scores: register_int_gauge_vec_with_registry!(
689 "consensus_handler_scores",
690 "scores from consensus for each authority",
691 &["authority"],
692 registry,
693 ).unwrap(),
694 validator_scoreboard_scores: register_int_gauge_vec_with_registry!(
695 "validator_scoreboard_scores",
696 "Per-authority validator scores published by the local Scoreboard after each consensus commit. Range [0, MAX_SCORE].",
697 &["authority"],
698 registry,
699 ).unwrap(),
700 invalid_misbehavior_reports_by_authority: register_int_gauge_vec_with_registry!(
701 "invalid_misbehavior_reports_by_authority",
702 "Cumulative count of invalid misbehavior reports received from each reporting authority in the current epoch. Bumped when a `MisbehaviorReport` consensus transaction fails sender/authority match or payload validation. Snapshot republished after each consensus commit.",
703 &["authority"],
704 registry,
705 ).unwrap(),
706 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
707 "consensus_handler_deferred_transactions",
708 "Number of transactions deferred by consensus handler",
709 registry,
710 ).unwrap(),
711 consensus_handler_congested_transactions: register_int_counter_with_registry!(
712 "consensus_handler_congested_transactions",
713 "Number of transactions deferred by consensus handler due to congestion",
714 registry,
715 ).unwrap(),
716 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
717 "consensus_handler_cancelled_transactions",
718 "Number of transactions cancelled by consensus handler",
719 registry,
720 ).unwrap(),
721 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
722 "consensus_handler_max_congestion_control_object_costs",
723 "Max object costs for congestion control in the current consensus commit",
724 &["commit_type"],
725 registry,
726 ).unwrap(),
727 consensus_committed_subdags: register_int_counter_vec_with_registry!(
728 "consensus_committed_subdags",
729 "Number of committed subdags, sliced by leader",
730 &["authority"],
731 registry,
732 ).unwrap(),
733 consensus_committed_messages: register_int_gauge_vec_with_registry!(
734 "consensus_committed_messages",
735 "Total number of committed consensus messages, sliced by author",
736 &["authority"],
737 registry,
738 ).unwrap(),
739 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
740 "consensus_committed_user_transactions",
741 "Number of committed user transactions, sliced by submitter",
742 &["authority"],
743 registry,
744 ).unwrap(),
745 consensus_handler_leader_round: register_int_gauge_with_registry!(
746 "consensus_handler_leader_round",
747 "The leader round of the current consensus output being processed in the consensus handler",
748 registry,
749 ).unwrap(),
750 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
751 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
752 multisig_sig_count: register_int_counter_with_registry!(
753 "multisig_sig_count",
754 "Count of multisig signatures",
755 registry,
756 )
757 .unwrap(),
758 consensus_calculated_throughput: register_int_gauge_with_registry!(
759 "consensus_calculated_throughput",
760 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
761 registry,
762 ).unwrap(),
763 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
764 "consensus_calculated_throughput_profile",
765 "The current active calculated throughput profile",
766 registry
767 ).unwrap(),
768 execution_queueing_latency: LatencyObserver::new(),
769 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
770 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
771 }
772 }
773
774 pub fn reset_on_reconfigure(&self) {
778 self.consensus_committed_messages.reset();
779 self.consensus_handler_scores.reset();
780 self.validator_scoreboard_scores.reset();
781 self.invalid_misbehavior_reports_by_authority.reset();
782 self.consensus_committed_user_transactions.reset();
783 }
784}
785
786pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
793
794pub struct AuthorityState {
795 pub name: AuthorityName,
798 pub secret: StableSyncAuthoritySigner,
800
801 input_loader: TransactionInputLoader,
803 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
804
805 epoch_store: ArcSwap<AuthorityPerEpochStore>,
806
807 execution_lock: RwLock<EpochId>,
813
814 pub indexes: Option<Arc<IndexStore>>,
815 pub grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
816
817 pub subscription_handler: Arc<SubscriptionHandler>,
818 pub checkpoint_store: Arc<CheckpointStore>,
819
820 committee_store: Arc<CommitteeStore>,
821
822 transaction_manager: Arc<TransactionManager>,
824
825 #[cfg_attr(not(test), expect(unused))]
827 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
828
829 pub metrics: Arc<AuthorityMetrics>,
830 _pruner: AuthorityStorePruner,
831 authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
832 checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
833
834 db_checkpoint_config: DBCheckpointConfig,
836
837 pub config: NodeConfig,
838
839 pub overload_info: AuthorityOverloadInfo,
841
842 pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
843
844 chain_identifier: ChainIdentifier,
847
848 pub(crate) congestion_tracker: Arc<CongestionTracker>,
849
850 pub traffic_controller: Option<Arc<TrafficController>>,
852}
853
854impl AuthorityState {
863 pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
864 epoch_store.committee().authority_exists(&self.name)
865 }
866
867 pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
868 epoch_store
869 .active_validators()
870 .iter()
871 .any(|a| AuthorityName::from(a) == self.name)
872 }
873
874 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
875 !self.is_committee_validator(epoch_store)
876 }
877
878 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
879 &self.committee_store
880 }
881
882 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
883 self.committee_store.clone()
884 }
885
886 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
887 &self.config.authority_overload_config
888 }
889
890 pub fn get_epoch_state_commitments(
891 &self,
892 epoch: EpochId,
893 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
894 self.checkpoint_store.get_epoch_state_commitments(epoch)
895 }
896
897 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
901 async fn handle_transaction_impl(
902 &self,
903 transaction: VerifiedTransaction,
904 epoch_store: &Arc<AuthorityPerEpochStore>,
905 ) -> IotaResult<VerifiedSignedTransaction> {
906 let _execution_lock = self.execution_lock_for_signing()?;
908
909 let protocol_config = epoch_store.protocol_config();
910 let reference_gas_price = epoch_store.reference_gas_price();
911
912 let epoch = epoch_store.epoch();
913
914 let tx_data = transaction.data().transaction_data();
915
916 iota_transaction_checks::deny::check_transaction_for_signing(
920 tx_data,
921 transaction.tx_signatures(),
922 &transaction.input_objects()?,
923 &tx_data.receiving_objects(),
924 &self.config.transaction_deny_config,
925 self.get_backing_package_store().as_ref(),
926 )?;
927
928 let (tx_input_objects, tx_receiving_objects, per_authenticator_inputs) =
933 self.read_objects_for_signing(&transaction, epoch)?;
934
935 let move_authenticators = transaction.move_authenticators();
936
937 let (gas_status, tx_checked_input_objects, per_authenticator_checked_inputs) = self
943 .check_transaction_inputs_for_signing(
944 protocol_config,
945 reference_gas_price,
946 tx_data,
947 tx_input_objects,
948 &tx_receiving_objects,
949 &move_authenticators,
950 per_authenticator_inputs,
951 )?;
952
953 let per_authenticator_checked_input_objects = per_authenticator_checked_inputs
956 .iter()
957 .map(|i| &i.0)
958 .collect();
959
960 check_coin_deny_list_v1_during_signing(
964 tx_data.sender(),
965 &tx_checked_input_objects,
966 &tx_receiving_objects,
967 &per_authenticator_checked_input_objects,
968 &self.get_object_store(),
969 )?;
970
971 let (kind, signer, gas_data) = tx_data.execution_parts();
972
973 let (sender_authenticator_function_ref, sponsor_authenticator_function_ref) =
974 extract_auth_fun_refs(signer, gas_data.owner, |address| {
975 move_authenticators
976 .iter()
977 .zip(per_authenticator_checked_inputs.iter())
978 .find(|(move_authenticator, _)| {
979 move_authenticator.address().ok().as_ref() == Some(&address)
980 })
981 .map(|(_, (_, auth_fun_ref))| auth_fun_ref.clone())
982 });
983
984 let pre_consensus_move_authenticators =
989 pre_consensus_move_authenticators(&transaction, protocol_config);
990 let (move_authenticators, per_authenticator_checked_inputs): (Vec<_>, Vec<_>) =
991 move_authenticators
992 .into_iter()
993 .zip(per_authenticator_checked_inputs)
994 .filter(|(a, _)| pre_consensus_move_authenticators.contains(a))
995 .unzip();
996 let per_authenticator_checked_input_objects: Vec<_> = per_authenticator_checked_inputs
997 .iter()
998 .map(|i| &i.0)
999 .collect();
1000
1001 if !move_authenticators.is_empty() {
1004 let aggregated_authenticator_input_objects =
1005 iota_transaction_checks::aggregate_authenticator_input_objects(
1006 &per_authenticator_checked_input_objects,
1007 )?;
1008
1009 debug_assert_eq!(
1010 move_authenticators.len(),
1011 per_authenticator_checked_inputs.len(),
1012 "Move authenticators amount must match the number of checked authenticator inputs"
1013 );
1014
1015 let move_authenticators = move_authenticators
1016 .into_iter()
1017 .zip(per_authenticator_checked_inputs)
1018 .map(
1019 |(
1020 move_authenticator,
1021 (authenticator_checked_input_objects, authenticator_function_ref),
1022 )| {
1023 (
1024 move_authenticator.to_owned(),
1025 authenticator_function_ref,
1026 authenticator_checked_input_objects,
1027 )
1028 },
1029 )
1030 .collect();
1031
1032 let tx_data_bytes =
1037 bcs::to_bytes(&tx_data).expect("TransactionData serialization cannot fail");
1038
1039 let (sender_auth_digest, sponsor_auth_digest) =
1040 transaction.data().compute_auth_digests()?;
1041
1042 let auth_context_data = AuthContextData {
1043 transaction_data_bytes: tx_data_bytes,
1044 sender_auth_digest,
1045 sponsor_auth_digest,
1046 sender_authenticator_function_ref,
1047 sponsor_authenticator_function_ref,
1048 };
1049
1050 let validation_result = epoch_store.executor().authenticate_transaction(
1052 self.get_backing_store().as_ref(),
1053 protocol_config,
1054 self.metrics.limits_metrics.clone(),
1055 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1056 epoch_store
1057 .epoch_start_config()
1058 .epoch_data()
1059 .epoch_start_timestamp(),
1060 gas_data,
1061 gas_status,
1062 move_authenticators,
1063 aggregated_authenticator_input_objects,
1064 kind,
1065 signer,
1066 transaction.digest().to_owned(),
1067 auth_context_data,
1068 &mut None,
1069 );
1070
1071 if let Err(validation_error) = validation_result {
1072 return Err(IotaError::MoveAuthenticatorExecutionFailure {
1073 error: validation_error.to_string(),
1074 });
1075 }
1076 }
1077
1078 let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();
1079
1080 let signed_transaction =
1081 VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);
1082
1083 self.get_cache_writer().try_acquire_transaction_locks(
1088 epoch_store,
1089 &owned_objects,
1090 signed_transaction.clone(),
1091 )?;
1092
1093 Ok(signed_transaction)
1094 }
1095
1096 #[instrument(name = "handle_transaction", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = transaction.data().transaction_data().gas_owner().to_string()))]
1098 pub async fn handle_transaction(
1099 &self,
1100 epoch_store: &Arc<AuthorityPerEpochStore>,
1101 transaction: VerifiedTransaction,
1102 ) -> IotaResult<HandleTransactionResponse> {
1103 let tx_digest = *transaction.digest();
1104 debug!("handle_transaction");
1105
1106 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1108 return Ok(HandleTransactionResponse { status });
1109 }
1110
1111 let _metrics_guard = self
1112 .metrics
1113 .authority_state_handle_transaction_latency
1114 .start_timer();
1115 self.metrics.tx_orders.inc();
1116
1117 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1118 match signed {
1119 Ok(s) => {
1120 if self.is_committee_validator(epoch_store) {
1121 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1122 let tx = s.clone();
1123 let validator_tx_finalizer = validator_tx_finalizer.clone();
1124 let cache_reader = self.get_transaction_cache_reader().clone();
1125 let epoch_store = epoch_store.clone();
1126 spawn_monitored_task!(epoch_store.within_alive_epoch(
1127 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1128 ));
1129 }
1130 }
1131 Ok(HandleTransactionResponse {
1132 status: TransactionStatus::Signed(s.into_inner().into_sig()),
1133 })
1134 }
1135 Err(err) => Ok(HandleTransactionResponse {
1139 status: self
1140 .get_transaction_status(&tx_digest, epoch_store)?
1141 .ok_or(err)?
1142 .1,
1143 }),
1144 }
1145 }
1146
1147 pub fn check_system_overload_at_signing(&self) -> bool {
1148 self.config
1149 .authority_overload_config
1150 .check_system_overload_at_signing
1151 }
1152
1153 pub fn check_system_overload_at_execution(&self) -> bool {
1154 self.config
1155 .authority_overload_config
1156 .check_system_overload_at_execution
1157 }
1158
1159 pub(crate) fn check_system_overload(
1160 &self,
1161 consensus_adapter: &Arc<ConsensusAdapter>,
1162 tx_data: &SenderSignedData,
1163 do_authority_overload_check: bool,
1164 ) -> IotaResult {
1165 if do_authority_overload_check {
1166 self.check_authority_overload(tx_data).tap_err(|_| {
1167 self.update_overload_metrics("execution_queue");
1168 })?;
1169 }
1170 self.transaction_manager
1171 .check_execution_overload(self.overload_config(), tx_data)
1172 .tap_err(|_| {
1173 self.update_overload_metrics("execution_pending");
1174 })?;
1175 consensus_adapter.check_consensus_overload().tap_err(|_| {
1176 self.update_overload_metrics("consensus");
1177 })?;
1178
1179 let pending_tx_count = self
1180 .get_cache_commit()
1181 .approximate_pending_transaction_count();
1182 if pending_tx_count
1183 > self
1184 .config
1185 .execution_cache_config
1186 .writeback_cache
1187 .backpressure_threshold_for_rpc()
1188 {
1189 return Err(IotaError::ValidatorOverloadedRetryAfter {
1190 retry_after_secs: 10,
1191 });
1192 }
1193
1194 Ok(())
1195 }
1196
1197 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1198 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1199 return Ok(());
1200 }
1201
1202 let load_shedding_percentage = self
1203 .overload_info
1204 .load_shedding_percentage
1205 .load(Ordering::Relaxed);
1206 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1207 }
1208
1209 fn update_overload_metrics(&self, source: &str) {
1210 self.metrics
1211 .transaction_overload_sources
1212 .with_label_values(&[source])
1213 .inc();
1214 }
1215
1216 #[instrument(level = "trace", skip_all)]
1218 pub async fn execute_certificate(
1219 &self,
1220 certificate: &VerifiedCertificate,
1221 epoch_store: &Arc<AuthorityPerEpochStore>,
1222 ) -> IotaResult<TransactionEffects> {
1223 let _metrics_guard = if certificate.contains_shared_object() {
1224 self.metrics
1225 .execute_certificate_latency_shared_object
1226 .start_timer()
1227 } else {
1228 self.metrics
1229 .execute_certificate_latency_single_writer
1230 .start_timer()
1231 };
1232 trace!("execute_certificate");
1233
1234 self.metrics.total_cert_attempts.inc();
1235
1236 if !certificate.contains_shared_object() {
1237 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1242 }
1243
1244 epoch_store
1247 .within_alive_epoch(self.notify_read_effects(certificate))
1248 .await
1249 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1250 .and_then(|r| r)
1251 }
1252
1253 #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1268 pub fn try_execute_immediately(
1269 &self,
1270 certificate: &VerifiedExecutableTransaction,
1271 expected_effects_digest: Option<TransactionEffectsDigest>,
1272 epoch_store: &Arc<AuthorityPerEpochStore>,
1273 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1274 let _scope = monitored_scope("Execution::try_execute_immediately");
1275 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1276
1277 let tx_digest = certificate.digest();
1278
1279 let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1281
1282 if let Some(effects) = self
1285 .get_transaction_cache_reader()
1286 .try_get_executed_effects(tx_digest)?
1287 {
1288 if let Some(expected_effects_digest_inner) = expected_effects_digest {
1289 assert_eq!(
1290 effects.digest(),
1291 expected_effects_digest_inner,
1292 "Unexpected effects digest for transaction {tx_digest}"
1293 );
1294 }
1295 tx_guard.release();
1296 return Ok((effects, None));
1297 }
1298
1299 let (tx_input_objects, per_authenticator_inputs) =
1300 self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1301
1302 let expected_effects_digest =
1308 expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1309
1310 self.process_certificate(
1311 tx_guard,
1312 certificate,
1313 tx_input_objects,
1314 per_authenticator_inputs,
1315 expected_effects_digest,
1316 epoch_store,
1317 )
1318 .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1319 .tap_ok(
1320 |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1321 )
1322 }
1323
1324 pub fn read_objects_for_execution(
1325 &self,
1326 tx_lock: &CertLockGuard,
1327 certificate: &VerifiedExecutableTransaction,
1328 epoch_store: &Arc<AuthorityPerEpochStore>,
1329 ) -> IotaResult<(InputObjects, Vec<(InputObjects, ObjectReadResult)>)> {
1330 let _scope = monitored_scope("Execution::load_input_objects");
1331 let _metrics_guard = self
1332 .metrics
1333 .execution_load_input_objects_latency
1334 .start_timer();
1335
1336 let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1337
1338 let input_objects = self.input_loader.read_objects_for_execution(
1339 epoch_store,
1340 &certificate.key(),
1341 tx_lock,
1342 &input_objects,
1343 epoch_store.epoch(),
1344 )?;
1345
1346 certificate.split_input_objects_into_groups_for_reading(input_objects)
1347 }
1348
1349 pub fn try_execute_for_test(
1353 &self,
1354 certificate: &VerifiedCertificate,
1355 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1356 let epoch_store = self.epoch_store_for_testing();
1357 let (effects, execution_error_opt) = self.try_execute_immediately(
1358 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1359 None,
1360 &epoch_store,
1361 )?;
1362 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1363 Ok((signed_effects, execution_error_opt))
1364 }
1365
1366 pub fn execute_for_test(
1368 &self,
1369 certificate: &VerifiedCertificate,
1370 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1371 self.try_execute_for_test(certificate)
1372 .expect("try_execute_for_test should not fail")
1373 }
1374
1375 pub async fn notify_read_effects(
1376 &self,
1377 certificate: &VerifiedCertificate,
1378 ) -> IotaResult<TransactionEffects> {
1379 self.get_transaction_cache_reader()
1380 .try_notify_read_executed_effects(&[*certificate.digest()])
1381 .await
1382 .map(|mut r| r.pop().expect("must return correct number of effects"))
1383 }
1384
1385 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1386 self.get_object_cache_reader()
1387 .try_check_owned_objects_are_live(owned_object_refs)
1388 }
1389
1390 pub(crate) fn debug_dump_transaction_state(
1395 &self,
1396 tx_digest: &TransactionDigest,
1397 effects: &TransactionEffects,
1398 expected_effects_digest: TransactionEffectsDigest,
1399 inner_temporary_store: &InnerTemporaryStore,
1400 certificate: &VerifiedExecutableTransaction,
1401 debug_dump_config: &StateDebugDumpConfig,
1402 ) -> IotaResult<PathBuf> {
1403 let dump_dir = debug_dump_config
1406 .dump_file_directory
1407 .as_ref()
1408 .cloned()
1409 .unwrap_or(std::env::temp_dir());
1410 let epoch_store = self.load_epoch_store_one_call_per_task();
1411
1412 NodeStateDump::new(
1413 tx_digest,
1414 effects,
1415 expected_effects_digest,
1416 self.get_object_store().as_ref(),
1417 &epoch_store,
1418 inner_temporary_store,
1419 certificate,
1420 )?
1421 .write_to_file(&dump_dir)
1422 .map_err(|e| IotaError::FileIO(e.to_string()))
1423 }
1424
1425 #[instrument(name = "process_certificate", level = "trace", skip_all, fields(tx_digest = ?certificate.digest(), sender = ?certificate.data().transaction_data().gas_owner().to_string()))]
1426 pub(crate) fn process_certificate(
1427 &self,
1428 tx_guard: CertTxGuard,
1429 certificate: &VerifiedExecutableTransaction,
1430 tx_input_objects: InputObjects,
1431 per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1432 expected_effects_digest: Option<TransactionEffectsDigest>,
1433 epoch_store: &Arc<AuthorityPerEpochStore>,
1434 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1435 let process_certificate_start_time = tokio::time::Instant::now();
1436 let digest = *certificate.digest();
1437
1438 let _scope = monitored_scope("Execution::process_certificate");
1439
1440 fail_point_if!("correlated-crash-process-certificate", || {
1441 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1442 iota_simulator::task::kill_current_node(None);
1443 }
1444 });
1445
1446 let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1447 let execution_guard = match execution_guard {
1453 Ok(execution_guard) => execution_guard,
1454 Err(err) => {
1455 tx_guard.release();
1456 return Err(err);
1457 }
1458 };
1459 if *execution_guard != epoch_store.epoch() {
1463 tx_guard.release();
1464 info!("The epoch of the execution_guard doesn't match the epoch store");
1465 return Err(IotaError::WrongEpoch {
1466 expected_epoch: epoch_store.epoch(),
1467 actual_epoch: *execution_guard,
1468 });
1469 }
1470
1471 let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1477 &execution_guard,
1478 certificate,
1479 tx_input_objects,
1480 per_authenticator_inputs,
1481 epoch_store,
1482 ) {
1483 Err(e) => {
1484 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1485 tx_guard.release();
1486 return Err(e);
1487 }
1488 Ok(res) => res,
1489 };
1490
1491 if let Some(expected_effects_digest) = expected_effects_digest {
1492 if effects.digest() != expected_effects_digest {
1493 match self.debug_dump_transaction_state(
1495 &digest,
1496 &effects,
1497 expected_effects_digest,
1498 &inner_temporary_store,
1499 certificate,
1500 &self.config.state_debug_dump_config,
1501 ) {
1502 Ok(out_path) => {
1503 info!(
1504 "Dumped node state for transaction {} to {}",
1505 digest,
1506 out_path.as_path().display().to_string()
1507 );
1508 }
1509 Err(e) => {
1510 error!("Error dumping state for transaction {}: {e}", digest);
1511 }
1512 }
1513 error!(
1514 tx_digest = ?digest,
1515 ?expected_effects_digest,
1516 actual_effects = ?effects,
1517 "fork detected!"
1518 );
1519 panic!(
1520 "Transaction {} is expected to have effects digest {}, but got {}!",
1521 digest,
1522 expected_effects_digest,
1523 effects.digest(),
1524 );
1525 }
1526 }
1527
1528 fail_point!("crash");
1529
1530 self.commit_certificate(
1531 certificate,
1532 inner_temporary_store,
1533 &effects,
1534 tx_guard,
1535 execution_guard,
1536 epoch_store,
1537 )?;
1538
1539 let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1540 if elapsed > 0.0 {
1541 self.metrics
1542 .execution_gas_latency_ratio
1543 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1544 };
1545 Ok((effects, execution_error_opt))
1546 }
1547
1548 pub async fn reconfigure_traffic_control(
1549 &self,
1550 params: TrafficControlReconfigParams,
1551 ) -> Result<TrafficControlReconfigParams, IotaError> {
1552 if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1553 traffic_controller.admin_reconfigure(params).await
1554 } else {
1555 Err(IotaError::InvalidAdminRequest(
1556 "Traffic controller is not configured on this node".to_string(),
1557 ))
1558 }
1559 }
1560
1561 #[instrument(level = "trace", skip_all)]
1562 fn commit_certificate(
1563 &self,
1564 certificate: &VerifiedExecutableTransaction,
1565 inner_temporary_store: InnerTemporaryStore,
1566 effects: &TransactionEffects,
1567 tx_guard: CertTxGuard,
1568 _execution_guard: ExecutionLockReadGuard<'_>,
1569 epoch_store: &Arc<AuthorityPerEpochStore>,
1570 ) -> IotaResult {
1571 let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1572 monitored_scope("Execution::commit_certificate");
1573 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1574
1575 let tx_key = certificate.key();
1576 let tx_digest = certificate.digest();
1577 let input_object_count = inner_temporary_store.input_objects.len();
1578 let shared_object_count = effects.input_shared_objects().len();
1579
1580 let output_keys = inner_temporary_store.get_output_keys(effects);
1581
1582 let _ = self
1584 .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1585 .tap_err(|e| {
1586 self.metrics.post_processing_total_failures.inc();
1587 error!(?tx_digest, "tx post processing failed: {e}");
1588 });
1589
1590 epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1594
1595 fail_point!("crash");
1597
1598 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1599 certificate.clone().into_unsigned(),
1600 effects.clone(),
1601 inner_temporary_store,
1602 );
1603 self.get_cache_writer()
1604 .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1605
1606 if certificate.transaction_data().is_end_of_epoch_tx() {
1607 self.get_object_cache_reader()
1610 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1611 }
1612
1613 tx_guard.commit_tx();
1615
1616 self.transaction_manager
1620 .notify_commit(tx_digest, output_keys, epoch_store);
1621
1622 self.update_metrics(certificate, input_object_count, shared_object_count);
1623
1624 Ok(())
1625 }
1626
1627 fn update_metrics(
1628 &self,
1629 certificate: &VerifiedExecutableTransaction,
1630 input_object_count: usize,
1631 shared_object_count: usize,
1632 ) {
1633 if certificate.has_upgraded_multisig() {
1635 self.metrics.multisig_sig_count.inc();
1636 }
1637
1638 self.metrics.total_effects.inc();
1639 self.metrics.total_certs.inc();
1640
1641 if shared_object_count > 0 {
1642 self.metrics.shared_obj_tx.inc();
1643 }
1644
1645 if certificate.is_sponsored_tx() {
1646 self.metrics.sponsored_tx.inc();
1647 }
1648
1649 self.metrics
1650 .num_input_objs
1651 .observe(input_object_count as f64);
1652 self.metrics
1653 .num_shared_objects
1654 .observe(shared_object_count as f64);
1655 self.metrics.batch_size.observe(
1656 certificate
1657 .data()
1658 .intent_message()
1659 .value
1660 .kind()
1661 .num_commands() as f64,
1662 );
1663 }
1664
1665 #[instrument(level = "trace", skip_all)]
1677 fn prepare_certificate(
1678 &self,
1679 _execution_guard: &ExecutionLockReadGuard<'_>,
1680 certificate: &VerifiedExecutableTransaction,
1681 tx_input_objects: InputObjects,
1682 per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1683 epoch_store: &Arc<AuthorityPerEpochStore>,
1684 ) -> IotaResult<(
1685 InnerTemporaryStore,
1686 TransactionEffects,
1687 Option<ExecutionError>,
1688 )> {
1689 let _scope = monitored_scope("Execution::prepare_certificate");
1690 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1691 let prepare_certificate_start_time = tokio::time::Instant::now();
1692
1693 let protocol_config = epoch_store.protocol_config();
1694
1695 let reference_gas_price = epoch_store.reference_gas_price();
1696
1697 let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
1698 let epoch_start_timestamp = epoch_store
1699 .epoch_start_config()
1700 .epoch_data()
1701 .epoch_start_timestamp();
1702
1703 let backing_store = self.get_backing_store().as_ref();
1704
1705 let tx_digest = *certificate.digest();
1706
1707 let tx_data = certificate.data().transaction_data();
1710 tx_data.validity_check(protocol_config)?;
1711
1712 let (kind, signer, gas_data) = tx_data.execution_parts();
1713
1714 let move_authenticators = certificate.move_authenticators();
1715
1716 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1717 let (inner_temp_store, _, mut effects, execution_error_opt) = if move_authenticators
1718 .is_empty()
1719 {
1720 let (tx_gas_status, tx_checked_input_objects) =
1725 iota_transaction_checks::check_certificate_input(
1726 certificate,
1727 tx_input_objects,
1728 protocol_config,
1729 reference_gas_price,
1730 )?;
1731
1732 let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
1733 self.check_owned_locks(&owned_object_refs)?;
1734 epoch_store.executor().execute_transaction_to_effects(
1735 backing_store,
1736 protocol_config,
1737 self.metrics.limits_metrics.clone(),
1738 self.config
1741 .expensive_safety_check_config
1742 .enable_deep_per_tx_iota_conservation_check(),
1743 self.config.certificate_deny_config.certificate_deny_set(),
1744 &epoch_id,
1745 epoch_start_timestamp,
1746 tx_checked_input_objects,
1747 gas_data,
1748 tx_gas_status,
1749 kind,
1750 signer,
1751 tx_digest,
1752 &mut None,
1753 )
1754 } else {
1755 debug_assert_eq!(
1761 move_authenticators.len(),
1762 per_authenticator_inputs.len(),
1763 "Move authenticators amount must match the number of authenticator inputs"
1764 );
1765
1766 let per_authenticator_inputs = move_authenticators
1767 .iter()
1768 .zip(per_authenticator_inputs)
1769 .map(
1770 |(move_authenticator, (authenticator_input_objects, account_object))| {
1771 let (
1774 auth_account_object_id,
1775 auth_account_object_seq_number,
1776 auth_account_object_digest,
1777 ) = move_authenticator.object_to_authenticate_components()?;
1778
1779 let signer = move_authenticator.address()?;
1780
1781 let authenticator_function_ref_for_execution = self.check_move_account(
1782 auth_account_object_id,
1783 auth_account_object_seq_number,
1784 auth_account_object_digest,
1785 account_object,
1786 &signer,
1787 )?;
1788
1789 Ok((
1790 authenticator_input_objects,
1791 authenticator_function_ref_for_execution,
1792 ))
1793 },
1794 )
1795 .collect::<IotaResult<Vec<_>>>()?;
1796
1797 let per_authenticator_input_objects = per_authenticator_inputs
1798 .iter()
1799 .map(|(authenticator_input_objects, _)| authenticator_input_objects.clone())
1800 .collect::<Vec<_>>();
1801
1802 let tx_data_bytes =
1804 bcs::to_bytes(tx_data).expect("TransactionData serialization cannot fail");
1805
1806 let (sender_auth_digest, sponsor_auth_digest) =
1807 certificate.data().compute_auth_digests()?;
1808
1809 let authenticator_gas_budget = protocol_config.max_auth_gas();
1814 let (
1815 gas_status,
1816 per_authenticator_checked_input_objects,
1817 authenticator_and_tx_checked_input_objects,
1818 ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
1819 certificate,
1820 tx_input_objects,
1821 per_authenticator_input_objects,
1822 authenticator_gas_budget,
1823 protocol_config,
1824 reference_gas_price,
1825 )?;
1826
1827 debug_assert_eq!(
1828 move_authenticators.len(),
1829 per_authenticator_checked_input_objects.len(),
1830 "Move authenticators amount must match the number of checked authenticator inputs"
1831 );
1832
1833 let move_authenticators = move_authenticators
1834 .into_iter()
1835 .zip(per_authenticator_inputs)
1836 .zip(per_authenticator_checked_input_objects)
1837 .map(
1838 |(
1839 (move_authenticator, (_, authenticator_function_ref_for_execution)),
1840 authenticator_checked_input_objects,
1841 )| {
1842 (
1843 move_authenticator.to_owned(),
1844 authenticator_function_ref_for_execution,
1845 authenticator_checked_input_objects,
1846 )
1847 },
1848 )
1849 .collect::<Vec<_>>();
1850
1851 let owned_object_refs = authenticator_and_tx_checked_input_objects
1852 .inner()
1853 .filter_owned_objects();
1854 self.check_owned_locks(&owned_object_refs)?;
1855
1856 let (sender_authenticator_function_ref, sponsor_authenticator_function_ref) =
1857 extract_auth_fun_refs(signer, gas_data.owner, |address| {
1858 move_authenticators
1859 .iter()
1860 .find(|t| t.0.address().ok().as_ref() == Some(&address))
1861 .map(|t| t.1.authenticator_function_ref.clone())
1862 });
1863
1864 let auth_context_data = AuthContextData {
1865 transaction_data_bytes: tx_data_bytes,
1866 sender_auth_digest,
1867 sponsor_auth_digest,
1868 sender_authenticator_function_ref,
1869 sponsor_authenticator_function_ref,
1870 };
1871
1872 epoch_store
1873 .executor()
1874 .authenticate_then_execute_transaction_to_effects(
1875 backing_store,
1876 protocol_config,
1877 self.metrics.limits_metrics.clone(),
1878 self.config
1879 .expensive_safety_check_config
1880 .enable_deep_per_tx_iota_conservation_check(),
1881 self.config.certificate_deny_config.certificate_deny_set(),
1882 &epoch_id,
1883 epoch_start_timestamp,
1884 gas_data,
1885 gas_status,
1886 move_authenticators,
1887 authenticator_and_tx_checked_input_objects,
1888 kind,
1889 signer,
1890 tx_digest,
1891 auth_context_data,
1892 &mut None,
1893 )
1894 };
1895
1896 fail_point_if!("cp_execution_nondeterminism", || {
1897 #[cfg(msim)]
1898 self.create_fail_state(certificate, epoch_store, &mut effects);
1899 });
1900
1901 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1902 if elapsed > 0.0 {
1903 self.metrics
1904 .prepare_cert_gas_latency_ratio
1905 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1906 }
1907
1908 Ok((inner_temp_store, effects, execution_error_opt.err()))
1909 }
1910
1911 pub fn prepare_certificate_for_benchmark(
1912 &self,
1913 certificate: &VerifiedExecutableTransaction,
1914 input_objects: InputObjects,
1915 epoch_store: &Arc<AuthorityPerEpochStore>,
1916 ) -> IotaResult<(
1917 InnerTemporaryStore,
1918 TransactionEffects,
1919 Option<ExecutionError>,
1920 )> {
1921 let lock = RwLock::new(epoch_store.epoch());
1922 let execution_guard = lock.try_read().unwrap();
1923
1924 self.prepare_certificate(
1925 &execution_guard,
1926 certificate,
1927 input_objects,
1928 vec![],
1929 epoch_store,
1930 )
1931 }
1932
1933 #[instrument("dry_exec_tx", level = "trace", skip_all)]
1936 #[allow(clippy::type_complexity)]
1937 pub fn dry_exec_transaction(
1938 &self,
1939 transaction: TransactionData,
1940 transaction_digest: TransactionDigest,
1941 ) -> IotaResult<(
1942 DryRunTransactionBlockResponse,
1943 BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
1944 TransactionEffects,
1945 Option<ObjectId>,
1946 )> {
1947 let epoch_store = self.load_epoch_store_one_call_per_task();
1948 if !self.is_fullnode(&epoch_store) {
1949 return Err(IotaError::UnsupportedFeature {
1950 error: "dry-exec is only supported on fullnodes".to_string(),
1951 });
1952 }
1953
1954 if transaction.kind().is_system() {
1955 return Err(IotaError::UnsupportedFeature {
1956 error: "dry-exec does not support system transactions".to_string(),
1957 });
1958 }
1959
1960 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1961 }
1962
1963 #[allow(clippy::type_complexity)]
1964 pub fn dry_exec_transaction_for_benchmark(
1965 &self,
1966 transaction: TransactionData,
1967 transaction_digest: TransactionDigest,
1968 ) -> IotaResult<(
1969 DryRunTransactionBlockResponse,
1970 BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
1971 TransactionEffects,
1972 Option<ObjectId>,
1973 )> {
1974 let epoch_store = self.load_epoch_store_one_call_per_task();
1975 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1976 }
1977
1978 #[instrument(level = "trace", skip_all)]
1979 #[allow(clippy::type_complexity)]
1980 fn dry_exec_transaction_impl(
1981 &self,
1982 epoch_store: &AuthorityPerEpochStore,
1983 transaction: TransactionData,
1984 transaction_digest: TransactionDigest,
1985 ) -> IotaResult<(
1986 DryRunTransactionBlockResponse,
1987 BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
1988 TransactionEffects,
1989 Option<ObjectId>,
1990 )> {
1991 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1993
1994 let input_object_kinds = transaction.input_objects()?;
1995 let receiving_object_refs = transaction.receiving_objects();
1996
1997 iota_transaction_checks::deny::check_transaction_for_signing(
1998 &transaction,
1999 &[],
2000 &input_object_kinds,
2001 &receiving_object_refs,
2002 &self.config.transaction_deny_config,
2003 self.get_backing_package_store().as_ref(),
2004 )?;
2005
2006 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2007 None,
2009 &input_object_kinds,
2010 &receiving_object_refs,
2011 epoch_store.epoch(),
2012 )?;
2013
2014 let mut transaction = transaction;
2016 let reference_gas_price = epoch_store.reference_gas_price();
2017 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
2018 let sender = transaction.gas_owner();
2019 let gas_object_id = ObjectId::random();
2020 let gas_object = Object::new_move(
2021 MoveObject::new_gas_coin(
2022 OBJECT_START_VERSION,
2023 gas_object_id,
2024 SIMULATION_GAS_COIN_VALUE,
2025 ),
2026 Owner::Address(sender),
2027 TransactionDigest::GENESIS_MARKER,
2028 );
2029 let gas_object_ref = gas_object.object_ref();
2030 transaction.gas_data_mut().objects = vec![gas_object_ref];
2032 (
2033 iota_transaction_checks::check_transaction_input_with_given_gas(
2034 epoch_store.protocol_config(),
2035 reference_gas_price,
2036 &transaction,
2037 input_objects,
2038 receiving_objects,
2039 gas_object,
2040 &self.metrics.bytecode_verifier_metrics,
2041 &self.config.verifier_signing_config,
2042 )?,
2043 Some(gas_object_id),
2044 )
2045 } else {
2046 let authenticator_gas_budget = 0;
2049
2050 (
2051 iota_transaction_checks::check_transaction_input(
2052 epoch_store.protocol_config(),
2053 reference_gas_price,
2054 &transaction,
2055 input_objects,
2056 &receiving_objects,
2057 &self.metrics.bytecode_verifier_metrics,
2058 &self.config.verifier_signing_config,
2059 authenticator_gas_budget,
2060 )?,
2061 None,
2062 )
2063 };
2064
2065 let protocol_config = epoch_store.protocol_config();
2066 let (kind, signer, gas_data) = transaction.execution_parts();
2067
2068 let silent = true;
2069 let executor = iota_execution::executor(protocol_config, silent, None)
2070 .expect("Creating an executor should not fail here");
2071
2072 let expensive_checks = false;
2073 let (inner_temp_store, _, effects, execution_error) = executor
2074 .execute_transaction_to_effects(
2075 self.get_backing_store().as_ref(),
2076 protocol_config,
2077 self.metrics.limits_metrics.clone(),
2078 expensive_checks,
2079 self.config.certificate_deny_config.certificate_deny_set(),
2080 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2081 epoch_store
2082 .epoch_start_config()
2083 .epoch_data()
2084 .epoch_start_timestamp(),
2085 checked_input_objects,
2086 gas_data,
2087 gas_status,
2088 kind,
2089 signer,
2090 transaction_digest,
2091 &mut None,
2092 );
2093 let tx_digest = *effects.transaction_digest();
2094
2095 let module_cache =
2096 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2097
2098 let mut layout_resolver =
2099 epoch_store
2100 .executor()
2101 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2102 &inner_temp_store,
2103 self.get_backing_package_store(),
2104 )));
2105 let object_changes = Vec::new();
2107
2108 let balance_changes = Vec::new();
2110
2111 let written_with_kind = effects
2112 .created()
2113 .into_iter()
2114 .map(|(oref, _)| (oref, WriteKind::Create))
2115 .chain(
2116 effects
2117 .unwrapped()
2118 .into_iter()
2119 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2120 )
2121 .chain(
2122 effects
2123 .mutated()
2124 .into_iter()
2125 .map(|(oref, _)| (oref, WriteKind::Mutate)),
2126 )
2127 .map(|(oref, kind)| {
2128 let obj = inner_temp_store.written.get(&oref.object_id).unwrap();
2129 (oref.object_id, (oref, obj.clone(), kind))
2131 })
2132 .collect();
2133
2134 let execution_error_source = execution_error
2135 .as_ref()
2136 .err()
2137 .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2138
2139 Ok((
2140 DryRunTransactionBlockResponse {
2141 suggested_gas_price: self
2143 .congestion_tracker
2144 .get_prediction_suggested_gas_price(&transaction),
2145 input: IotaTransactionBlockData::try_from_with_module_cache(
2146 transaction,
2147 &module_cache,
2148 tx_digest,
2149 )
2150 .map_err(|e| IotaError::TransactionSerialization {
2151 error: format!(
2152 "Failed to convert transaction to IotaTransactionBlockData: {e}",
2153 ),
2154 })?, effects: effects.clone().try_into()?,
2156 events: IotaTransactionBlockEvents::try_from(
2157 inner_temp_store.events.clone(),
2158 tx_digest,
2159 None,
2160 layout_resolver.as_mut(),
2161 )?,
2162 object_changes,
2163 balance_changes,
2164 execution_error_source,
2165 },
2166 written_with_kind,
2167 effects,
2168 mock_gas,
2169 ))
2170 }
2171
2172 pub fn simulate_transaction(
2173 &self,
2174 mut transaction: TransactionData,
2175 checks: VmChecks,
2176 ) -> IotaResult<SimulateTransactionResult> {
2177 if transaction.kind().is_system() {
2178 return Err(IotaError::UnsupportedFeature {
2179 error: "simulate does not support system transactions".to_string(),
2180 });
2181 }
2182
2183 let epoch_store = self.load_epoch_store_one_call_per_task();
2184 if !self.is_fullnode(&epoch_store) {
2185 return Err(IotaError::UnsupportedFeature {
2186 error: "simulate is only supported on fullnodes".to_string(),
2187 });
2188 }
2189
2190 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2194
2195 let input_object_kinds = transaction.input_objects()?;
2196 let receiving_object_refs = transaction.receiving_objects();
2197
2198 iota_transaction_checks::deny::check_transaction_for_signing(
2201 &transaction,
2202 &[],
2203 &input_object_kinds,
2204 &receiving_object_refs,
2205 &self.config.transaction_deny_config,
2206 self.get_backing_package_store().as_ref(),
2207 )?;
2208
2209 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2211 None,
2213 &input_object_kinds,
2214 &receiving_object_refs,
2215 epoch_store.epoch(),
2216 )?;
2217
2218 let mock_gas_id = if transaction.gas().is_empty() {
2220 let mock_gas_object = Object::new_move(
2221 MoveObject::new_gas_coin(
2222 OBJECT_START_VERSION,
2223 ObjectId::MAX,
2224 SIMULATION_GAS_COIN_VALUE,
2225 ),
2226 Owner::Address(transaction.gas_data().owner),
2227 TransactionDigest::GENESIS_MARKER,
2228 );
2229 let mock_gas_object_ref = mock_gas_object.object_ref();
2230 transaction.gas_data_mut().objects = vec![mock_gas_object_ref];
2231 input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2232 Some(mock_gas_object.id())
2233 } else {
2234 None
2235 };
2236
2237 let protocol_config = epoch_store.protocol_config();
2238
2239 let authenticator_gas_budget = 0;
2242
2243 let (gas_status, checked_input_objects) = if checks.enabled() {
2246 iota_transaction_checks::check_transaction_input(
2247 protocol_config,
2248 epoch_store.reference_gas_price(),
2249 &transaction,
2250 input_objects,
2251 &receiving_objects,
2252 &self.metrics.bytecode_verifier_metrics,
2253 &self.config.verifier_signing_config,
2254 authenticator_gas_budget,
2255 )?
2256 } else {
2257 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2258 protocol_config,
2259 transaction.kind(),
2260 input_objects,
2261 receiving_objects,
2262 )?;
2263 let gas_status = IotaGasStatus::new(
2264 transaction.gas_budget(),
2265 transaction.gas_price(),
2266 epoch_store.reference_gas_price(),
2267 protocol_config,
2268 )?;
2269
2270 (gas_status, checked_input_objects)
2271 };
2272
2273 let executor = iota_execution::executor(
2275 protocol_config,
2276 true, None,
2278 )
2279 .expect("Creating an executor should not fail here");
2280
2281 let (kind, signer, gas_data) = transaction.execution_parts();
2283 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2284 self.get_backing_store().as_ref(),
2285 protocol_config,
2286 self.metrics.limits_metrics.clone(),
2287 false, self.config.certificate_deny_config.certificate_deny_set(),
2289 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2290 epoch_store
2291 .epoch_start_config()
2292 .epoch_data()
2293 .epoch_start_timestamp(),
2294 checked_input_objects,
2295 gas_data,
2296 gas_status,
2297 kind,
2298 signer,
2299 transaction.digest(),
2300 checks.disabled(),
2301 );
2302
2303 Ok(SimulateTransactionResult {
2306 input_objects: inner_temp_store.input_objects,
2307 output_objects: inner_temp_store.written,
2308 events: effects.events_digest().map(|_| inner_temp_store.events),
2309 effects,
2310 execution_result,
2311 suggested_gas_price: self
2312 .congestion_tracker
2313 .get_prediction_suggested_gas_price(&transaction),
2314 mock_gas_id,
2315 })
2316 }
2317
2318 #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2323 pub async fn dev_inspect_transaction_block(
2324 &self,
2325 sender: IotaAddress,
2326 transaction_kind: TransactionKind,
2327 gas_price: Option<u64>,
2328 gas_budget: Option<u64>,
2329 gas_sponsor: Option<IotaAddress>,
2330 gas_objects: Option<Vec<ObjectRef>>,
2331 show_raw_txn_data_and_effects: Option<bool>,
2332 skip_checks: Option<bool>,
2333 ) -> IotaResult<DevInspectResults> {
2334 let epoch_store = self.load_epoch_store_one_call_per_task();
2335
2336 if !self.is_fullnode(&epoch_store) {
2337 return Err(IotaError::UnsupportedFeature {
2338 error: "dev-inspect is only supported on fullnodes".to_string(),
2339 });
2340 }
2341
2342 if transaction_kind.is_system() {
2343 return Err(IotaError::UnsupportedFeature {
2344 error: "system transactions are not supported".to_string(),
2345 });
2346 }
2347
2348 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2349 let skip_checks = skip_checks.unwrap_or(true);
2350 let reference_gas_price = epoch_store.reference_gas_price();
2351 let protocol_config = epoch_store.protocol_config();
2352 let max_tx_gas = protocol_config.max_tx_gas();
2353
2354 let price = gas_price.unwrap_or(reference_gas_price);
2355 let budget = gas_budget.unwrap_or(max_tx_gas);
2356 let owner = gas_sponsor.unwrap_or(sender);
2357 let payment = gas_objects.unwrap_or_default();
2360 let mut transaction = TransactionData::V1(TransactionDataV1 {
2361 kind: transaction_kind.clone(),
2362 sender,
2363 gas_payment: GasData {
2364 objects: payment,
2365 owner,
2366 price,
2367 budget,
2368 },
2369 expiration: TransactionExpiration::None,
2370 });
2371
2372 let raw_txn_data = if show_raw_txn_data_and_effects {
2373 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2374 error: "Failed to serialize transaction during dev inspect".to_string(),
2375 })?
2376 } else {
2377 vec![]
2378 };
2379
2380 transaction.validity_check_no_gas_check(protocol_config)?;
2381
2382 let input_object_kinds = transaction.input_objects()?;
2383 let receiving_object_refs = transaction.receiving_objects();
2384
2385 iota_transaction_checks::deny::check_transaction_for_signing(
2386 &transaction,
2387 &[],
2388 &input_object_kinds,
2389 &receiving_object_refs,
2390 &self.config.transaction_deny_config,
2391 self.get_backing_package_store().as_ref(),
2392 )?;
2393
2394 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2395 None,
2397 &input_object_kinds,
2398 &receiving_object_refs,
2399 epoch_store.epoch(),
2400 )?;
2401
2402 let (gas_status, checked_input_objects) = if skip_checks {
2403 if transaction.gas().is_empty() {
2408 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2410 SIMULATION_GAS_COIN_VALUE,
2411 transaction.gas_owner(),
2412 );
2413 let gas_object_ref = dummy_gas_object.object_ref();
2414 transaction.gas_data_mut().objects = vec![gas_object_ref];
2415 input_objects.push(ObjectReadResult::new(
2416 InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2417 dummy_gas_object.into(),
2418 ));
2419 }
2420 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2421 protocol_config,
2422 &transaction_kind,
2423 input_objects,
2424 receiving_objects,
2425 )?;
2426 let gas_status = IotaGasStatus::new(
2427 max_tx_gas,
2428 transaction.gas_price(),
2429 reference_gas_price,
2430 protocol_config,
2431 )?;
2432
2433 (gas_status, checked_input_objects)
2434 } else {
2435 if transaction.gas().is_empty() {
2439 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2441 SIMULATION_GAS_COIN_VALUE,
2442 transaction.gas_owner(),
2443 );
2444 let gas_object_ref = dummy_gas_object.object_ref();
2445 transaction.gas_data_mut().objects = vec![gas_object_ref];
2446 iota_transaction_checks::check_transaction_input_with_given_gas(
2447 epoch_store.protocol_config(),
2448 reference_gas_price,
2449 &transaction,
2450 input_objects,
2451 receiving_objects,
2452 dummy_gas_object,
2453 &self.metrics.bytecode_verifier_metrics,
2454 &self.config.verifier_signing_config,
2455 )?
2456 } else {
2457 let authenticator_gas_budget = 0;
2460
2461 iota_transaction_checks::check_transaction_input(
2462 epoch_store.protocol_config(),
2463 reference_gas_price,
2464 &transaction,
2465 input_objects,
2466 &receiving_objects,
2467 &self.metrics.bytecode_verifier_metrics,
2468 &self.config.verifier_signing_config,
2469 authenticator_gas_budget,
2470 )?
2471 }
2472 };
2473
2474 let executor = iota_execution::executor(protocol_config, true, None)
2475 .expect("Creating an executor should not fail here");
2476 let gas_data = transaction.gas_data().clone();
2477 let intent_msg = IntentMessage::new(
2478 Intent {
2479 version: IntentVersion::V0,
2480 scope: IntentScope::TransactionData,
2481 app_id: IntentAppId::Iota,
2482 },
2483 transaction,
2484 );
2485 let transaction_digest = TransactionDigest::new(intent_msg.value.digest().into_inner());
2486 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2487 self.get_backing_store().as_ref(),
2488 protocol_config,
2489 self.metrics.limits_metrics.clone(),
2490 false,
2492 self.config.certificate_deny_config.certificate_deny_set(),
2493 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2494 epoch_store
2495 .epoch_start_config()
2496 .epoch_data()
2497 .epoch_start_timestamp(),
2498 checked_input_objects,
2499 gas_data,
2500 gas_status,
2501 transaction_kind,
2502 sender,
2503 transaction_digest,
2504 skip_checks,
2505 );
2506
2507 let raw_effects = if show_raw_txn_data_and_effects {
2508 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2509 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2510 })?
2511 } else {
2512 vec![]
2513 };
2514
2515 let mut layout_resolver =
2516 epoch_store
2517 .executor()
2518 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2519 &inner_temp_store,
2520 self.get_backing_package_store(),
2521 )));
2522
2523 DevInspectResults::new(
2524 effects,
2525 inner_temp_store.events.clone(),
2526 execution_result,
2527 raw_txn_data,
2528 raw_effects,
2529 layout_resolver.as_mut(),
2530 )
2531 }
2532
2533 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2535 let epoch_store = self.epoch_store_for_testing();
2536 Ok(epoch_store.reference_gas_price())
2537 }
2538
2539 #[instrument(level = "trace", skip_all)]
2540 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2541 self.get_transaction_cache_reader()
2542 .try_is_tx_already_executed(digest)
2543 }
2544
2545 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2547 self.try_is_tx_already_executed(digest)
2548 .expect("storage access failed")
2549 }
2550
2551 #[instrument(level = "debug", skip_all, err)]
2553 fn index_tx(
2554 &self,
2555 indexes: &IndexStore,
2556 digest: &TransactionDigest,
2557 cert: &VerifiedExecutableTransaction,
2559 effects: &TransactionEffects,
2560 events: &TransactionEvents,
2561 timestamp_ms: u64,
2562 tx_coins: Option<TxCoins>,
2563 written: &WrittenObjects,
2564 inner_temporary_store: &InnerTemporaryStore,
2565 ) -> IotaResult<u64> {
2566 let changes = self
2567 .process_object_index(effects, written, inner_temporary_store)
2568 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2569
2570 indexes.index_tx(
2571 cert.data().intent_message().value.sender(),
2572 cert.data()
2573 .intent_message()
2574 .value
2575 .input_objects()?
2576 .iter()
2577 .map(|o| o.object_id()),
2578 effects
2579 .all_changed_objects()
2580 .into_iter()
2581 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2582 cert.data()
2583 .intent_message()
2584 .value
2585 .move_calls()
2586 .into_iter()
2587 .map(|(package, module, function)| {
2588 (*package, module.to_owned(), function.to_owned())
2589 }),
2590 events,
2591 changes,
2592 digest,
2593 timestamp_ms,
2594 tx_coins,
2595 )
2596 }
2597
2598 #[cfg(msim)]
2599 fn create_fail_state(
2600 &self,
2601 certificate: &VerifiedExecutableTransaction,
2602 epoch_store: &Arc<AuthorityPerEpochStore>,
2603 effects: &mut TransactionEffects,
2604 ) {
2605 use std::cell::RefCell;
2606
2607 use iota_types::effects::TransactionEffectsAPIForTesting;
2608 thread_local! {
2609 static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2610 }
2611 if !certificate.data().intent_message().value.is_system_tx() {
2612 let committee = epoch_store.committee();
2613 let cur_stake = (**committee).weight(&self.name);
2614 if cur_stake > 0 {
2615 FAIL_STATE.with_borrow_mut(|fail_state| {
2616 if fail_state.0 < committee.validity_threshold() {
2618 fail_state.0 += cur_stake;
2619 fail_state.1.insert(self.name);
2620 }
2621
2622 if fail_state.1.contains(&self.name) {
2623 info!("cp_exec failing tx");
2624 effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2625 }
2626 });
2627 }
2628 }
2629 }
2630
2631 fn process_object_index(
2632 &self,
2633 effects: &TransactionEffects,
2634 written: &WrittenObjects,
2635 inner_temporary_store: &InnerTemporaryStore,
2636 ) -> IotaResult<ObjectIndexChanges> {
2637 let epoch_store = self.load_epoch_store_one_call_per_task();
2638 let mut layout_resolver =
2639 epoch_store
2640 .executor()
2641 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2642 inner_temporary_store,
2643 self.get_backing_package_store(),
2644 )));
2645
2646 let modified_at_version = effects
2647 .modified_at_versions()
2648 .into_iter()
2649 .collect::<HashMap<_, _>>();
2650
2651 let tx_digest = effects.transaction_digest();
2652 let mut deleted_owners = vec![];
2653 let mut deleted_dynamic_fields = vec![];
2654 for object_ref in effects.deleted().into_iter().chain(effects.wrapped()) {
2655 let old_version = modified_at_version.get(&object_ref.object_id).unwrap();
2656 match self.get_owner_at_version(&object_ref.object_id, *old_version).unwrap_or_else(
2659 |e| panic!("tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {} at version {old_version:?}. Err: {e:?}",object_ref.object_id)
2660 ) {
2661 Owner::Address(addr) => deleted_owners.push((addr, object_ref.object_id)),
2662 Owner::Object(object_id) => {
2663 deleted_dynamic_fields.push((object_id, object_ref.object_id))
2664 }
2665 _ => {}
2666 }
2667 }
2668
2669 let mut new_owners = vec![];
2670 let mut new_dynamic_fields = vec![];
2671
2672 for (oref, owner, kind) in effects.all_changed_objects() {
2673 let id = &oref.object_id;
2674 if let WriteKind::Mutate = kind {
2677 let Some(old_version) = modified_at_version.get(id) else {
2678 panic!(
2679 "tx_digest={tx_digest}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2680 );
2681 };
2682 let Some(old_object) = self
2685 .get_object_store()
2686 .try_get_object_by_key(id, *old_version)?
2687 else {
2688 panic!(
2689 "tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {id} at version {old_version:?}"
2690 );
2691 };
2692 if old_object.owner != owner {
2693 match old_object.owner {
2694 Owner::Address(addr) => {
2695 deleted_owners.push((addr, *id));
2696 }
2697 Owner::Object(object_id) => deleted_dynamic_fields.push((object_id, *id)),
2698 _ => {}
2699 }
2700 }
2701 }
2702
2703 match owner {
2704 Owner::Address(addr) => {
2705 let new_object = written.get(id).unwrap_or_else(
2708 || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2709 );
2710 assert_eq!(
2711 new_object.version(),
2712 oref.version,
2713 "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2714 tx_digest,
2715 id,
2716 new_object.version(),
2717 oref.version
2718 );
2719
2720 let type_ = new_object
2721 .type_()
2722 .map(|type_| ObjectType::Struct(type_.clone()))
2723 .unwrap_or(ObjectType::Package);
2724
2725 new_owners.push((
2726 (addr, *id),
2727 ObjectInfo {
2728 object_id: *id,
2729 version: oref.version,
2730 digest: oref.digest,
2731 type_,
2732 owner,
2733 previous_transaction: *effects.transaction_digest(),
2734 },
2735 ));
2736 }
2737 Owner::Object(owner) => {
2738 let new_object = written.get(id).unwrap_or_else(
2739 || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2740 );
2741 assert_eq!(
2742 new_object.version(),
2743 oref.version,
2744 "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2745 tx_digest,
2746 id,
2747 new_object.version(),
2748 oref.version
2749 );
2750
2751 let Some(df_info) = self
2752 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2753 .unwrap_or_else(|e| {
2754 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2755 None
2756 }
2757 )
2758 else {
2759 continue;
2761 };
2762 new_dynamic_fields.push(((owner, *id), df_info))
2763 }
2764 _ => {}
2765 }
2766 }
2767
2768 Ok(ObjectIndexChanges {
2769 deleted_owners,
2770 deleted_dynamic_fields,
2771 new_owners,
2772 new_dynamic_fields,
2773 })
2774 }
2775
2776 fn try_create_dynamic_field_info(
2777 &self,
2778 o: &Object,
2779 written: &WrittenObjects,
2780 resolver: &mut dyn LayoutResolver,
2781 ) -> IotaResult<Option<DynamicFieldInfo>> {
2782 let Some(move_object) = o.data.as_struct_opt().cloned() else {
2784 return Ok(None);
2785 };
2786
2787 if !move_object.struct_tag().is_dynamic_field() {
2789 return Ok(None);
2790 }
2791
2792 let layout = resolver
2793 .get_annotated_layout(move_object.struct_tag())?
2794 .into_layout();
2795
2796 let field =
2797 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2798 IotaError::ObjectDeserialization {
2799 error: e.to_string(),
2800 }
2801 })?;
2802
2803 let type_ = field.kind;
2804 let name_type: TypeTag = type_tag_core_to_sdk(&field.name_layout.into());
2805 let bcs_name = field.name_bytes.to_owned();
2806
2807 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2808 .map_err(|e| {
2809 warn!("{e}");
2810 IotaError::ObjectDeserialization {
2811 error: e.to_string(),
2812 }
2813 })?;
2814
2815 let name = DynamicFieldName {
2816 type_: name_type,
2817 value: IotaMoveValue::from(name_value).to_json_value(),
2818 };
2819
2820 let value_metadata = field.value_metadata().map_err(|e| {
2821 warn!("{e}");
2822 IotaError::ObjectDeserialization {
2823 error: e.to_string(),
2824 }
2825 })?;
2826
2827 Ok(Some(match value_metadata {
2828 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2829 name,
2830 bcs_name,
2831 type_,
2832 object_type: object_type.to_canonical_string(true),
2833 object_id: o.id(),
2834 version: o.version(),
2835 digest: o.digest(),
2836 },
2837
2838 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2839 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2844 (
2845 object.version(),
2846 object.digest(),
2847 object.data.object_type().unwrap().clone(),
2848 )
2849 } else {
2850 let object = self
2852 .get_object_store()
2853 .try_get_object_by_key(&object_id, o.version())?
2854 .ok_or_else(|| UserInputError::ObjectNotFound {
2855 object_id,
2856 version: Some(o.version()),
2857 })?;
2858 let version = object.version();
2859 let digest = object.digest();
2860 let object_type = object.data.object_type().unwrap().clone();
2861 (version, digest, object_type)
2862 };
2863
2864 DynamicFieldInfo {
2865 name,
2866 bcs_name,
2867 type_,
2868 object_type: object_type.to_string(),
2869 object_id,
2870 version,
2871 digest,
2872 }
2873 }
2874 }))
2875 }
2876
2877 #[instrument(level = "trace", skip_all, err)]
2878 fn post_process_one_tx(
2879 &self,
2880 certificate: &VerifiedExecutableTransaction,
2881 effects: &TransactionEffects,
2882 inner_temporary_store: &InnerTemporaryStore,
2883 epoch_store: &Arc<AuthorityPerEpochStore>,
2884 ) -> IotaResult {
2885 if self.indexes.is_none() {
2886 return Ok(());
2887 }
2888
2889 let _scope = monitored_scope("Execution::post_process_one_tx");
2890
2891 let tx_digest = certificate.digest();
2892 let timestamp_ms = Self::unixtime_now_ms();
2893 let events = &inner_temporary_store.events;
2894 let written = &inner_temporary_store.written;
2895 let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
2896 effects,
2897 inner_temporary_store,
2898 epoch_store,
2899 );
2900
2901 if let Some(indexes) = &self.indexes {
2903 let _ = self
2904 .index_tx(
2905 indexes.as_ref(),
2906 tx_digest,
2907 certificate,
2908 effects,
2909 events,
2910 timestamp_ms,
2911 tx_coins,
2912 written,
2913 inner_temporary_store,
2914 )
2915 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2916 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2917 .expect("Indexing tx should not fail");
2918
2919 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2920 let events = self.make_transaction_block_events(
2921 events.clone(),
2922 *tx_digest,
2923 timestamp_ms,
2924 epoch_store,
2925 inner_temporary_store,
2926 )?;
2927 self.subscription_handler
2929 .process_tx(certificate.data().transaction_data(), &effects, &events)
2930 .tap_ok(|_| {
2931 self.metrics
2932 .post_processing_total_tx_had_event_processed
2933 .inc()
2934 })
2935 .tap_err(|e| {
2936 warn!(
2937 ?tx_digest,
2938 "Post processing - Couldn't process events for tx: {}", e
2939 )
2940 })?;
2941
2942 self.metrics
2943 .post_processing_total_events_emitted
2944 .inc_by(events.data.len() as u64);
2945 };
2946 Ok(())
2947 }
2948
2949 fn make_transaction_block_events(
2950 &self,
2951 transaction_events: TransactionEvents,
2952 digest: TransactionDigest,
2953 timestamp_ms: u64,
2954 epoch_store: &Arc<AuthorityPerEpochStore>,
2955 inner_temporary_store: &InnerTemporaryStore,
2956 ) -> IotaResult<IotaTransactionBlockEvents> {
2957 let mut layout_resolver =
2958 epoch_store
2959 .executor()
2960 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2961 inner_temporary_store,
2962 self.get_backing_package_store(),
2963 )));
2964 IotaTransactionBlockEvents::try_from(
2965 transaction_events,
2966 digest,
2967 Some(timestamp_ms),
2968 layout_resolver.as_mut(),
2969 )
2970 }
2971
2972 pub fn unixtime_now_ms() -> u64 {
2973 let now = SystemTime::now()
2974 .duration_since(UNIX_EPOCH)
2975 .expect("Time went backwards")
2976 .as_millis();
2977 u64::try_from(now).expect("Travelling in time machine")
2978 }
2979
2980 #[instrument(level = "trace", skip_all)]
2981 pub async fn handle_transaction_info_request(
2982 &self,
2983 request: TransactionInfoRequest,
2984 ) -> IotaResult<TransactionInfoResponse> {
2985 let epoch_store = self.load_epoch_store_one_call_per_task();
2986 let (transaction, status) = self
2987 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2988 .ok_or(IotaError::TransactionNotFound {
2989 digest: request.transaction_digest,
2990 })?;
2991 Ok(TransactionInfoResponse {
2992 transaction,
2993 status,
2994 })
2995 }
2996
2997 #[instrument(level = "trace", skip_all)]
2998 pub async fn handle_object_info_request(
2999 &self,
3000 request: ObjectInfoRequest,
3001 ) -> IotaResult<ObjectInfoResponse> {
3002 let epoch_store = self.load_epoch_store_one_call_per_task();
3003
3004 let requested_object_seq = match request.request_kind {
3005 ObjectInfoRequestKind::LatestObjectInfo => {
3006 self.try_get_object_or_tombstone(request.object_id)
3007 .await?
3008 .ok_or_else(|| {
3009 IotaError::from(UserInputError::ObjectNotFound {
3010 object_id: request.object_id,
3011 version: None,
3012 })
3013 })?
3014 .version
3015 }
3016 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3017 };
3018
3019 let object = self
3020 .get_object_store()
3021 .try_get_object_by_key(&request.object_id, requested_object_seq)?
3022 .ok_or_else(|| {
3023 IotaError::from(UserInputError::ObjectNotFound {
3024 object_id: request.object_id,
3025 version: Some(requested_object_seq),
3026 })
3027 })?;
3028
3029 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3030 (request.generate_layout, object.data.as_struct_opt())
3031 {
3032 Some(into_struct_layout(
3033 epoch_store
3034 .executor()
3035 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3036 .get_annotated_layout(move_obj.struct_tag())?,
3037 )?)
3038 } else {
3039 None
3040 };
3041
3042 let lock = if !object.is_address_owned() {
3043 None
3045 } else {
3046 self.get_transaction_lock(&object.object_ref(), &epoch_store)
3047 .await?
3048 .map(|s| s.into_inner())
3049 };
3050
3051 Ok(ObjectInfoResponse {
3052 object,
3053 layout,
3054 lock_for_debugging: lock,
3055 })
3056 }
3057
3058 #[instrument(level = "trace", skip_all)]
3059 pub fn handle_checkpoint_request(
3060 &self,
3061 request: &CheckpointRequest,
3062 ) -> IotaResult<CheckpointResponse> {
3063 let summary = if request.certified {
3064 let summary = match request.sequence_number {
3065 Some(seq) => self
3066 .checkpoint_store
3067 .get_checkpoint_by_sequence_number(seq)?,
3068 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3069 }
3070 .map(|v| v.into_inner());
3071 summary.map(CheckpointSummaryResponse::Certified)
3072 } else {
3073 let summary = match request.sequence_number {
3074 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3075 None => self
3076 .checkpoint_store
3077 .get_latest_locally_computed_checkpoint()?,
3078 };
3079 summary.map(CheckpointSummaryResponse::Pending)
3080 };
3081 let contents = match &summary {
3082 Some(s) => self
3083 .checkpoint_store
3084 .get_checkpoint_contents(&s.content_digest())?,
3085 None => None,
3086 };
3087 Ok(CheckpointResponse {
3088 checkpoint: summary,
3089 contents,
3090 })
3091 }
3092
3093 fn check_protocol_version(
3094 supported_protocol_versions: SupportedProtocolVersions,
3095 current_version: ProtocolVersion,
3096 ) {
3097 info!("current protocol version is now {:?}", current_version);
3098 info!("supported versions are: {:?}", supported_protocol_versions);
3099 if !supported_protocol_versions.is_version_supported(current_version) {
3100 let msg = format!(
3101 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
3102 );
3103
3104 error!("{}", msg);
3105 eprintln!("{msg}");
3106
3107 #[cfg(not(msim))]
3108 std::process::exit(1);
3109
3110 #[cfg(msim)]
3111 iota_simulator::task::shutdown_current_node();
3112 }
3113 }
3114
3115 #[expect(clippy::disallowed_methods)] #[expect(clippy::too_many_arguments)]
3117 pub async fn new(
3118 name: AuthorityName,
3119 secret: StableSyncAuthoritySigner,
3120 supported_protocol_versions: SupportedProtocolVersions,
3121 store: Arc<AuthorityStore>,
3122 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3123 epoch_store: Arc<AuthorityPerEpochStore>,
3124 committee_store: Arc<CommitteeStore>,
3125 indexes: Option<Arc<IndexStore>>,
3126 grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
3127 checkpoint_store: Arc<CheckpointStore>,
3128 prometheus_registry: &Registry,
3129 genesis_objects: &[Object],
3130 db_checkpoint_config: &DBCheckpointConfig,
3131 config: NodeConfig,
3132 archive_readers: ArchiveReaderBalancer,
3133 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
3134 chain_identifier: ChainIdentifier,
3135 pruner_db: Option<Arc<AuthorityPrunerTables>>,
3136 checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
3137 policy_config: Option<PolicyConfig>,
3138 firewall_config: Option<RemoteFirewallConfig>,
3139 ) -> Arc<Self> {
3140 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3141
3142 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3143 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3144 let transaction_manager = Arc::new(TransactionManager::new(
3145 execution_cache_trait_pointers.object_cache_reader.clone(),
3146 execution_cache_trait_pointers
3147 .transaction_cache_reader
3148 .clone(),
3149 &epoch_store,
3150 tx_ready_certificates,
3151 metrics.clone(),
3152 ));
3153 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3154
3155 let authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3156 epoch_store.get_parent_path(),
3157 config
3158 .authority_store_pruning_config
3159 .num_latest_epoch_dbs_to_retain,
3160 )
3161 .await;
3162 let _pruner = AuthorityStorePruner::new(
3163 store.perpetual_tables.clone(),
3164 checkpoint_store.clone(),
3165 grpc_indexes_store.clone(),
3166 indexes.clone(),
3167 config.authority_store_pruning_config.clone(),
3168 epoch_store.committee().authority_exists(&name),
3169 epoch_store.epoch_start_state().epoch_duration_ms(),
3170 prometheus_registry,
3171 archive_readers,
3172 pruner_db,
3173 checkpoint_progress_tracker.clone(),
3174 );
3175 let input_loader =
3176 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3177 let epoch = epoch_store.epoch();
3178 let rgp = epoch_store.reference_gas_price();
3179 let traffic_controller_metrics =
3180 Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3181 let traffic_controller = if let Some(policy_config) = policy_config {
3182 Some(Arc::new(
3183 TrafficController::init(
3184 policy_config,
3185 traffic_controller_metrics,
3186 firewall_config.clone(),
3187 )
3188 .await,
3189 ))
3190 } else {
3191 None
3192 };
3193 let state = Arc::new(AuthorityState {
3194 name,
3195 secret,
3196 execution_lock: RwLock::new(epoch),
3197 epoch_store: ArcSwap::new(epoch_store.clone()),
3198 input_loader,
3199 execution_cache_trait_pointers,
3200 indexes,
3201 grpc_indexes_store,
3202 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3203 checkpoint_store,
3204 committee_store,
3205 transaction_manager,
3206 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3207 metrics,
3208 _pruner,
3209 authority_per_epoch_pruner,
3210 checkpoint_progress_tracker,
3211 db_checkpoint_config: db_checkpoint_config.clone(),
3212 config,
3213 overload_info: AuthorityOverloadInfo::default(),
3214 validator_tx_finalizer,
3215 chain_identifier,
3216 congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
3217 traffic_controller,
3218 });
3219
3220 let authority_state = Arc::downgrade(&state);
3222 spawn_monitored_task!(execution_process(
3223 authority_state,
3224 rx_ready_certificates,
3225 rx_execution_shutdown,
3226 ));
3227 spawn_monitored_task!(authority_store_migrations::migrate_events(store));
3228
3229 state
3231 .create_owner_index_if_empty(genesis_objects, &epoch_store)
3232 .expect("Error indexing genesis objects.");
3233
3234 state
3235 }
3236
3237 pub fn epoch_db_pruner(&self) -> &AuthorityPerEpochStorePruner {
3238 &self.authority_per_epoch_pruner
3239 }
3240
3241 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3243 &self.execution_cache_trait_pointers.object_cache_reader
3244 }
3245
3246 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3247 &self.execution_cache_trait_pointers.transaction_cache_reader
3248 }
3249
3250 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3251 &self.execution_cache_trait_pointers.cache_writer
3252 }
3253
3254 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3255 &self.execution_cache_trait_pointers.backing_store
3256 }
3257
3258 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3259 &self.execution_cache_trait_pointers.backing_package_store
3260 }
3261
3262 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3263 &self.execution_cache_trait_pointers.object_store
3264 }
3265
3266 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3267 &self.execution_cache_trait_pointers.reconfig_api
3268 }
3269
3270 pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3271 &self.execution_cache_trait_pointers.global_state_hash_store
3272 }
3273
3274 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3275 &self.execution_cache_trait_pointers.checkpoint_cache
3276 }
3277
3278 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3279 &self.execution_cache_trait_pointers.state_sync_store
3280 }
3281
3282 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3283 &self.execution_cache_trait_pointers.cache_commit
3284 }
3285
3286 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3287 self.execution_cache_trait_pointers
3288 .testing_api
3289 .database_for_testing()
3290 }
3291
3292 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3293 &self,
3294 config: NodeConfig,
3295 metrics: Arc<AuthorityStorePruningMetrics>,
3296 ) -> anyhow::Result<()> {
3297 let archive_readers =
3298 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3299 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3300 &self.database_for_testing().perpetual_tables,
3301 &self.checkpoint_store,
3302 self.grpc_indexes_store.as_deref(),
3303 None,
3304 config.authority_store_pruning_config,
3305 metrics,
3306 archive_readers,
3307 EPOCH_DURATION_MS_FOR_TESTING,
3308 self.checkpoint_progress_tracker.as_ref(),
3309 )
3310 .await
3311 }
3312
3313 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3314 &self.transaction_manager
3315 }
3316
3317 pub fn enqueue_transactions_for_execution(
3320 &self,
3321 txns: Vec<VerifiedExecutableTransaction>,
3322 epoch_store: &Arc<AuthorityPerEpochStore>,
3323 ) {
3324 self.transaction_manager.enqueue(txns, epoch_store)
3325 }
3326
3327 pub fn enqueue_certificates_for_execution(
3329 &self,
3330 certs: Vec<VerifiedCertificate>,
3331 epoch_store: &Arc<AuthorityPerEpochStore>,
3332 ) {
3333 self.transaction_manager
3334 .enqueue_certificates(certs, epoch_store)
3335 }
3336
3337 pub fn enqueue_with_expected_effects_digest(
3338 &self,
3339 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3340 epoch_store: &AuthorityPerEpochStore,
3341 ) {
3342 self.transaction_manager
3343 .enqueue_with_expected_effects_digest(certs, epoch_store)
3344 }
3345
3346 fn create_owner_index_if_empty(
3347 &self,
3348 genesis_objects: &[Object],
3349 epoch_store: &Arc<AuthorityPerEpochStore>,
3350 ) -> IotaResult {
3351 let Some(index_store) = &self.indexes else {
3352 return Ok(());
3353 };
3354 if !index_store.is_empty() {
3355 return Ok(());
3356 }
3357
3358 let mut new_owners = vec![];
3359 let mut new_dynamic_fields = vec![];
3360 let mut layout_resolver = epoch_store
3361 .executor()
3362 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3363 for o in genesis_objects.iter() {
3364 match o.owner {
3365 Owner::Address(addr) => {
3366 new_owners.push(((addr, o.id()), ObjectInfo::new(&o.object_ref(), o)))
3367 }
3368 Owner::Object(object_id) => {
3369 let id = o.id();
3370 let info = match self.try_create_dynamic_field_info(
3371 o,
3372 &BTreeMap::new(),
3373 layout_resolver.as_mut(),
3374 ) {
3375 Ok(Some(info)) => info,
3376 Ok(None) => continue,
3377 Err(IotaError::UserInput {
3378 error:
3379 UserInputError::ObjectNotFound {
3380 object_id: not_found_id,
3381 version,
3382 },
3383 }) => {
3384 warn!(
3385 ?not_found_id,
3386 ?version,
3387 object_owner=?object_id,
3388 field=?id,
3389 "Skipping dynamic field: referenced genesis object not found"
3390 );
3391 continue;
3392 }
3393 Err(e) => return Err(e),
3394 };
3395 new_dynamic_fields.push(((object_id, id), info));
3396 }
3397 _ => {}
3398 }
3399 }
3400
3401 index_store.insert_genesis_objects(ObjectIndexChanges {
3402 deleted_owners: vec![],
3403 deleted_dynamic_fields: vec![],
3404 new_owners,
3405 new_dynamic_fields,
3406 })
3407 }
3408
3409 pub fn execution_lock_for_executable_transaction(
3413 &self,
3414 transaction: &VerifiedExecutableTransaction,
3415 ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3416 let lock = self
3417 .execution_lock
3418 .try_read()
3419 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3420 if *lock == transaction.auth_sig().epoch() {
3421 Ok(lock)
3422 } else {
3423 Err(IotaError::WrongEpoch {
3424 expected_epoch: *lock,
3425 actual_epoch: transaction.auth_sig().epoch(),
3426 })
3427 }
3428 }
3429
3430 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3436 self.execution_lock
3437 .try_read()
3438 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3439 }
3440
3441 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3442 self.execution_lock.write().await
3443 }
3444
3445 #[instrument(level = "error", skip_all)]
3446 pub async fn reconfigure(
3447 &self,
3448 cur_epoch_store: &AuthorityPerEpochStore,
3449 supported_protocol_versions: SupportedProtocolVersions,
3450 new_committee: Committee,
3451 epoch_start_configuration: EpochStartConfiguration,
3452 state_hasher: Arc<GlobalStateHasher>,
3453 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3454 epoch_supply_change: i64,
3455 epoch_last_checkpoint: CheckpointSequenceNumber,
3456 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3457 Self::check_protocol_version(
3458 supported_protocol_versions,
3459 epoch_start_configuration
3460 .epoch_start_state()
3461 .protocol_version(),
3462 );
3463 self.metrics.reset_on_reconfigure();
3464 self.committee_store.insert_new_committee(&new_committee)?;
3465
3466 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3468
3469 cur_epoch_store.epoch_terminated().await;
3471
3472 let highest_locally_built_checkpoint_seq = self
3473 .checkpoint_store
3474 .get_latest_locally_computed_checkpoint()?
3475 .map(|c| *c.sequence_number())
3476 .unwrap_or(0);
3477
3478 assert!(
3479 epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3480 "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3481 );
3482 if highest_locally_built_checkpoint_seq == epoch_last_checkpoint
3483 || self.is_fullnode(cur_epoch_store)
3484 {
3485 let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3489 if num_shared_version_assignments > 10 {
3496 debug_fatal!(
3498 "all shared_version_assignments should have been removed \
3499 (num_shared_version_assignments: {num_shared_version_assignments})"
3500 );
3501 }
3502 }
3503
3504 self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3510 .await?;
3511 self.get_reconfig_api()
3512 .clear_state_end_of_epoch(&execution_lock);
3513 self.check_system_consistency(
3514 cur_epoch_store,
3515 state_hasher,
3516 expensive_safety_check_config,
3517 epoch_supply_change,
3518 )?;
3519 self.get_reconfig_api()
3520 .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3521 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3522 if self
3523 .db_checkpoint_config
3524 .perform_db_checkpoints_at_epoch_end
3525 {
3526 let checkpoint_indexes = self
3527 .db_checkpoint_config
3528 .perform_index_db_checkpoints_at_epoch_end
3529 .unwrap_or(false);
3530 let current_epoch = cur_epoch_store.epoch();
3531 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3532 self.checkpoint_all_dbs(
3533 &epoch_checkpoint_path,
3534 cur_epoch_store,
3535 checkpoint_indexes,
3536 )?;
3537 }
3538 }
3539
3540 let new_epoch = new_committee.epoch;
3541 let new_epoch_store = self
3542 .reopen_epoch_db(
3543 cur_epoch_store,
3544 new_committee,
3545 epoch_start_configuration,
3546 expensive_safety_check_config,
3547 epoch_last_checkpoint,
3548 )
3549 .await?;
3550 assert_eq!(new_epoch_store.epoch(), new_epoch);
3551 self.transaction_manager.reconfigure(new_epoch);
3552 *execution_lock = new_epoch;
3553 Ok(new_epoch_store)
3557 }
3558
3559 pub async fn reconfigure_for_testing(&self) {
3564 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3565 let epoch_store = self.epoch_store_for_testing().clone();
3566 let protocol_config = epoch_store.protocol_config().clone();
3567 let _guard =
3575 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3576 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3577 self.get_backing_package_store().clone(),
3578 &self.config.expensive_safety_check_config,
3579 self.checkpoint_store
3580 .get_epoch_last_checkpoint(epoch_store.epoch())
3581 .unwrap()
3582 .map(|c| *c.sequence_number())
3583 .unwrap_or_default(),
3584 );
3585 let new_epoch = new_epoch_store.epoch();
3586 self.transaction_manager.reconfigure(new_epoch);
3587 self.epoch_store.store(new_epoch_store);
3588 epoch_store.epoch_terminated().await;
3589 *execution_lock = new_epoch;
3590 }
3591
3592 #[instrument(level = "error", skip_all)]
3593 fn check_system_consistency(
3594 &self,
3595 cur_epoch_store: &AuthorityPerEpochStore,
3596 state_hasher: Arc<GlobalStateHasher>,
3597 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3598 epoch_supply_change: i64,
3599 ) -> IotaResult<()> {
3600 info!(
3601 "Performing iota conservation consistency check for epoch {}",
3602 cur_epoch_store.epoch()
3603 );
3604
3605 if cfg!(debug_assertions) {
3606 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3607 }
3608
3609 self.get_reconfig_api()
3610 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3611
3612 if expensive_safety_check_config.enable_state_consistency_check() {
3614 info!(
3615 "Performing state consistency check for epoch {}",
3616 cur_epoch_store.epoch()
3617 );
3618 self.expensive_check_is_consistent_state(
3619 state_hasher,
3620 cur_epoch_store,
3621 cfg!(debug_assertions), );
3623 }
3624
3625 if expensive_safety_check_config.enable_secondary_index_checks() {
3626 if let Some(indexes) = self.indexes.clone() {
3627 verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
3628 .expect("secondary indexes are inconsistent");
3629 }
3630 }
3631
3632 Ok(())
3633 }
3634
3635 fn expensive_check_is_consistent_state(
3636 &self,
3637 state_hasher: Arc<GlobalStateHasher>,
3638 cur_epoch_store: &AuthorityPerEpochStore,
3639 panic: bool,
3640 ) {
3641 let live_object_set_hash = state_hasher.digest_live_object_set();
3642
3643 let root_state_hash: ECMHLiveObjectSetDigest = self
3644 .get_global_state_hash_store()
3645 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
3646 .expect("Retrieving root state hash cannot fail")
3647 .expect("Root state hash for epoch must exist")
3648 .1
3649 .digest()
3650 .into();
3651
3652 let is_inconsistent = root_state_hash != live_object_set_hash;
3653 if is_inconsistent {
3654 if panic {
3655 panic!(
3656 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3657 );
3658 } else {
3659 error!(
3660 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3661 root_state_hash, live_object_set_hash
3662 );
3663 }
3664 } else {
3665 info!("State consistency check passed");
3666 }
3667
3668 if !panic {
3669 state_hasher.set_inconsistent_state(is_inconsistent);
3670 }
3671 }
3672
3673 pub fn current_epoch_for_testing(&self) -> EpochId {
3674 self.epoch_store_for_testing().epoch()
3675 }
3676
3677 #[instrument(level = "error", skip_all)]
3678 pub fn checkpoint_all_dbs(
3679 &self,
3680 checkpoint_path: &Path,
3681 cur_epoch_store: &AuthorityPerEpochStore,
3682 checkpoint_indexes: bool,
3683 ) -> IotaResult {
3684 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3685 let current_epoch = cur_epoch_store.epoch();
3686
3687 if checkpoint_path.exists() {
3688 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3689 return Ok(());
3690 }
3691
3692 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3693 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3694
3695 if checkpoint_path_tmp.exists() {
3696 fs::remove_dir_all(&checkpoint_path_tmp)
3697 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3698 }
3699
3700 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3701 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3702
3703 self.checkpoint_store
3706 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3707
3708 self.get_reconfig_api()
3709 .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3710
3711 self.committee_store
3712 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3713
3714 if checkpoint_indexes {
3715 if let Some(indexes) = self.indexes.as_ref() {
3716 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3717 }
3718 if let Some(grpc_indexes_store) = self.grpc_indexes_store.as_ref() {
3719 grpc_indexes_store.checkpoint_db(&checkpoint_path_tmp.join(GRPC_INDEXES_DIR))?;
3720 }
3721 }
3722
3723 fs::rename(checkpoint_path_tmp, checkpoint_path)
3724 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3725 Ok(())
3726 }
3727
3728 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3735 self.epoch_store.load()
3736 }
3737
3738 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3740 self.load_epoch_store_one_call_per_task()
3741 }
3742
3743 pub fn clone_committee_for_testing(&self) -> Committee {
3744 Committee::clone(self.epoch_store_for_testing().committee())
3745 }
3746
3747 #[instrument(level = "trace", skip_all)]
3748 pub async fn try_get_object(&self, object_id: &ObjectId) -> IotaResult<Option<Object>> {
3749 self.get_object_store()
3750 .try_get_object(object_id)
3751 .map_err(Into::into)
3752 }
3753
3754 pub async fn get_object(&self, object_id: &ObjectId) -> Option<Object> {
3756 self.try_get_object(object_id)
3757 .await
3758 .expect("storage access failed")
3759 }
3760
3761 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3762 Ok(self
3763 .try_get_object(&ObjectId::SYSTEM)
3764 .await?
3765 .expect("system package should always exist")
3766 .object_ref())
3767 }
3768
3769 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3771 self.get_object_cache_reader()
3772 .try_get_iota_system_state_object_unsafe()
3773 }
3774
3775 #[instrument(level = "trace", skip_all)]
3776 pub fn get_checkpoint_by_sequence_number(
3777 &self,
3778 sequence_number: CheckpointSequenceNumber,
3779 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3780 Ok(self
3781 .checkpoint_store
3782 .get_checkpoint_by_sequence_number(sequence_number)?)
3783 }
3784
3785 pub async fn wait_for_checkpoint_inclusion(
3792 &self,
3793 digests: &[TransactionDigest],
3794 timeout: Duration,
3795 ) -> IotaResult<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>> {
3796 let epoch_store = self.load_epoch_store_one_call_per_task();
3797
3798 let mut checkpoint_timestamp_cache = HashMap::<CheckpointSequenceNumber, u64>::new();
3801
3802 let results = epoch_store
3803 .wait_for_transactions_in_checkpoint_with_timeout(digests, timeout, |seq| {
3804 *checkpoint_timestamp_cache.entry(seq).or_insert_with(|| {
3805 self.get_checkpoint_by_sequence_number(seq)
3806 .ok()
3807 .flatten()
3808 .map(|c| c.timestamp_ms)
3809 .unwrap_or(0)
3810 })
3811 })
3812 .await?;
3813
3814 Ok(digests
3815 .iter()
3816 .copied()
3817 .zip(results)
3818 .filter_map(|(digest, opt)| opt.map(|seq_and_ts| (digest, seq_and_ts)))
3819 .collect())
3820 }
3821
3822 #[instrument(level = "trace", skip_all)]
3823 pub fn get_transaction_checkpoint_for_tests(
3824 &self,
3825 digest: &TransactionDigest,
3826 epoch_store: &AuthorityPerEpochStore,
3827 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3828 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3829 let Some(checkpoint) = checkpoint else {
3830 return Ok(None);
3831 };
3832 let checkpoint = self
3833 .checkpoint_store
3834 .get_checkpoint_by_sequence_number(checkpoint)?;
3835 Ok(checkpoint)
3836 }
3837
3838 #[instrument(level = "trace", skip_all)]
3839 pub fn get_object_read(&self, object_id: &ObjectId) -> IotaResult<ObjectRead> {
3840 Ok(
3841 match self
3842 .get_object_cache_reader()
3843 .try_get_latest_object_or_tombstone(*object_id)?
3844 {
3845 Some((_, ObjectOrTombstone::Object(object))) => {
3846 let layout = self.get_object_layout(&object)?;
3847 ObjectRead::Exists(object.object_ref(), object, layout)
3848 }
3849 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3850 None => ObjectRead::NotExists(*object_id),
3851 },
3852 )
3853 }
3854
3855 pub fn get_chain_identifier(&self) -> ChainIdentifier {
3857 self.chain_identifier
3858 }
3859
3860 #[instrument(level = "trace", skip_all)]
3861 pub fn get_move_object<T>(&self, object_id: &ObjectId) -> IotaResult<T>
3862 where
3863 T: DeserializeOwned,
3864 {
3865 let o = self.get_object_read(object_id)?.into_object()?;
3866 if let Some(move_object) = o.data.as_struct_opt() {
3867 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3868 IotaError::ObjectDeserialization {
3869 error: format!("{e}"),
3870 }
3871 })?)
3872 } else {
3873 Err(IotaError::ObjectDeserialization {
3874 error: format!("Provided object : [{object_id}] is not a Move object."),
3875 })
3876 }
3877 }
3878
3879 #[instrument(level = "trace", skip_all)]
3885 pub fn get_past_object_read(
3886 &self,
3887 object_id: &ObjectId,
3888 version: SequenceNumber,
3889 ) -> IotaResult<PastObjectRead> {
3890 let Some(obj_ref) = self
3892 .get_object_cache_reader()
3893 .try_get_latest_object_ref_or_tombstone(*object_id)?
3894 else {
3895 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3896 };
3897
3898 if version > obj_ref.version {
3899 return Ok(PastObjectRead::VersionTooHigh {
3900 object_id: *object_id,
3901 asked_version: version,
3902 latest_version: obj_ref.version,
3903 });
3904 }
3905
3906 if version < obj_ref.version {
3907 return Ok(match self.read_object_at_version(object_id, version)? {
3909 Some((object, layout)) => {
3910 let obj_ref = object.object_ref();
3911 PastObjectRead::VersionFound(obj_ref, object, layout)
3912 }
3913
3914 None => PastObjectRead::VersionNotFound(*object_id, version),
3915 });
3916 }
3917
3918 if !obj_ref.digest.is_object_alive() {
3919 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3920 }
3921
3922 match self.read_object_at_version(object_id, obj_ref.version)? {
3923 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3924 None => {
3925 error!(
3926 "Object with in parent_entry is missing from object store, datastore is \
3927 inconsistent",
3928 );
3929 Err(UserInputError::ObjectNotFound {
3930 object_id: *object_id,
3931 version: Some(obj_ref.version),
3932 }
3933 .into())
3934 }
3935 }
3936 }
3937
3938 #[instrument(level = "trace", skip_all)]
3939 fn read_object_at_version(
3940 &self,
3941 object_id: &ObjectId,
3942 version: SequenceNumber,
3943 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3944 let Some(object) = self
3945 .get_object_cache_reader()
3946 .try_get_object_by_key(object_id, version)?
3947 else {
3948 return Ok(None);
3949 };
3950
3951 let layout = self.get_object_layout(&object)?;
3952 Ok(Some((object, layout)))
3953 }
3954
3955 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3956 let layout = object
3957 .data
3958 .as_struct_opt()
3959 .map(|object| {
3960 into_struct_layout(
3961 self.load_epoch_store_one_call_per_task()
3962 .executor()
3963 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3965 .get_annotated_layout(object.struct_tag())?,
3966 )
3967 })
3968 .transpose()?;
3969 Ok(layout)
3970 }
3971
3972 fn get_owner_at_version(
3973 &self,
3974 object_id: &ObjectId,
3975 version: SequenceNumber,
3976 ) -> IotaResult<Owner> {
3977 self.get_object_store()
3978 .try_get_object_by_key(object_id, version)?
3979 .ok_or_else(|| {
3980 IotaError::from(UserInputError::ObjectNotFound {
3981 object_id: *object_id,
3982 version: Some(version),
3983 })
3984 })
3985 .map(|o| o.owner)
3986 }
3987
3988 #[instrument(level = "trace", skip_all)]
3989 pub fn get_owner_objects(
3990 &self,
3991 owner: IotaAddress,
3992 cursor: Option<ObjectId>,
3994 limit: usize,
3995 filter: Option<IotaObjectDataFilter>,
3996 ) -> IotaResult<Vec<ObjectInfo>> {
3997 if let Some(indexes) = &self.indexes {
3998 indexes.get_owner_objects(owner, cursor, limit, filter)
3999 } else {
4000 Err(IotaError::IndexStoreNotAvailable)
4001 }
4002 }
4003
4004 #[instrument(level = "trace", skip_all)]
4005 pub fn get_owned_coins_iterator_with_cursor(
4006 &self,
4007 owner: IotaAddress,
4008 cursor: (String, ObjectId),
4010 limit: usize,
4011 one_coin_type_only: bool,
4012 ) -> IotaResult<impl Iterator<Item = (String, ObjectId, CoinInfo)> + '_> {
4013 if let Some(indexes) = &self.indexes {
4014 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4015 } else {
4016 Err(IotaError::IndexStoreNotAvailable)
4017 }
4018 }
4019
4020 #[instrument(level = "trace", skip_all)]
4021 pub fn get_owner_objects_iterator(
4022 &self,
4023 owner: IotaAddress,
4024 cursor: Option<ObjectId>,
4026 filter: Option<IotaObjectDataFilter>,
4027 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
4028 let cursor_u = cursor.unwrap_or(ObjectId::ZERO);
4029 if let Some(indexes) = &self.indexes {
4030 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4031 } else {
4032 Err(IotaError::IndexStoreNotAvailable)
4033 }
4034 }
4035
4036 #[instrument(level = "trace", skip_all)]
4037 pub async fn get_move_objects<T>(
4038 &self,
4039 owner: IotaAddress,
4040 tag: StructTag,
4041 ) -> IotaResult<Vec<T>>
4042 where
4043 T: DeserializeOwned,
4044 {
4045 let object_ids = self
4046 .get_owner_objects_iterator(owner, None, None)?
4047 .filter(|o| match &o.type_ {
4048 ObjectType::Struct(s) => *s == tag,
4049 ObjectType::Package => false,
4050 })
4051 .map(|info| ObjectKey(info.object_id, info.version))
4052 .collect::<Vec<_>>();
4053 let mut move_objects = vec![];
4054
4055 let objects = self
4056 .get_object_store()
4057 .try_multi_get_objects_by_key(&object_ids)?;
4058
4059 for (o, id) in objects.into_iter().zip(object_ids) {
4060 let object = o.ok_or_else(|| {
4061 IotaError::from(UserInputError::ObjectNotFound {
4062 object_id: id.0,
4063 version: Some(id.1),
4064 })
4065 })?;
4066 let move_object = object.data.as_struct_opt().ok_or_else(|| {
4067 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4068 })?;
4069 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4070 IotaError::ObjectDeserialization {
4071 error: format!("{e}"),
4072 }
4073 })?);
4074 }
4075 Ok(move_objects)
4076 }
4077
4078 #[instrument(level = "trace", skip_all)]
4079 pub fn get_dynamic_fields(
4080 &self,
4081 owner: ObjectId,
4082 cursor: Option<ObjectId>,
4084 limit: usize,
4085 ) -> IotaResult<Vec<(ObjectId, DynamicFieldInfo)>> {
4086 Ok(self
4087 .get_dynamic_fields_iterator(owner, cursor)?
4088 .take(limit)
4089 .collect::<Result<Vec<_>, _>>()?)
4090 }
4091
4092 fn get_dynamic_fields_iterator(
4093 &self,
4094 owner: ObjectId,
4095 cursor: Option<ObjectId>,
4097 ) -> IotaResult<impl Iterator<Item = Result<(ObjectId, DynamicFieldInfo), TypedStoreError>> + '_>
4098 {
4099 if let Some(indexes) = &self.indexes {
4100 indexes.get_dynamic_fields_iterator(owner, cursor)
4101 } else {
4102 Err(IotaError::IndexStoreNotAvailable)
4103 }
4104 }
4105
4106 #[instrument(level = "trace", skip_all)]
4107 pub fn get_dynamic_field_object_id(
4108 &self,
4109 owner: ObjectId,
4110 name_type: TypeTag,
4111 name_bcs_bytes: &[u8],
4112 ) -> IotaResult<Option<ObjectId>> {
4113 if let Some(indexes) = &self.indexes {
4114 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4115 } else {
4116 Err(IotaError::IndexStoreNotAvailable)
4117 }
4118 }
4119
4120 #[instrument(level = "trace", skip_all)]
4121 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
4122 Ok(self.get_indexes()?.next_sequence_number())
4123 }
4124
4125 #[instrument(level = "trace", skip_all)]
4126 pub async fn get_executed_transaction_and_effects(
4127 &self,
4128 digest: TransactionDigest,
4129 kv_store: Arc<TransactionKeyValueStore>,
4130 ) -> IotaResult<(Transaction, TransactionEffects)> {
4131 let transaction = kv_store.get_tx(digest).await?;
4132 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4133 Ok((transaction, effects))
4134 }
4135
4136 #[instrument(level = "trace", skip_all)]
4137 pub fn multi_get_checkpoint_by_sequence_number(
4138 &self,
4139 sequence_numbers: &[CheckpointSequenceNumber],
4140 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
4141 Ok(self
4142 .checkpoint_store
4143 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4144 }
4145
4146 #[instrument(level = "trace", skip_all)]
4147 pub fn get_transaction_events(
4148 &self,
4149 digest: &TransactionDigest,
4150 ) -> IotaResult<TransactionEvents> {
4151 self.get_transaction_cache_reader()
4152 .try_get_events(digest)?
4153 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
4154 }
4155
4156 pub fn get_transaction_input_objects(
4157 &self,
4158 effects: &TransactionEffects,
4159 ) -> anyhow::Result<Vec<Object>> {
4160 iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4161 .map_err(Into::into)
4162 }
4163
4164 pub fn get_transaction_output_objects(
4165 &self,
4166 effects: &TransactionEffects,
4167 ) -> anyhow::Result<Vec<Object>> {
4168 iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4169 .map_err(Into::into)
4170 }
4171
4172 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
4173 match &self.indexes {
4174 Some(i) => Ok(i.clone()),
4175 None => Err(IotaError::UnsupportedFeature {
4176 error: "extended object indexing is not enabled on this server".into(),
4177 }),
4178 }
4179 }
4180
4181 pub async fn get_transactions_for_tests(
4182 self: &Arc<Self>,
4183 filter: Option<TransactionFilter>,
4184 cursor: Option<TransactionDigest>,
4185 limit: Option<usize>,
4186 reverse: bool,
4187 ) -> IotaResult<Vec<TransactionDigest>> {
4188 let metrics = KeyValueStoreMetrics::new_for_tests();
4189 let kv_store = Arc::new(TransactionKeyValueStore::new(
4190 "rocksdb",
4191 metrics,
4192 self.clone(),
4193 ));
4194 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4195 .await
4196 }
4197
4198 #[instrument(level = "trace", skip_all)]
4199 pub async fn get_transactions(
4200 &self,
4201 kv_store: &Arc<TransactionKeyValueStore>,
4202 filter: Option<TransactionFilter>,
4203 cursor: Option<TransactionDigest>,
4205 limit: Option<usize>,
4206 reverse: bool,
4207 ) -> IotaResult<Vec<TransactionDigest>> {
4208 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4209 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4210 let iter = checkpoint_contents.iter().map(|c| c.transaction);
4211 if reverse {
4212 let iter = iter
4213 .rev()
4214 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4215 .skip(usize::from(cursor.is_some()));
4216 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4217 } else {
4218 let iter = iter
4219 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4220 .skip(usize::from(cursor.is_some()));
4221 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4222 }
4223 }
4224 self.get_indexes()?
4225 .get_transactions(filter, cursor, limit, reverse)
4226 }
4227
4228 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4229 &self.checkpoint_store
4230 }
4231
4232 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4233 self.get_checkpoint_store()
4234 .get_highest_executed_checkpoint_seq_number()?
4235 .ok_or(IotaError::UserInput {
4236 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4237 })
4238 }
4239
4240 #[cfg(msim)]
4241 pub fn get_highest_pruned_checkpoint_for_testing(
4242 &self,
4243 ) -> IotaResult<CheckpointSequenceNumber> {
4244 self.database_for_testing()
4245 .perpetual_tables
4246 .get_highest_pruned_checkpoint()
4247 .map(|c| c.unwrap_or(0))
4248 .map_err(Into::into)
4249 }
4250
4251 #[instrument(level = "trace", skip_all)]
4252 pub fn get_checkpoint_summary_by_sequence_number(
4253 &self,
4254 sequence_number: CheckpointSequenceNumber,
4255 ) -> IotaResult<CheckpointSummary> {
4256 let verified_checkpoint = self
4257 .get_checkpoint_store()
4258 .get_checkpoint_by_sequence_number(sequence_number)?;
4259 match verified_checkpoint {
4260 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4261 None => Err(IotaError::UserInput {
4262 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4263 }),
4264 }
4265 }
4266
4267 #[instrument(level = "trace", skip_all)]
4268 pub fn get_checkpoint_summary_by_digest(
4269 &self,
4270 digest: CheckpointDigest,
4271 ) -> IotaResult<CheckpointSummary> {
4272 let verified_checkpoint = self
4273 .get_checkpoint_store()
4274 .get_checkpoint_by_digest(&digest)?;
4275 match verified_checkpoint {
4276 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4277 None => Err(IotaError::UserInput {
4278 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4279 }),
4280 }
4281 }
4282
4283 #[instrument(level = "trace", skip_all)]
4284 pub fn find_publish_txn_digest(&self, package_id: ObjectId) -> IotaResult<TransactionDigest> {
4285 if package_id.is_system_package() {
4286 return self.find_genesis_txn_digest();
4287 }
4288 Ok(self
4289 .get_object_read(&package_id)?
4290 .into_object()?
4291 .previous_transaction)
4292 }
4293
4294 #[instrument(level = "trace", skip_all)]
4295 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4296 let summary = self
4297 .get_verified_checkpoint_by_sequence_number(0)?
4298 .into_message();
4299 let content = self.get_checkpoint_contents(summary.content_digest)?;
4300 let genesis_transaction = content.enumerate_transactions(&summary).next();
4301 Ok(genesis_transaction
4302 .ok_or(IotaError::UserInput {
4303 error: UserInputError::GenesisTransactionNotFound,
4304 })?
4305 .1
4306 .transaction)
4307 }
4308
4309 #[instrument(level = "trace", skip_all)]
4310 pub fn get_verified_checkpoint_by_sequence_number(
4311 &self,
4312 sequence_number: CheckpointSequenceNumber,
4313 ) -> IotaResult<VerifiedCheckpoint> {
4314 let verified_checkpoint = self
4315 .get_checkpoint_store()
4316 .get_checkpoint_by_sequence_number(sequence_number)?;
4317 match verified_checkpoint {
4318 Some(verified_checkpoint) => Ok(verified_checkpoint),
4319 None => Err(IotaError::UserInput {
4320 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4321 }),
4322 }
4323 }
4324
4325 #[instrument(level = "trace", skip_all)]
4326 pub fn get_verified_checkpoint_summary_by_digest(
4327 &self,
4328 digest: CheckpointDigest,
4329 ) -> IotaResult<VerifiedCheckpoint> {
4330 let verified_checkpoint = self
4331 .get_checkpoint_store()
4332 .get_checkpoint_by_digest(&digest)?;
4333 match verified_checkpoint {
4334 Some(verified_checkpoint) => Ok(verified_checkpoint),
4335 None => Err(IotaError::UserInput {
4336 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4337 }),
4338 }
4339 }
4340
4341 #[instrument(level = "trace", skip_all)]
4342 pub fn get_checkpoint_contents(
4343 &self,
4344 digest: CheckpointContentsDigest,
4345 ) -> IotaResult<CheckpointContents> {
4346 self.get_checkpoint_store()
4347 .get_checkpoint_contents(&digest)?
4348 .ok_or(IotaError::UserInput {
4349 error: UserInputError::CheckpointContentsNotFound(digest),
4350 })
4351 }
4352
4353 #[instrument(level = "trace", skip_all)]
4354 pub fn get_checkpoint_contents_by_sequence_number(
4355 &self,
4356 sequence_number: CheckpointSequenceNumber,
4357 ) -> IotaResult<CheckpointContents> {
4358 let verified_checkpoint = self
4359 .get_checkpoint_store()
4360 .get_checkpoint_by_sequence_number(sequence_number)?;
4361 match verified_checkpoint {
4362 Some(verified_checkpoint) => {
4363 let content_digest = verified_checkpoint.into_inner().content_digest;
4364 self.get_checkpoint_contents(content_digest)
4365 }
4366 None => Err(IotaError::UserInput {
4367 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4368 }),
4369 }
4370 }
4371
4372 #[instrument(level = "trace", skip_all)]
4373 pub async fn query_events(
4374 &self,
4375 kv_store: &Arc<TransactionKeyValueStore>,
4376 query: EventFilter,
4377 cursor: Option<EventID>,
4379 limit: usize,
4380 descending: bool,
4381 ) -> IotaResult<Vec<IotaEvent>> {
4382 let index_store = self.get_indexes()?;
4383
4384 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4386 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4387 IotaError::TransactionNotFound {
4388 digest: cursor.tx_digest,
4389 },
4390 )?;
4391 (tx_seq, cursor.event_seq as usize)
4392 } else if descending {
4393 (u64::MAX, usize::MAX)
4394 } else {
4395 (0, 0)
4396 };
4397
4398 let limit = limit + 1;
4399 let mut event_keys = match query {
4400 EventFilter::All(filters) => {
4401 if filters.is_empty() {
4402 index_store.all_events(tx_num, event_num, limit, descending)?
4403 } else {
4404 return Err(IotaError::UserInput {
4405 error: UserInputError::Unsupported(
4406 "This query type does not currently support filter combinations"
4407 .to_string(),
4408 ),
4409 });
4410 }
4411 }
4412 EventFilter::Transaction(digest) => {
4413 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4414 }
4415 EventFilter::MoveModule { package, module } => {
4416 let module_id = ModuleId::new(
4417 AccountAddress::new(package.into_bytes()),
4418 move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4419 );
4420 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4421 }
4422 EventFilter::MoveEventType(struct_name) => index_store
4423 .events_by_move_event_struct_name(
4424 &struct_name,
4425 tx_num,
4426 event_num,
4427 limit,
4428 descending,
4429 )?,
4430 EventFilter::Sender(sender) => {
4431 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4432 }
4433 EventFilter::TimeRange {
4434 start_time,
4435 end_time,
4436 } => index_store
4437 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4438 EventFilter::MoveEventModule { package, module } => index_store
4439 .events_by_move_event_module(
4440 &ModuleId::new(
4441 AccountAddress::new(package.into_bytes()),
4442 move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4443 ),
4444 tx_num,
4445 event_num,
4446 limit,
4447 descending,
4448 )?,
4449 EventFilter::Package(_)
4451 | EventFilter::MoveEventField { .. }
4452 | EventFilter::Any(_)
4453 | EventFilter::And(_, _)
4454 | EventFilter::Or(_, _) => {
4455 return Err(IotaError::UserInput {
4456 error: UserInputError::Unsupported(
4457 "This query type is not supported by the full node.".to_string(),
4458 ),
4459 });
4460 }
4461 };
4462
4463 if cursor.is_some() {
4466 if !event_keys.is_empty() {
4467 event_keys.remove(0);
4468 }
4469 } else {
4470 event_keys.truncate(limit - 1);
4471 }
4472
4473 let transaction_digests = event_keys
4475 .iter()
4476 .map(|(_, digest, _, _)| *digest)
4477 .collect::<HashSet<_>>()
4478 .into_iter()
4479 .collect::<Vec<_>>();
4480
4481 let events = kv_store
4482 .multi_get_events_by_tx_digests(&transaction_digests)
4483 .await?;
4484
4485 let events_map: HashMap<_, _> =
4486 transaction_digests.iter().zip(events.into_iter()).collect();
4487
4488 let stored_events = event_keys
4489 .into_iter()
4490 .map(|k| {
4491 (
4492 k,
4493 events_map
4494 .get(&k.1)
4495 .expect("fetched digest is missing")
4496 .clone()
4497 .and_then(|e| e.get(k.2).cloned()),
4498 )
4499 })
4500 .map(
4501 |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4502 event
4503 .map(|e| (e, tx_digest, event_seq, timestamp))
4504 .ok_or(IotaError::TransactionEventsNotFound { digest: tx_digest })
4505 },
4506 )
4507 .collect::<Result<Vec<_>, _>>()?;
4508
4509 let epoch_store = self.load_epoch_store_one_call_per_task();
4510 let backing_store = self.get_backing_package_store().as_ref();
4511 let mut layout_resolver = epoch_store
4512 .executor()
4513 .type_layout_resolver(Box::new(backing_store));
4514 let mut events = vec![];
4515 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4516 events.push(IotaEvent::try_from(
4517 e.clone(),
4518 tx_digest,
4519 event_seq as u64,
4520 Some(timestamp),
4521 layout_resolver.get_annotated_layout(&e.type_)?,
4522 )?)
4523 }
4524 Ok(events)
4525 }
4526
4527 pub async fn insert_genesis_object(&self, object: Object) {
4528 self.get_reconfig_api()
4529 .try_insert_genesis_object(object)
4530 .expect("Cannot insert genesis object")
4531 }
4532
4533 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4534 futures::future::join_all(
4535 objects
4536 .iter()
4537 .map(|o| self.insert_genesis_object(o.clone())),
4538 )
4539 .await;
4540 }
4541
4542 #[instrument(level = "trace", skip_all)]
4544 pub fn get_transaction_status(
4545 &self,
4546 transaction_digest: &TransactionDigest,
4547 epoch_store: &Arc<AuthorityPerEpochStore>,
4548 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4549 if let Some(effects) =
4551 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4552 {
4553 if let Some(transaction) = self
4554 .get_transaction_cache_reader()
4555 .try_get_transaction_block(transaction_digest)?
4556 {
4557 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4558 let events = if effects.events_digest().is_some() {
4559 self.get_transaction_events(effects.transaction_digest())?
4560 } else {
4561 TransactionEvents::default()
4562 };
4563 return Ok(Some((
4564 (*transaction).clone().into_message(),
4565 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4566 )));
4567 } else {
4568 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4573 }
4574 }
4575 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4576 self.metrics.tx_already_processed.inc();
4577 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4578 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4579 } else {
4580 Ok(None)
4581 }
4582 }
4583
4584 #[instrument(level = "trace", skip_all)]
4588 pub fn get_signed_effects_and_maybe_resign(
4589 &self,
4590 transaction_digest: &TransactionDigest,
4591 epoch_store: &Arc<AuthorityPerEpochStore>,
4592 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4593 let effects = self
4594 .get_transaction_cache_reader()
4595 .try_get_executed_effects(transaction_digest)?;
4596 match effects {
4597 Some(effects) => {
4598 if effects.epoch() != epoch_store.epoch() {
4621 debug!(
4622 tx_digest=?transaction_digest,
4623 effects_epoch=?effects.epoch(),
4624 epoch=?epoch_store.epoch(),
4625 "Re-signing the effects with the current epoch"
4626 );
4627 }
4628 Ok(Some(self.sign_effects(effects, epoch_store)?))
4629 }
4630 None => Ok(None),
4631 }
4632 }
4633
4634 #[instrument(level = "trace", skip_all)]
4635 pub(crate) fn sign_effects(
4636 &self,
4637 effects: TransactionEffects,
4638 epoch_store: &Arc<AuthorityPerEpochStore>,
4639 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4640 let tx_digest = *effects.transaction_digest();
4641 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4642 Some(sig) => {
4643 debug_assert!(sig.epoch == epoch_store.epoch());
4644 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4645 }
4646 _ => {
4647 let sig = AuthoritySignInfo::new(
4648 epoch_store.epoch(),
4649 &effects,
4650 Intent::iota_app(IntentScope::TransactionEffects),
4651 self.name,
4652 &*self.secret,
4653 );
4654
4655 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4656
4657 epoch_store.insert_effects_digest_and_signature(
4658 &tx_digest,
4659 effects.digest(),
4660 &sig,
4661 )?;
4662
4663 effects
4664 }
4665 };
4666
4667 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4668 signed_effects,
4669 ))
4670 }
4671
4672 #[instrument(level = "trace", skip_all)]
4674 fn fullnode_only_get_tx_coins_for_indexing(
4675 &self,
4676 effects: &TransactionEffects,
4677 inner_temporary_store: &InnerTemporaryStore,
4678 epoch_store: &Arc<AuthorityPerEpochStore>,
4679 ) -> Option<TxCoins> {
4680 if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4681 return None;
4682 }
4683 let written_coin_objects = inner_temporary_store
4684 .written
4685 .iter()
4686 .filter_map(|(k, v)| {
4687 if v.is_coin() {
4688 Some((*k, v.clone()))
4689 } else {
4690 None
4691 }
4692 })
4693 .collect();
4694 let mut input_coin_objects = inner_temporary_store
4695 .input_objects
4696 .iter()
4697 .filter_map(|(k, v)| {
4698 if v.is_coin() {
4699 Some((*k, v.clone()))
4700 } else {
4701 None
4702 }
4703 })
4704 .collect::<ObjectMap>();
4705
4706 for (object_id, version) in effects.modified_at_versions() {
4711 if inner_temporary_store
4712 .loaded_runtime_objects
4713 .contains_key(&object_id)
4714 {
4715 if let Some(object) = self
4716 .get_object_store()
4717 .get_object_by_key(&object_id, version)
4718 {
4719 if object.is_coin() {
4720 input_coin_objects.insert(object_id, object);
4721 }
4722 }
4723 }
4724 }
4725
4726 Some((input_coin_objects, written_coin_objects))
4727 }
4728
4729 #[instrument(level = "trace", skip_all)]
4741 pub async fn get_transaction_lock(
4742 &self,
4743 object_ref: &ObjectRef,
4744 epoch_store: &AuthorityPerEpochStore,
4745 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4746 let lock_info = self
4747 .get_object_cache_reader()
4748 .try_get_lock(*object_ref, epoch_store)?;
4749 let lock_info = match lock_info {
4750 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4751 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4752 provided_obj_ref: *object_ref,
4753 current_version: locked_ref.version,
4754 }
4755 .into());
4756 }
4757 ObjectLockStatus::Initialized => {
4758 return Ok(None);
4759 }
4760 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4761 };
4762
4763 epoch_store.get_signed_transaction(&lock_info)
4764 }
4765
4766 pub async fn try_get_objects(&self, objects: &[ObjectId]) -> IotaResult<Vec<Option<Object>>> {
4767 self.get_object_cache_reader().try_get_objects(objects)
4768 }
4769
4770 pub async fn get_objects(&self, objects: &[ObjectId]) -> Vec<Option<Object>> {
4772 self.try_get_objects(objects)
4773 .await
4774 .expect("storage access failed")
4775 }
4776
4777 pub async fn try_get_object_or_tombstone(
4778 &self,
4779 object_id: ObjectId,
4780 ) -> IotaResult<Option<ObjectRef>> {
4781 self.get_object_cache_reader()
4782 .try_get_latest_object_ref_or_tombstone(object_id)
4783 }
4784
4785 pub async fn get_object_or_tombstone(&self, object_id: ObjectId) -> Option<ObjectRef> {
4787 self.try_get_object_or_tombstone(object_id)
4788 .await
4789 .expect("storage access failed")
4790 }
4791
4792 pub fn set_override_protocol_upgrade_buffer_stake(
4802 &self,
4803 expected_epoch: EpochId,
4804 buffer_stake_bps: u64,
4805 ) -> IotaResult {
4806 let epoch_store = self.load_epoch_store_one_call_per_task();
4807 let actual_epoch = epoch_store.epoch();
4808 if actual_epoch != expected_epoch {
4809 return Err(IotaError::WrongEpoch {
4810 expected_epoch,
4811 actual_epoch,
4812 });
4813 }
4814
4815 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4816 }
4817
4818 pub fn clear_override_protocol_upgrade_buffer_stake(
4819 &self,
4820 expected_epoch: EpochId,
4821 ) -> IotaResult {
4822 let epoch_store = self.load_epoch_store_one_call_per_task();
4823 let actual_epoch = epoch_store.epoch();
4824 if actual_epoch != expected_epoch {
4825 return Err(IotaError::WrongEpoch {
4826 expected_epoch,
4827 actual_epoch,
4828 });
4829 }
4830
4831 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4832 }
4833
4834 pub async fn get_available_system_packages(
4838 &self,
4839 binary_config: &BinaryConfig,
4840 ) -> Vec<ObjectRef> {
4841 let mut results = vec![];
4842
4843 let system_packages = BuiltInFramework::iter_system_packages();
4844
4845 #[cfg(msim)]
4847 let extra_packages = framework_injection::get_extra_packages(self.name);
4848 #[cfg(msim)]
4849 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4850
4851 for system_package in system_packages {
4852 let modules = system_package.modules().to_vec();
4853 #[cfg(msim)]
4855 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4856 .unwrap_or(modules);
4857
4858 let Some(obj_ref) = iota_framework::compare_system_package(
4859 &self.get_object_store(),
4860 &system_package.id,
4861 &modules,
4862 system_package.dependencies.to_vec(),
4863 binary_config,
4864 )
4865 .await
4866 else {
4867 return vec![];
4868 };
4869 results.push(obj_ref);
4870 }
4871
4872 results
4873 }
4874
4875 async fn get_system_package_bytes(
4892 &self,
4893 system_packages: Vec<ObjectRef>,
4894 binary_config: &BinaryConfig,
4895 ) -> Option<Vec<SystemPackage>> {
4896 let ids: Vec<_> = system_packages
4897 .iter()
4898 .map(|object_ref| object_ref.object_id)
4899 .collect();
4900 let objects = self.get_objects(&ids).await;
4901
4902 let mut res = Vec::with_capacity(system_packages.len());
4903 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4904 let prev_transaction = match object {
4905 Some(cur_object) if cur_object.object_ref() == system_package_ref => {
4906 info!(
4908 "Framework {} does not need updating",
4909 system_package_ref.object_id
4910 );
4911 continue;
4912 }
4913
4914 Some(cur_object) => cur_object.previous_transaction,
4915 None => TransactionDigest::GENESIS_MARKER,
4916 };
4917
4918 #[cfg(msim)]
4919 let FrameworkSystemPackage {
4920 id: _,
4921 bytes,
4922 dependencies,
4923 } = framework_injection::get_override_system_package(
4924 &system_package_ref.object_id,
4925 self.name,
4926 )
4927 .unwrap_or_else(|| {
4928 BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone()
4929 });
4930
4931 #[cfg(not(msim))]
4932 let FrameworkSystemPackage {
4933 id: _,
4934 bytes,
4935 dependencies,
4936 } = BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone();
4937
4938 let modules: Vec<_> = bytes
4939 .iter()
4940 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4941 .collect();
4942
4943 let new_object = Object::new_system_package(
4944 &modules,
4945 system_package_ref.version,
4946 dependencies.clone(),
4947 prev_transaction,
4948 );
4949
4950 let new_ref = new_object.object_ref();
4951 if new_ref != system_package_ref {
4952 error!(
4953 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
4954 );
4955 return None;
4956 }
4957
4958 res.push(SystemPackage {
4959 version: system_package_ref.version,
4960 modules: bytes,
4961 dependencies,
4962 });
4963 }
4964
4965 Some(res)
4966 }
4967
4968 fn is_protocol_version_supported_v1(
4972 proposed_protocol_version: ProtocolVersion,
4973 committee: &Committee,
4974 capabilities: Vec<AuthorityCapabilitiesV1>,
4975 mut buffer_stake_bps: u64,
4976 ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4977 if buffer_stake_bps > 10000 {
4978 warn!("clamping buffer_stake_bps to 10000");
4979 buffer_stake_bps = 10000;
4980 }
4981
4982 let mut desired_upgrades: Vec<_> = capabilities
4985 .into_iter()
4986 .filter_map(|mut cap| {
4987 if cap.available_system_packages.is_empty() {
4989 return None;
4990 }
4991
4992 cap.available_system_packages.sort();
4993
4994 info!(
4995 "validator {:?} supports {:?} with system packages: {:?}",
4996 cap.authority.concise(),
4997 cap.supported_protocol_versions,
4998 cap.available_system_packages,
4999 );
5000
5001 cap.supported_protocol_versions
5005 .get_version_digest(proposed_protocol_version)
5006 .map(|digest| (digest, cap.available_system_packages, cap.authority))
5007 })
5008 .collect();
5009
5010 desired_upgrades.sort();
5013 desired_upgrades
5014 .into_iter()
5015 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5016 .into_iter()
5017 .find_map(|((digest, packages), group)| {
5018 assert!(!packages.is_empty());
5020
5021 let mut stake_aggregator: StakeAggregator<(), true> =
5022 StakeAggregator::new(Arc::new(committee.clone()));
5023
5024 for (_, _, authority) in group {
5025 stake_aggregator.insert_generic(authority, ());
5026 }
5027
5028 let total_votes = stake_aggregator.total_votes();
5029 let quorum_threshold = committee.quorum_threshold();
5030 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5031
5032 info!(
5033 protocol_config_digest = ?digest,
5034 ?total_votes,
5035 ?quorum_threshold,
5036 ?buffer_stake_bps,
5037 ?effective_threshold,
5038 ?proposed_protocol_version,
5039 ?packages,
5040 "support for upgrade"
5041 );
5042
5043 let has_support = total_votes >= effective_threshold;
5044 has_support.then_some((proposed_protocol_version, digest, packages))
5045 })
5046 }
5047
5048 fn choose_protocol_version_and_system_packages_v1(
5052 current_protocol_version: ProtocolVersion,
5053 current_protocol_digest: Digest,
5054 committee: &Committee,
5055 capabilities: Vec<AuthorityCapabilitiesV1>,
5056 buffer_stake_bps: u64,
5057 ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
5058 let mut next_protocol_version = current_protocol_version;
5059 let mut system_packages = vec![];
5060 let mut protocol_version_digest = current_protocol_digest;
5061
5062 while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
5066 next_protocol_version + 1,
5067 committee,
5068 capabilities.clone(),
5069 buffer_stake_bps,
5070 ) {
5071 next_protocol_version = version;
5072 protocol_version_digest = digest;
5073 system_packages = packages;
5074 }
5075
5076 (
5077 next_protocol_version,
5078 protocol_version_digest,
5079 system_packages,
5080 )
5081 }
5082
5083 fn get_validators_supporting_protocol_version(
5088 target_protocol_version: ProtocolVersion,
5089 target_digest: Digest,
5090 active_validators: &[AuthorityPublicKey],
5091 capabilities: &[AuthorityCapabilitiesV1],
5092 ) -> Vec<u64> {
5093 let mut eligible_validators = Vec::new();
5094
5095 for capability in capabilities {
5096 if let Some(digest) = capability
5098 .supported_protocol_versions
5099 .get_version_digest(target_protocol_version)
5100 {
5101 if digest == target_digest {
5102 if let Some(index) = active_validators
5104 .iter()
5105 .position(|name| AuthorityName::from(name) == capability.authority)
5106 {
5107 eligible_validators.push(index as u64);
5108 }
5109 }
5110 }
5111 }
5112
5113 eligible_validators.sort();
5115 eligible_validators
5116 }
5117
5118 fn calculate_eligible_validators_weight(
5123 eligible_validator_indices: &[u64],
5124 active_validators: &[AuthorityPublicKey],
5125 committee: &Committee,
5126 ) -> u64 {
5127 let mut total_weight = 0u64;
5128
5129 for &index in eligible_validator_indices {
5130 let authority_pubkey = &active_validators[index as usize];
5131 if let Some((_, weight)) = committee
5133 .members()
5134 .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
5135 {
5136 total_weight += weight;
5137 }
5138 }
5139
5140 total_weight
5141 }
5142
5143 #[instrument(level = "error", skip_all)]
5156 pub async fn create_and_execute_advance_epoch_tx(
5157 &self,
5158 epoch_store: &Arc<AuthorityPerEpochStore>,
5159 gas_cost_summary: &GasCostSummary,
5160 checkpoint: CheckpointSequenceNumber,
5161 epoch_start_timestamp_ms: CheckpointTimestamp,
5162 scores: Vec<u64>,
5163 ) -> anyhow::Result<(
5164 IotaSystemState,
5165 Option<SystemEpochInfoEvent>,
5166 TransactionEffects,
5167 )> {
5168 let mut txns = Vec::new();
5169
5170 let next_epoch = epoch_store.epoch() + 1;
5171
5172 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
5173 let authority_capabilities = epoch_store
5174 .get_capabilities_v1()
5175 .expect("read capabilities from db cannot fail");
5176 let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
5177 Self::choose_protocol_version_and_system_packages_v1(
5178 epoch_store.protocol_version(),
5179 SupportedProtocolVersionsWithHashes::protocol_config_digest(
5180 epoch_store.protocol_config(),
5181 ),
5182 epoch_store.committee(),
5183 authority_capabilities.clone(),
5184 buffer_stake_bps,
5185 );
5186
5187 let config = epoch_store.protocol_config();
5191 let binary_config = to_binary_config(config);
5192 let Some(next_epoch_system_package_bytes) = self
5193 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
5194 .await
5195 else {
5196 error!(
5197 "upgraded system packages {:?} are not locally available, cannot create \
5198 ChangeEpochTx. validator binary must be upgraded to the correct version!",
5199 next_epoch_system_packages
5200 );
5201 bail!("missing system packages: cannot form ChangeEpochTx");
5211 };
5212
5213 if config.select_committee_from_eligible_validators() {
5216 let active_validators = epoch_store.epoch_start_state().get_active_validators();
5218
5219 let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
5220
5221 if config.select_committee_supporting_next_epoch_version() {
5225 eligible_active_validators = Self::get_validators_supporting_protocol_version(
5226 next_epoch_protocol_version,
5227 next_epoch_protocol_digest,
5228 &active_validators,
5229 &authority_capabilities,
5230 );
5231
5232 let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5234 &eligible_active_validators,
5235 &active_validators,
5236 epoch_store.committee(),
5237 );
5238
5239 let committee = epoch_store.committee();
5243 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5244
5245 if eligible_validators_weight < effective_threshold {
5246 error!(
5247 "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5248 This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5249 );
5250 eligible_active_validators = (0..active_validators.len() as u64).collect();
5253 }
5254 }
5255
5256 if config.pass_validator_scores_to_advance_epoch() {
5259 txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5260 next_epoch,
5261 next_epoch_protocol_version.as_u64(),
5262 gas_cost_summary.storage_cost,
5263 gas_cost_summary.computation_cost,
5264 gas_cost_summary.computation_cost_burned,
5265 gas_cost_summary.storage_rebate,
5266 gas_cost_summary.non_refundable_storage_fee,
5267 epoch_start_timestamp_ms,
5268 next_epoch_system_package_bytes,
5269 eligible_active_validators,
5270 scores,
5271 config.adjust_rewards_by_score(),
5272 ));
5273 } else {
5274 txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5275 next_epoch,
5276 next_epoch_protocol_version.as_u64(),
5277 gas_cost_summary.storage_cost,
5278 gas_cost_summary.computation_cost,
5279 gas_cost_summary.computation_cost_burned,
5280 gas_cost_summary.storage_rebate,
5281 gas_cost_summary.non_refundable_storage_fee,
5282 epoch_start_timestamp_ms,
5283 next_epoch_system_package_bytes,
5284 eligible_active_validators,
5285 ));
5286 }
5287 } else if config.protocol_defined_base_fee()
5288 && config.max_committee_members_count_as_option().is_some()
5289 {
5290 txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5291 next_epoch,
5292 next_epoch_protocol_version.as_u64(),
5293 gas_cost_summary.storage_cost,
5294 gas_cost_summary.computation_cost,
5295 gas_cost_summary.computation_cost_burned,
5296 gas_cost_summary.storage_rebate,
5297 gas_cost_summary.non_refundable_storage_fee,
5298 epoch_start_timestamp_ms,
5299 next_epoch_system_package_bytes,
5300 ));
5301 } else {
5302 txns.push(EndOfEpochTransactionKind::new_change_epoch(
5303 next_epoch,
5304 next_epoch_protocol_version.as_u64(),
5305 gas_cost_summary.storage_cost,
5306 gas_cost_summary.computation_cost,
5307 gas_cost_summary.storage_rebate,
5308 gas_cost_summary.non_refundable_storage_fee,
5309 epoch_start_timestamp_ms,
5310 next_epoch_system_package_bytes,
5311 ));
5312 }
5313
5314 let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5315
5316 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5317 tx.clone(),
5318 epoch_store.epoch(),
5319 checkpoint,
5320 );
5321
5322 let tx_digest = executable_tx.digest();
5323
5324 info!(
5325 ?next_epoch,
5326 ?next_epoch_protocol_version,
5327 ?next_epoch_system_packages,
5328 computation_cost=?gas_cost_summary.computation_cost,
5329 computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5330 storage_cost=?gas_cost_summary.storage_cost,
5331 storage_rebate=?gas_cost_summary.storage_rebate,
5332 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5333 ?tx_digest,
5334 "Creating advance epoch transaction"
5335 );
5336
5337 fail_point_async!("change_epoch_tx_delay");
5338 let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5339
5340 if self
5344 .get_transaction_cache_reader()
5345 .try_is_tx_already_executed(tx_digest)?
5346 {
5347 warn!("change epoch tx has already been executed via state sync");
5348 bail!("change epoch tx has already been executed via state sync",);
5349 }
5350
5351 let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5352
5353 epoch_store.assign_shared_object_versions_idempotent(
5357 self.get_object_cache_reader().as_ref(),
5358 std::slice::from_ref(&executable_tx),
5359 )?;
5360
5361 let (input_objects, _) =
5362 self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5363
5364 let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
5365 &execution_guard,
5366 &executable_tx,
5367 input_objects,
5368 vec![],
5369 epoch_store,
5370 )?;
5371 let system_obj = get_iota_system_state(&temporary_store.written)
5372 .expect("change epoch tx must write to system object");
5373 let system_epoch_info_event = temporary_store
5375 .events
5376 .0
5377 .into_iter()
5378 .find(|event| event.is_system_epoch_info_event())
5379 .map(SystemEpochInfoEvent::from);
5380 assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5383
5384 self.get_state_sync_store()
5388 .try_insert_transaction_and_effects(&tx, &effects)
5389 .map_err(|err| {
5390 let err: anyhow::Error = err.into();
5391 err
5392 })?;
5393
5394 info!(
5395 "Effects summary of the change epoch transaction: {:?}",
5396 effects.summary_for_debug()
5397 );
5398 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5399 assert!(effects.status().is_success());
5401 Ok((system_obj, system_epoch_info_event, effects))
5402 }
5403
5404 #[instrument(level = "error", skip_all)]
5408 async fn revert_uncommitted_epoch_transactions(
5409 &self,
5410 epoch_store: &AuthorityPerEpochStore,
5411 ) -> IotaResult {
5412 {
5413 let state = epoch_store.get_reconfig_state_write_lock_guard();
5414 if state.should_accept_user_certs() {
5415 epoch_store.close_user_certs(state);
5424 }
5425 }
5427 let pending_certificates = epoch_store.pending_consensus_certificates();
5428 info!(
5429 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5430 pending_certificates.len(),
5431 pending_certificates,
5432 );
5433 for digest in pending_certificates {
5434 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5435 info!(
5436 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5437 digest
5438 );
5439 continue;
5440 }
5441 info!("Reverting {:?} at the end of epoch", digest);
5442 epoch_store.revert_executed_transaction(&digest)?;
5443 self.get_reconfig_api().try_revert_state_update(&digest)?;
5444 }
5445 info!("All uncommitted local transactions reverted");
5446 Ok(())
5447 }
5448
5449 #[instrument(level = "error", skip_all)]
5450 async fn reopen_epoch_db(
5451 &self,
5452 cur_epoch_store: &AuthorityPerEpochStore,
5453 new_committee: Committee,
5454 epoch_start_configuration: EpochStartConfiguration,
5455 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5456 epoch_last_checkpoint: CheckpointSequenceNumber,
5457 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5458 let new_epoch = new_committee.epoch;
5459 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5460 assert_eq!(
5461 epoch_start_configuration.epoch_start_state().epoch(),
5462 new_committee.epoch
5463 );
5464 fail_point!("before-open-new-epoch-store");
5465 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5466 self.name,
5467 new_committee,
5468 epoch_start_configuration,
5469 self.get_backing_package_store().clone(),
5470 expensive_safety_check_config,
5471 epoch_last_checkpoint,
5472 )?;
5473 self.epoch_store.store(new_epoch_store.clone());
5474 Ok(new_epoch_store)
5475 }
5476
5477 fn check_move_account(
5480 &self,
5481 auth_account_object_id: ObjectId,
5482 auth_account_object_seq_number: Option<SequenceNumber>,
5483 auth_account_object_digest: Option<ObjectDigest>,
5484 account_object: ObjectReadResult,
5485 signer: &IotaAddress,
5486 ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5487 let account_object = match account_object.object {
5488 ObjectReadResultKind::Object(object) => Ok(object),
5489 ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5490 Err(UserInputError::AccountObjectDeleted {
5491 account_id: account_object.id(),
5492 account_version: version,
5493 transaction_digest: digest,
5494 })
5495 }
5496 ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5499 Err(UserInputError::AccountObjectInCanceledTransaction {
5500 account_id: account_object.id(),
5501 account_version: version,
5502 })
5503 }
5504 }?;
5505
5506 let account_object_addr = IotaAddress::from(auth_account_object_id);
5507
5508 fp_ensure!(
5509 signer == &account_object_addr,
5510 UserInputError::IncorrectUserSignature {
5511 error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5512 }
5513 .into()
5514 );
5515
5516 fp_ensure!(
5517 account_object.is_shared() || account_object.is_immutable(),
5518 UserInputError::AccountObjectNotSupported {
5519 object_id: auth_account_object_id
5520 }
5521 .into()
5522 );
5523
5524 let auth_account_object_seq_number =
5525 if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5526 let account_object_version = account_object.version();
5527
5528 fp_ensure!(
5529 account_object_version == auth_account_object_seq_number,
5530 UserInputError::AccountObjectVersionMismatch {
5531 object_id: auth_account_object_id,
5532 expected_version: auth_account_object_seq_number,
5533 actual_version: account_object_version,
5534 }
5535 .into()
5536 );
5537
5538 auth_account_object_seq_number
5539 } else {
5540 account_object.version()
5541 };
5542
5543 if let Some(auth_account_object_digest) = auth_account_object_digest {
5544 let expected_digest = account_object.digest();
5545 fp_ensure!(
5546 expected_digest == auth_account_object_digest,
5547 UserInputError::InvalidAccountObjectDigest {
5548 object_id: auth_account_object_id,
5549 expected_digest,
5550 actual_digest: auth_account_object_digest,
5551 }
5552 .into()
5553 );
5554 }
5555
5556 let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5557 auth_account_object_id,
5558 &AuthenticatorFunctionRefV1Key::tag().into(),
5559 &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5560 )
5561 .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5562 account_object_id: auth_account_object_id,
5563 })?;
5564
5565 let authenticator_function_ref_field = self
5566 .get_object_cache_reader()
5567 .try_find_object_lt_or_eq_version(
5568 authenticator_function_ref_field_id,
5569 auth_account_object_seq_number,
5570 )?;
5571
5572 if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5573 let field_move_object = authenticator_function_ref_field_obj
5574 .data
5575 .as_struct_opt()
5576 .expect("dynamic field should never be a package object");
5577
5578 let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5579 field_move_object.to_rust().map_err(|_| {
5580 UserInputError::InvalidAuthenticatorFunctionRefField {
5581 account_object_id: auth_account_object_id,
5582 }
5583 })?;
5584
5585 Ok(AuthenticatorFunctionRefForExecution::new_v1(
5586 field.value,
5587 authenticator_function_ref_field_obj.object_ref(),
5588 authenticator_function_ref_field_obj.owner,
5589 authenticator_function_ref_field_obj.storage_rebate,
5590 authenticator_function_ref_field_obj.previous_transaction,
5591 ))
5592 } else {
5593 Err(UserInputError::MoveAuthenticatorNotFound {
5594 authenticator_function_ref_id: authenticator_function_ref_field_id,
5595 account_object_id: auth_account_object_id,
5596 account_object_version: auth_account_object_seq_number,
5597 }
5598 .into())
5599 }
5600 }
5601
5602 #[allow(clippy::type_complexity)]
5603 fn read_objects_for_signing(
5604 &self,
5605 transaction: &VerifiedTransaction,
5606 epoch: u64,
5607 ) -> IotaResult<(
5608 InputObjects,
5609 ReceivingObjects,
5610 Vec<(InputObjects, ObjectReadResult)>,
5611 )> {
5612 let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5613 Some(transaction.digest()),
5614 &transaction.collect_all_input_object_kind_for_reading()?,
5615 &transaction.data().transaction_data().receiving_objects(),
5616 epoch,
5617 )?;
5618
5619 transaction
5620 .split_input_objects_into_groups_for_reading(input_objects)
5621 .map(|(tx_input_objects, per_authenticator_inputs)| {
5622 (
5623 tx_input_objects,
5624 tx_receiving_objects,
5625 per_authenticator_inputs,
5626 )
5627 })
5628 }
5629
5630 #[allow(clippy::type_complexity)]
5631 fn check_transaction_inputs_for_signing(
5632 &self,
5633 protocol_config: &ProtocolConfig,
5634 reference_gas_price: u64,
5635 tx_data: &TransactionData,
5636 tx_input_objects: InputObjects,
5637 tx_receiving_objects: &ReceivingObjects,
5638 move_authenticators: &Vec<&MoveAuthenticator>,
5639 per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
5640 ) -> IotaResult<(
5641 IotaGasStatus,
5642 CheckedInputObjects,
5643 Vec<(CheckedInputObjects, AuthenticatorFunctionRef)>,
5644 )> {
5645 let authenticator_gas_budget = if move_authenticators.is_empty() {
5646 0
5647 } else {
5648 protocol_config.max_auth_gas()
5651 };
5652
5653 debug_assert_eq!(
5654 move_authenticators.len(),
5655 per_authenticator_inputs.len(),
5656 "Move authenticators amount must match the number of authenticator inputs"
5657 );
5658
5659 let per_authenticator_checked_inputs = move_authenticators
5660 .iter()
5661 .zip(per_authenticator_inputs)
5662 .map(
5663 |(move_authenticator, (authenticator_input_objects, account_object))| {
5664 let (
5666 auth_account_object_id,
5667 auth_account_object_seq_number,
5668 auth_account_object_digest,
5669 ) = move_authenticator.object_to_authenticate_components()?;
5670
5671 let signer = move_authenticator.address()?;
5672
5673 let AuthenticatorFunctionRefForExecution {
5675 authenticator_function_ref,
5676 ..
5677 } = self.check_move_account(
5678 auth_account_object_id,
5679 auth_account_object_seq_number,
5680 auth_account_object_digest,
5681 account_object,
5682 &signer,
5683 )?;
5684
5685 let authenticator_checked_input_objects =
5687 iota_transaction_checks::check_move_authenticator_input_for_signing(
5688 authenticator_input_objects,
5689 )?;
5690
5691 Ok((
5692 authenticator_checked_input_objects,
5693 authenticator_function_ref,
5694 ))
5695 },
5696 )
5697 .collect::<IotaResult<Vec<_>>>()?;
5698
5699 let (gas_status, tx_checked_input_objects) =
5701 iota_transaction_checks::check_transaction_input(
5702 protocol_config,
5703 reference_gas_price,
5704 tx_data,
5705 tx_input_objects,
5706 tx_receiving_objects,
5707 &self.metrics.bytecode_verifier_metrics,
5708 &self.config.verifier_signing_config,
5709 authenticator_gas_budget,
5710 )?;
5711
5712 Ok((
5713 gas_status,
5714 tx_checked_input_objects,
5715 per_authenticator_checked_inputs,
5716 ))
5717 }
5718
5719 #[cfg(test)]
5720 pub(crate) fn iter_live_object_set_for_testing(
5721 &self,
5722 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5723 self.get_global_state_hash_store()
5724 .iter_cached_live_object_set_for_testing()
5725 }
5726
5727 #[cfg(test)]
5728 pub(crate) fn shutdown_execution_for_test(&self) {
5729 self.tx_execution_shutdown
5730 .lock()
5731 .take()
5732 .unwrap()
5733 .send(())
5734 .unwrap();
5735 }
5736
5737 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5740 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5741 self.get_object_cache_reader()
5742 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5743 self.get_reconfig_api()
5744 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5745 }
5746}
5747
5748pub struct RandomnessRoundReceiver {
5749 authority_state: Arc<AuthorityState>,
5750 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5751}
5752
5753impl RandomnessRoundReceiver {
5754 pub fn spawn(
5755 authority_state: Arc<AuthorityState>,
5756 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5757 ) -> JoinHandle<()> {
5758 let rrr = RandomnessRoundReceiver {
5759 authority_state,
5760 randomness_rx,
5761 };
5762 spawn_monitored_task!(rrr.run())
5763 }
5764
5765 async fn run(mut self) {
5766 info!("RandomnessRoundReceiver event loop started");
5767
5768 loop {
5769 tokio::select! {
5770 maybe_recv = self.randomness_rx.recv() => {
5771 if let Some((epoch, round, bytes)) = maybe_recv {
5772 self.handle_new_randomness(epoch, round, bytes);
5773 } else {
5774 break;
5775 }
5776 },
5777 }
5778 }
5779
5780 info!("RandomnessRoundReceiver event loop ended");
5781 }
5782
5783 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5784 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5785 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5786 if epoch_store.epoch() != epoch {
5787 warn!(
5788 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5789 epoch_store.epoch()
5790 );
5791 return;
5792 }
5793 let transaction = VerifiedTransaction::new_randomness_state_update(
5794 epoch,
5795 round,
5796 bytes,
5797 epoch_store
5798 .epoch_start_config()
5799 .randomness_obj_initial_shared_version(),
5800 );
5801 debug!(
5802 "created randomness state update transaction with digest: {:?}",
5803 transaction.digest()
5804 );
5805 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5806 let digest = *transaction.digest();
5807
5808 self.authority_state
5813 .get_cache_commit()
5814 .persist_transaction(&transaction);
5815
5816 self.authority_state
5818 .transaction_manager()
5819 .enqueue(vec![transaction], &epoch_store);
5820
5821 let authority_state = self.authority_state.clone();
5822 spawn_monitored_task!(async move {
5823 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5832 let result = tokio::time::timeout(
5833 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5834 authority_state
5835 .get_transaction_cache_reader()
5836 .try_notify_read_executed_effects(&[digest]),
5837 )
5838 .await;
5839 let result = match result {
5840 Ok(result) => result,
5841 Err(_) => {
5842 if cfg!(debug_assertions) {
5843 panic!(
5845 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5846 );
5847 }
5848 warn!(
5849 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5850 );
5851 authority_state
5853 .get_transaction_cache_reader()
5854 .try_notify_read_executed_effects(&[digest])
5855 .await
5856 }
5857 };
5858
5859 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5860 let effects = effects.pop().expect("should return effects");
5861 if *effects.status() != ExecutionStatus::Success {
5862 fatal!(
5863 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5864 );
5865 }
5866 debug!(
5867 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5868 );
5869 });
5870 }
5871}
5872
5873#[async_trait]
5874impl TransactionKeyValueStoreTrait for AuthorityState {
5875 async fn multi_get(
5876 &self,
5877 transaction_keys: &[TransactionDigest],
5878 effects_keys: &[TransactionDigest],
5879 ) -> IotaResult<KVStoreTransactionData> {
5880 let txns = if !transaction_keys.is_empty() {
5881 self.get_transaction_cache_reader()
5882 .try_multi_get_transaction_blocks(transaction_keys)?
5883 .into_iter()
5884 .map(|t| t.map(|t| (*t).clone().into_inner()))
5885 .collect()
5886 } else {
5887 vec![]
5888 };
5889
5890 let fx = if !effects_keys.is_empty() {
5891 self.get_transaction_cache_reader()
5892 .try_multi_get_executed_effects(effects_keys)?
5893 } else {
5894 vec![]
5895 };
5896
5897 Ok((txns, fx))
5898 }
5899
5900 async fn multi_get_checkpoints(
5901 &self,
5902 checkpoint_summaries: &[CheckpointSequenceNumber],
5903 checkpoint_contents: &[CheckpointSequenceNumber],
5904 checkpoint_summaries_by_digest: &[CheckpointDigest],
5905 ) -> IotaResult<(
5906 Vec<Option<CertifiedCheckpointSummary>>,
5907 Vec<Option<CheckpointContents>>,
5908 Vec<Option<CertifiedCheckpointSummary>>,
5909 )> {
5910 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5912 let store = self.get_checkpoint_store();
5913 for seq in checkpoint_summaries {
5914 let checkpoint = store
5915 .get_checkpoint_by_sequence_number(*seq)?
5916 .map(|c| c.into_inner());
5917
5918 summaries.push(checkpoint);
5919 }
5920
5921 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5922 for seq in checkpoint_contents {
5923 let checkpoint = store
5924 .get_checkpoint_by_sequence_number(*seq)?
5925 .and_then(|summary| {
5926 store
5927 .get_checkpoint_contents(&summary.content_digest)
5928 .expect("db read cannot fail")
5929 });
5930 contents.push(checkpoint);
5931 }
5932
5933 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5934 for digest in checkpoint_summaries_by_digest {
5935 let checkpoint = store
5936 .get_checkpoint_by_digest(digest)?
5937 .map(|c| c.into_inner());
5938 summaries_by_digest.push(checkpoint);
5939 }
5940
5941 Ok((summaries, contents, summaries_by_digest))
5942 }
5943
5944 async fn get_transaction_perpetual_checkpoint(
5945 &self,
5946 digest: TransactionDigest,
5947 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5948 self.get_checkpoint_cache()
5949 .try_get_transaction_perpetual_checkpoint(&digest)
5950 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5951 }
5952
5953 async fn get_object(
5954 &self,
5955 object_id: ObjectId,
5956 version: VersionNumber,
5957 ) -> IotaResult<Option<Object>> {
5958 self.get_object_cache_reader()
5959 .try_get_object_by_key(&object_id, version)
5960 }
5961
5962 #[instrument(skip_all)]
5963 async fn multi_get_objects(
5964 &self,
5965 object_keys: &[ObjectKey],
5966 ) -> IotaResult<Vec<Option<Object>>> {
5967 Ok(self
5968 .get_object_cache_reader()
5969 .multi_get_objects_by_key(object_keys))
5970 }
5971
5972 async fn multi_get_transactions_perpetual_checkpoints(
5973 &self,
5974 digests: &[TransactionDigest],
5975 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5976 let res = self
5977 .get_checkpoint_cache()
5978 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5979
5980 Ok(res
5981 .into_iter()
5982 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5983 .collect())
5984 }
5985
5986 #[instrument(skip(self, digests), fields(digests = digests.iter().map(|d| d.to_string()).collect::<Vec<String>>().join(", ")))]
5987 async fn multi_get_events_by_tx_digests(
5988 &self,
5989 digests: &[TransactionDigest],
5990 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5991 if digests.is_empty() {
5992 return Ok(vec![]);
5993 }
5994
5995 Ok(self
5996 .get_transaction_cache_reader()
5997 .multi_get_events(digests))
5998 }
5999}
6000
6001#[cfg(msim)]
6002pub mod framework_injection {
6003 use std::{
6004 cell::RefCell,
6005 collections::{BTreeMap, BTreeSet},
6006 };
6007
6008 use iota_framework::{BuiltInFramework, SystemPackage};
6009 use iota_sdk_types::ObjectId;
6010 use iota_types::base_types::AuthorityName;
6011 use move_binary_format::CompiledModule;
6012
6013 type FrameworkOverrideConfig = BTreeMap<ObjectId, PackageOverrideConfig>;
6014
6015 thread_local! {
6017 static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
6018 }
6019
6020 type Framework = Vec<CompiledModule>;
6021
6022 pub type PackageUpgradeCallback =
6023 Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
6024
6025 enum PackageOverrideConfig {
6026 Global(Framework),
6027 PerValidator(PackageUpgradeCallback),
6028 }
6029
6030 fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
6031 modules
6032 .iter()
6033 .map(|m| {
6034 let mut buf = Vec::new();
6035 m.serialize_with_version(m.version, &mut buf).unwrap();
6036 buf
6037 })
6038 .collect()
6039 }
6040
6041 pub fn set_override(package_id: ObjectId, modules: Vec<CompiledModule>) {
6042 OVERRIDE.with(|bs| {
6043 bs.borrow_mut()
6044 .insert(package_id, PackageOverrideConfig::Global(modules))
6045 });
6046 }
6047
6048 pub fn set_override_cb(package_id: ObjectId, func: PackageUpgradeCallback) {
6049 OVERRIDE.with(|bs| {
6050 bs.borrow_mut()
6051 .insert(package_id, PackageOverrideConfig::PerValidator(func))
6052 });
6053 }
6054
6055 pub fn get_override_bytes(package_id: &ObjectId, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
6056 OVERRIDE.with(|cfg| {
6057 cfg.borrow().get(package_id).and_then(|entry| match entry {
6058 PackageOverrideConfig::Global(framework) => {
6059 Some(compiled_modules_to_bytes(framework))
6060 }
6061 PackageOverrideConfig::PerValidator(func) => {
6062 func(name).map(|fw| compiled_modules_to_bytes(&fw))
6063 }
6064 })
6065 })
6066 }
6067
6068 pub fn get_override_modules(
6069 package_id: &ObjectId,
6070 name: AuthorityName,
6071 ) -> Option<Vec<CompiledModule>> {
6072 OVERRIDE.with(|cfg| {
6073 cfg.borrow().get(package_id).and_then(|entry| match entry {
6074 PackageOverrideConfig::Global(framework) => Some(framework.clone()),
6075 PackageOverrideConfig::PerValidator(func) => func(name),
6076 })
6077 })
6078 }
6079
6080 pub fn get_override_system_package(
6081 package_id: &ObjectId,
6082 name: AuthorityName,
6083 ) -> Option<SystemPackage> {
6084 let bytes = get_override_bytes(package_id, name)?;
6085 let dependencies = if package_id.is_system_package() {
6086 BuiltInFramework::get_package_by_id(package_id)
6087 .dependencies
6088 .to_vec()
6089 } else {
6090 BuiltInFramework::all_package_ids()
6093 };
6094 Some(SystemPackage {
6095 id: *package_id,
6096 bytes,
6097 dependencies,
6098 })
6099 }
6100
6101 pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
6102 let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
6103 let extra: Vec<ObjectId> = OVERRIDE.with(|cfg| {
6104 cfg.borrow()
6105 .keys()
6106 .filter_map(|package| (!built_in.contains(package)).then_some(*package))
6107 .collect()
6108 });
6109
6110 extra
6111 .into_iter()
6112 .map(|package| SystemPackage {
6113 id: package,
6114 bytes: get_override_bytes(&package, name).unwrap(),
6115 dependencies: BuiltInFramework::all_package_ids(),
6116 })
6117 .collect()
6118 }
6119}
6120
6121#[derive(Debug, Serialize, Deserialize, Clone)]
6122pub struct ObjDumpFormat {
6123 pub id: ObjectId,
6124 pub version: VersionNumber,
6125 pub digest: ObjectDigest,
6126 pub object: Object,
6127}
6128
6129impl ObjDumpFormat {
6130 fn new(object: Object) -> Self {
6131 let oref = object.object_ref();
6132 Self {
6133 id: oref.object_id,
6134 version: oref.version,
6135 digest: oref.digest,
6136 object,
6137 }
6138 }
6139}
6140
6141#[derive(Debug, Serialize, Deserialize, Clone)]
6142pub struct NodeStateDump {
6143 pub tx_digest: TransactionDigest,
6144 pub sender_signed_data: SenderSignedData,
6145 pub executed_epoch: u64,
6146 pub reference_gas_price: u64,
6147 pub protocol_version: u64,
6148 pub epoch_start_timestamp_ms: u64,
6149 pub computed_effects: TransactionEffects,
6150 pub expected_effects_digest: TransactionEffectsDigest,
6151 pub relevant_system_packages: Vec<ObjDumpFormat>,
6152 pub shared_objects: Vec<ObjDumpFormat>,
6153 pub loaded_child_objects: Vec<ObjDumpFormat>,
6154 pub modified_at_versions: Vec<ObjDumpFormat>,
6155 pub runtime_reads: Vec<ObjDumpFormat>,
6156 pub input_objects: Vec<ObjDumpFormat>,
6157}
6158
6159impl NodeStateDump {
6160 pub fn new(
6161 tx_digest: &TransactionDigest,
6162 effects: &TransactionEffects,
6163 expected_effects_digest: TransactionEffectsDigest,
6164 object_store: &dyn ObjectStore,
6165 epoch_store: &Arc<AuthorityPerEpochStore>,
6166 inner_temporary_store: &InnerTemporaryStore,
6167 certificate: &VerifiedExecutableTransaction,
6168 ) -> IotaResult<Self> {
6169 let executed_epoch = epoch_store.epoch();
6171 let reference_gas_price = epoch_store.reference_gas_price();
6172 let epoch_start_config = epoch_store.epoch_start_config();
6173 let protocol_version = epoch_store.protocol_version().as_u64();
6174 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6175
6176 let mut relevant_system_packages = Vec::new();
6178 for sys_package_id in BuiltInFramework::all_package_ids() {
6179 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
6180 relevant_system_packages.push(ObjDumpFormat::new(w))
6181 }
6182 }
6183
6184 let mut shared_objects = Vec::new();
6186 for kind in effects.input_shared_objects() {
6187 match kind {
6188 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
6189 if let Some(w) =
6190 object_store.try_get_object_by_key(&obj_ref.object_id, obj_ref.version)?
6191 {
6192 shared_objects.push(ObjDumpFormat::new(w))
6193 }
6194 }
6195 InputSharedObject::ReadDeleted(..)
6196 | InputSharedObject::MutateDeleted(..)
6197 | InputSharedObject::Cancelled(..) => (), }
6200 }
6201
6202 let mut loaded_child_objects = Vec::new();
6205 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6206 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
6207 loaded_child_objects.push(ObjDumpFormat::new(w))
6208 }
6209 }
6210
6211 let mut modified_at_versions = Vec::new();
6213 for (id, ver) in effects.modified_at_versions() {
6214 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
6215 modified_at_versions.push(ObjDumpFormat::new(w))
6216 }
6217 }
6218
6219 let mut runtime_reads = Vec::new();
6223 for obj in inner_temporary_store
6224 .runtime_packages_loaded_from_db
6225 .values()
6226 {
6227 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6228 }
6229
6230 Ok(Self {
6233 tx_digest: *tx_digest,
6234 executed_epoch,
6235 reference_gas_price,
6236 epoch_start_timestamp_ms,
6237 protocol_version,
6238 relevant_system_packages,
6239 shared_objects,
6240 loaded_child_objects,
6241 modified_at_versions,
6242 runtime_reads,
6243 sender_signed_data: certificate.clone().into_message(),
6244 input_objects: inner_temporary_store
6245 .input_objects
6246 .values()
6247 .map(|o| ObjDumpFormat::new(o.clone()))
6248 .collect(),
6249 computed_effects: effects.clone(),
6250 expected_effects_digest,
6251 })
6252 }
6253
6254 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6255 let mut objects = Vec::new();
6256 objects.extend(self.relevant_system_packages.clone());
6257 objects.extend(self.shared_objects.clone());
6258 objects.extend(self.loaded_child_objects.clone());
6259 objects.extend(self.modified_at_versions.clone());
6260 objects.extend(self.runtime_reads.clone());
6261 objects.extend(self.input_objects.clone());
6262 objects
6263 }
6264
6265 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6266 let file_name = format!(
6267 "{}_{}_NODE_DUMP.json",
6268 self.tx_digest,
6269 AuthorityState::unixtime_now_ms()
6270 );
6271 let mut path = path.to_path_buf();
6272 path.push(&file_name);
6273 let mut file = File::create(path.clone())?;
6274 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6275 Ok(path)
6276 }
6277
6278 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6279 let file = File::open(path)?;
6280 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6281 }
6282}
6283
6284fn pre_consensus_move_authenticators<'a>(
6296 tx: &'a VerifiedTransaction,
6297 protocol_config: &ProtocolConfig,
6298) -> Vec<&'a MoveAuthenticator> {
6299 if protocol_config.pre_consensus_sponsor_only_move_authentication() {
6300 if tx.transaction_data().is_sponsored_tx() {
6301 if let Some(sponsor_move_authenticator) = tx.sponsor_move_authenticator() {
6302 vec![sponsor_move_authenticator]
6303 } else {
6304 vec![]
6305 }
6306 } else {
6307 tx.move_authenticators()
6308 }
6309 } else {
6310 tx.move_authenticators()
6311 }
6312}