1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage as FrameworkSystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::{
48 ExecutionStatus, ObjectId, Owner, StructTag, TypeTag,
49 crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion},
50};
51use iota_storage::{
52 key_value_store::{
53 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
54 },
55 key_value_store_metrics::KeyValueStoreMetrics,
56};
57#[cfg(msim)]
58use iota_types::committee::CommitteeTrait;
59use iota_types::{
60 account_abstraction::{
61 account::AuthenticatorFunctionRefV1Key,
62 authenticator_function::{
63 AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
64 AuthenticatorFunctionRefV1, extract_auth_fun_refs,
65 },
66 },
67 auth_context::AuthContextData,
68 base_types::{
69 AuthorityName, ConciseableName, IotaAddress, ObjectInfo, ObjectRef, ObjectType,
70 SequenceNumber, VersionNumber,
71 },
72 committee::{Committee, EpochId, ProtocolVersion},
73 crypto::{AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer},
74 deny_list_v1::check_coin_deny_list_v1_during_signing,
75 digests::{ChainIdentifier, Digest, ObjectDigest, TransactionDigest, TransactionEffectsDigest},
76 dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
77 effects::{
78 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
79 TransactionEffectsExt, TransactionEvents, VerifiedSignedTransactionEffects,
80 },
81 error::{ExecutionError, IotaError, IotaResult, UserInputError},
82 event::{Event, EventID, SystemEpochInfoEvent},
83 executable_transaction::VerifiedExecutableTransaction,
84 execution_config_utils::to_binary_config,
85 fp_ensure,
86 gas::{GasCostSummary, IotaGasStatus},
87 gas_coin::NANOS_PER_IOTA,
88 inner_temporary_store::{
89 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
90 WrittenObjects,
91 },
92 iota_sdk_types_conversions::type_tag_core_to_sdk,
93 iota_system_state::{
94 IotaSystemState, IotaSystemStateTrait,
95 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
96 },
97 layout_resolver::{LayoutResolver, into_struct_layout},
98 message_envelope::Message,
99 messages_checkpoint::{
100 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
101 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
102 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
103 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
104 },
105 messages_consensus::AuthorityCapabilitiesV1,
106 messages_grpc::{
107 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
108 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
109 TransactionStatus,
110 },
111 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
112 move_authenticator::MoveAuthenticator,
113 object::{
114 MoveObject, MoveObjectExt, OBJECT_START_VERSION, Object, ObjectRead, PastObjectRead,
115 bounded_visitor::BoundedVisitor,
116 },
117 storage::{
118 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
119 },
120 supported_protocol_versions::{
121 ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
122 },
123 traffic_control::{PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams},
124 transaction::*,
125 transaction_executor::{SimulateTransactionResult, VmChecks},
126};
127use itertools::Itertools;
128use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
129use move_core_types::{
130 account_address::AccountAddress, annotated_value::MoveStructLayout, language_storage::ModuleId,
131};
132use parking_lot::Mutex;
133use prometheus::{
134 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
135 register_histogram_vec_with_registry, register_histogram_with_registry,
136 register_int_counter_vec_with_registry, register_int_counter_with_registry,
137 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
138};
139use serde::{Deserialize, Serialize, de::DeserializeOwned};
140use tap::TapFallible;
141use tokio::{
142 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
143 task::JoinHandle,
144};
145use tracing::{debug, error, info, instrument, trace, warn};
146use typed_store::TypedStoreError;
147
148use self::{
149 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
150};
151#[cfg(msim)]
152pub use crate::checkpoints::checkpoint_executor::utils::{
153 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
154};
155use crate::{
156 authority::{
157 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
158 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
159 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
160 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
161 authority_store_tables::AuthorityPrunerTables,
162 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
163 },
164 authority_client::NetworkAuthorityClient,
165 checkpoint_progress_tracker::CheckpointProgressTracker,
166 checkpoints::CheckpointStore,
167 congestion_tracker::CongestionTracker,
168 consensus_adapter::ConsensusAdapter,
169 epoch::committee_store::CommitteeStore,
170 execution_cache::{
171 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
172 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
173 TransactionCacheRead,
174 },
175 execution_driver::execution_process,
176 global_state_hasher::{GlobalStateHashStore, GlobalStateHasher},
177 grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
178 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
179 metrics::{LatencyObserver, RateTracker},
180 module_cache_metrics::ResolverMetrics,
181 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
182 stake_aggregator::StakeAggregator,
183 subscription_handler::SubscriptionHandler,
184 traffic_controller::{TrafficController, metrics::TrafficControllerMetrics},
185 transaction_input_loader::TransactionInputLoader,
186 transaction_manager::TransactionManager,
187 transaction_outputs::TransactionOutputs,
188 validator_tx_finalizer::ValidatorTxFinalizer,
189 verify_indexes::verify_indexes,
190};
191
192#[cfg(test)]
193#[path = "unit_tests/authority_tests.rs"]
194pub mod authority_tests;
195
196#[cfg(test)]
197#[path = "unit_tests/transaction_tests.rs"]
198pub mod transaction_tests;
199
200#[cfg(test)]
201#[path = "unit_tests/batch_transaction_tests.rs"]
202mod batch_transaction_tests;
203
204#[cfg(test)]
205#[path = "unit_tests/move_integration_tests.rs"]
206pub mod move_integration_tests;
207
208#[cfg(test)]
209#[path = "unit_tests/gas_tests.rs"]
210mod gas_tests;
211
212#[cfg(test)]
213#[path = "unit_tests/batch_verification_tests.rs"]
214mod batch_verification_tests;
215
216#[cfg(test)]
217#[path = "unit_tests/coin_deny_list_tests.rs"]
218mod coin_deny_list_tests;
219
220#[cfg(test)]
221#[path = "unit_tests/auth_unit_test_utils.rs"]
222pub mod auth_unit_test_utils;
223
224pub mod authority_test_utils;
225
226pub mod authority_per_epoch_store;
227pub mod authority_per_epoch_store_pruner;
228
229mod authority_store_migrations;
230pub mod authority_store_pruner;
231pub mod authority_store_tables;
232pub mod authority_store_types;
233pub mod epoch_start_configuration;
234pub mod shared_object_congestion_tracker;
235pub mod shared_object_version_manager;
236pub mod suggested_gas_price_calculator;
237pub mod test_authority_builder;
238pub mod transaction_deferral;
239
240pub(crate) mod authority_store;
241pub mod backpressure;
242
243pub struct AuthorityMetrics {
245 tx_orders: IntCounter,
246 total_certs: IntCounter,
247 total_cert_attempts: IntCounter,
248 total_effects: IntCounter,
249 pub shared_obj_tx: IntCounter,
250 sponsored_tx: IntCounter,
251 tx_already_processed: IntCounter,
252 num_input_objs: Histogram,
253 num_shared_objects: Histogram,
254 batch_size: Histogram,
255
256 authority_state_handle_transaction_latency: Histogram,
257
258 execute_certificate_latency_single_writer: Histogram,
259 execute_certificate_latency_shared_object: Histogram,
260
261 internal_execution_latency: Histogram,
262 execution_load_input_objects_latency: Histogram,
263 prepare_certificate_latency: Histogram,
264 commit_certificate_latency: Histogram,
265 db_checkpoint_latency: Histogram,
266
267 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
268 pub(crate) transaction_manager_num_missing_objects: IntGauge,
269 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
270 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
271 pub(crate) transaction_manager_num_ready: IntGauge,
272 pub(crate) transaction_manager_object_cache_size: IntGauge,
273 pub(crate) transaction_manager_object_cache_hits: IntCounter,
274 pub(crate) transaction_manager_object_cache_misses: IntCounter,
275 pub(crate) transaction_manager_object_cache_evictions: IntCounter,
276 pub(crate) transaction_manager_package_cache_size: IntGauge,
277 pub(crate) transaction_manager_package_cache_hits: IntCounter,
278 pub(crate) transaction_manager_package_cache_misses: IntCounter,
279 pub(crate) transaction_manager_package_cache_evictions: IntCounter,
280 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
281
282 pub(crate) execution_driver_executed_transactions: IntCounter,
283 pub(crate) execution_driver_dispatch_queue: IntGauge,
284 pub(crate) execution_queueing_delay_s: Histogram,
285 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
286 pub(crate) execution_gas_latency_ratio: Histogram,
287
288 pub(crate) skipped_consensus_txns: IntCounter,
289 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
290
291 pub(crate) authority_overload_status: IntGauge,
292 pub(crate) authority_load_shedding_percentage: IntGauge,
293
294 pub(crate) transaction_overload_sources: IntCounterVec,
295
296 post_processing_total_events_emitted: IntCounter,
298 post_processing_total_tx_indexed: IntCounter,
299 post_processing_total_tx_had_event_processed: IntCounter,
300 post_processing_total_failures: IntCounter,
301
302 pub consensus_handler_processed: IntCounterVec,
304 pub consensus_handler_transaction_sizes: HistogramVec,
305 pub consensus_handler_num_low_scoring_authorities: IntGauge,
306 pub consensus_handler_scores: IntGaugeVec,
307 pub consensus_handler_deferred_transactions: IntCounter,
308 pub consensus_handler_congested_transactions: IntCounter,
309 pub consensus_handler_cancelled_transactions: IntCounter,
310 pub consensus_handler_max_object_costs: IntGaugeVec,
311 pub consensus_committed_subdags: IntCounterVec,
312 pub consensus_committed_messages: IntGaugeVec,
313 pub consensus_committed_user_transactions: IntGaugeVec,
314 pub consensus_handler_leader_round: IntGauge,
315 pub consensus_calculated_throughput: IntGauge,
316 pub consensus_calculated_throughput_profile: IntGauge,
317
318 pub validator_scoreboard_scores: IntGaugeVec,
319 pub invalid_misbehavior_reports_by_authority: IntGaugeVec,
320
321 pub limits_metrics: Arc<LimitsMetrics>,
322
323 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
325
326 pub multisig_sig_count: IntCounter,
328
329 pub execution_queueing_latency: LatencyObserver,
332
333 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
340
341 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
344}
345
/// Histogram bucket bounds for count-like (positive integer) observations.
///
/// Follows a 1-2-5-7 progression per decade, from 1 up to 10 million.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, 7.0, 10.0, 20.0, 50.0, 70.0, 100.0, 200.0, 500.0, 700.0, 1_000.0, 2_000.0,
    5_000.0, 7_000.0, 10_000.0, 20_000.0, 50_000.0, 70_000.0, 100_000.0, 200_000.0, 500_000.0,
    700_000.0, 1_000_000.0, 2_000_000.0, 5_000_000.0, 7_000_000.0, 10_000_000.0,
];
352
/// Histogram bucket bounds, in seconds, for general latency metrics.
///
/// Ranges from 0.5 ms to 90 s, with one-second steps between 1 s and 10 s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0,
    8.0, 9.0, 10.0, 20.0, 30.0, 60.0, 90.0,
];
357
/// Histogram bucket bounds, in seconds, for operations expected to be fast.
///
/// Follows a 1-2-5 progression per decade, from 10 microseconds to 100 s.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    1e-5, 2e-5, 5e-5, 1e-4, 2e-4, 5e-4, 1e-3, 2e-3, 5e-3, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0,
    2.0, 5.0, 10.0, 20.0, 50.0, 100.0,
];
363
/// Histogram bucket bounds for the gas-to-latency ratio metrics.
///
/// Ranges from 10 to 1 million, with finer steps between 100 and 10 000.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10., 50., 100., 200., 300., 400., 500., 600., 700., 800., 900., 1_000., 2_000., 3_000.,
    4_000., 5_000., 6_000., 7_000., 8_000., 9_000., 10_000., 50_000., 100_000., 1_000_000.,
];
368
369pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
373 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
374 let execute_certificate_latency = register_histogram_vec_with_registry!(
375 "authority_state_execute_certificate_latency",
376 "Latency of executing certificates, including waiting for inputs",
377 &["tx_type"],
378 LATENCY_SEC_BUCKETS.to_vec(),
379 registry,
380 )
381 .unwrap();
382
383 let execute_certificate_latency_single_writer =
384 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
385 let execute_certificate_latency_shared_object =
386 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
387
388 Self {
389 tx_orders: register_int_counter_with_registry!(
390 "total_transaction_orders",
391 "Total number of transaction orders",
392 registry,
393 )
394 .unwrap(),
395 total_certs: register_int_counter_with_registry!(
396 "total_transaction_certificates",
397 "Total number of transaction certificates handled",
398 registry,
399 )
400 .unwrap(),
401 total_cert_attempts: register_int_counter_with_registry!(
402 "total_handle_certificate_attempts",
403 "Number of calls to handle_certificate",
404 registry,
405 )
406 .unwrap(),
407 total_effects: register_int_counter_with_registry!(
409 "total_transaction_effects",
410 "Total number of transaction effects produced",
411 registry,
412 )
413 .unwrap(),
414
415 shared_obj_tx: register_int_counter_with_registry!(
416 "num_shared_obj_tx",
417 "Number of transactions involving shared objects",
418 registry,
419 )
420 .unwrap(),
421
422 sponsored_tx: register_int_counter_with_registry!(
423 "num_sponsored_tx",
424 "Number of sponsored transactions",
425 registry,
426 )
427 .unwrap(),
428
429 tx_already_processed: register_int_counter_with_registry!(
430 "num_tx_already_processed",
431 "Number of transaction orders already processed previously",
432 registry,
433 )
434 .unwrap(),
435 num_input_objs: register_histogram_with_registry!(
436 "num_input_objects",
437 "Distribution of number of input TX objects per TX",
438 POSITIVE_INT_BUCKETS.to_vec(),
439 registry,
440 )
441 .unwrap(),
442 num_shared_objects: register_histogram_with_registry!(
443 "num_shared_objects",
444 "Number of shared input objects per TX",
445 POSITIVE_INT_BUCKETS.to_vec(),
446 registry,
447 )
448 .unwrap(),
449 batch_size: register_histogram_with_registry!(
450 "batch_size",
451 "Distribution of size of transaction batch",
452 POSITIVE_INT_BUCKETS.to_vec(),
453 registry,
454 )
455 .unwrap(),
456 authority_state_handle_transaction_latency: register_histogram_with_registry!(
457 "authority_state_handle_transaction_latency",
458 "Latency of handling transactions",
459 LATENCY_SEC_BUCKETS.to_vec(),
460 registry,
461 )
462 .unwrap(),
463 execute_certificate_latency_single_writer,
464 execute_certificate_latency_shared_object,
465 internal_execution_latency: register_histogram_with_registry!(
466 "authority_state_internal_execution_latency",
467 "Latency of actual certificate executions",
468 LATENCY_SEC_BUCKETS.to_vec(),
469 registry,
470 )
471 .unwrap(),
472 execution_load_input_objects_latency: register_histogram_with_registry!(
473 "authority_state_execution_load_input_objects_latency",
474 "Latency of loading input objects for execution",
475 LOW_LATENCY_SEC_BUCKETS.to_vec(),
476 registry,
477 )
478 .unwrap(),
479 prepare_certificate_latency: register_histogram_with_registry!(
480 "authority_state_prepare_certificate_latency",
481 "Latency of executing certificates, before committing the results",
482 LATENCY_SEC_BUCKETS.to_vec(),
483 registry,
484 )
485 .unwrap(),
486 commit_certificate_latency: register_histogram_with_registry!(
487 "authority_state_commit_certificate_latency",
488 "Latency of committing certificate execution results",
489 LATENCY_SEC_BUCKETS.to_vec(),
490 registry,
491 )
492 .unwrap(),
493 db_checkpoint_latency: register_histogram_with_registry!(
494 "db_checkpoint_latency",
495 "Latency of checkpointing dbs",
496 LATENCY_SEC_BUCKETS.to_vec(),
497 registry,
498 ).unwrap(),
499 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
500 "transaction_manager_num_enqueued_certificates",
501 "Current number of certificates enqueued to TransactionManager",
502 &["result"],
503 registry,
504 )
505 .unwrap(),
506 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
507 "transaction_manager_num_missing_objects",
508 "Current number of missing objects in TransactionManager",
509 registry,
510 )
511 .unwrap(),
512 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
513 "transaction_manager_num_pending_certificates",
514 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
515 registry,
516 )
517 .unwrap(),
518 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
519 "transaction_manager_num_executing_certificates",
520 "Number of executing certificates, including queued and actually running certificates",
521 registry,
522 )
523 .unwrap(),
524 transaction_manager_num_ready: register_int_gauge_with_registry!(
525 "transaction_manager_num_ready",
526 "Number of ready transactions in TransactionManager",
527 registry,
528 )
529 .unwrap(),
530 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
531 "transaction_manager_object_cache_size",
532 "Current size of object-availability cache in TransactionManager",
533 registry,
534 )
535 .unwrap(),
536 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
537 "transaction_manager_object_cache_hits",
538 "Number of object-availability cache hits in TransactionManager",
539 registry,
540 )
541 .unwrap(),
542 authority_overload_status: register_int_gauge_with_registry!(
543 "authority_overload_status",
544 "Whether authority is current experiencing overload and enters load shedding mode.",
545 registry)
546 .unwrap(),
547 authority_load_shedding_percentage: register_int_gauge_with_registry!(
548 "authority_load_shedding_percentage",
549 "The percentage of transactions is shed when the authority is in load shedding mode.",
550 registry)
551 .unwrap(),
552 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
553 "transaction_manager_object_cache_misses",
554 "Number of object-availability cache misses in TransactionManager",
555 registry,
556 )
557 .unwrap(),
558 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
559 "transaction_manager_object_cache_evictions",
560 "Number of object-availability cache evictions in TransactionManager",
561 registry,
562 )
563 .unwrap(),
564 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
565 "transaction_manager_package_cache_size",
566 "Current size of package-availability cache in TransactionManager",
567 registry,
568 )
569 .unwrap(),
570 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
571 "transaction_manager_package_cache_hits",
572 "Number of package-availability cache hits in TransactionManager",
573 registry,
574 )
575 .unwrap(),
576 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
577 "transaction_manager_package_cache_misses",
578 "Number of package-availability cache misses in TransactionManager",
579 registry,
580 )
581 .unwrap(),
582 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
583 "transaction_manager_package_cache_evictions",
584 "Number of package-availability cache evictions in TransactionManager",
585 registry,
586 )
587 .unwrap(),
588 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
589 "transaction_manager_transaction_queue_age_s",
590 "Time spent in waiting for transaction in the queue",
591 LATENCY_SEC_BUCKETS.to_vec(),
592 registry,
593 )
594 .unwrap(),
595 transaction_overload_sources: register_int_counter_vec_with_registry!(
596 "transaction_overload_sources",
597 "Number of times each source indicates transaction overload.",
598 &["source"],
599 registry)
600 .unwrap(),
601 execution_driver_executed_transactions: register_int_counter_with_registry!(
602 "execution_driver_executed_transactions",
603 "Cumulative number of transaction executed by execution driver",
604 registry,
605 )
606 .unwrap(),
607 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
608 "execution_driver_dispatch_queue",
609 "Number of transaction pending in execution driver dispatch queue",
610 registry,
611 )
612 .unwrap(),
613 execution_queueing_delay_s: register_histogram_with_registry!(
614 "execution_queueing_delay_s",
615 "Queueing delay between a transaction is ready for execution until it starts executing.",
616 LATENCY_SEC_BUCKETS.to_vec(),
617 registry
618 )
619 .unwrap(),
620 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
621 "prepare_cert_gas_latency_ratio",
622 "The ratio of computation gas divided by VM execution latency.",
623 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
624 registry
625 )
626 .unwrap(),
627 execution_gas_latency_ratio: register_histogram_with_registry!(
628 "execution_gas_latency_ratio",
629 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
630 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
631 registry
632 )
633 .unwrap(),
634 skipped_consensus_txns: register_int_counter_with_registry!(
635 "skipped_consensus_txns",
636 "Total number of consensus transactions skipped",
637 registry,
638 )
639 .unwrap(),
640 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
641 "skipped_consensus_txns_cache_hit",
642 "Total number of consensus transactions skipped because of local cache hit",
643 registry,
644 )
645 .unwrap(),
646 post_processing_total_events_emitted: register_int_counter_with_registry!(
647 "post_processing_total_events_emitted",
648 "Total number of events emitted in post processing",
649 registry,
650 )
651 .unwrap(),
652 post_processing_total_tx_indexed: register_int_counter_with_registry!(
653 "post_processing_total_tx_indexed",
654 "Total number of txes indexed in post processing",
655 registry,
656 )
657 .unwrap(),
658 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
659 "post_processing_total_tx_had_event_processed",
660 "Total number of txes finished event processing in post processing",
661 registry,
662 )
663 .unwrap(),
664 post_processing_total_failures: register_int_counter_with_registry!(
665 "post_processing_total_failures",
666 "Total number of failure in post processing",
667 registry,
668 )
669 .unwrap(),
670 consensus_handler_processed: register_int_counter_vec_with_registry!(
671 "consensus_handler_processed",
672 "Number of transactions processed by consensus handler",
673 &["class"],
674 registry
675 ).unwrap(),
676 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
677 "consensus_handler_transaction_sizes",
678 "Sizes of each type of transactions processed by consensus handler",
679 &["class"],
680 POSITIVE_INT_BUCKETS.to_vec(),
681 registry
682 ).unwrap(),
683 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
684 "consensus_handler_num_low_scoring_authorities",
685 "Number of low scoring authorities based on reputation scores from consensus",
686 registry
687 ).unwrap(),
688 consensus_handler_scores: register_int_gauge_vec_with_registry!(
689 "consensus_handler_scores",
690 "scores from consensus for each authority",
691 &["authority"],
692 registry,
693 ).unwrap(),
694 validator_scoreboard_scores: register_int_gauge_vec_with_registry!(
695 "validator_scoreboard_scores",
696 "Per-authority validator scores published by the local Scoreboard after each consensus commit. Range [0, MAX_SCORE].",
697 &["authority"],
698 registry,
699 ).unwrap(),
700 invalid_misbehavior_reports_by_authority: register_int_gauge_vec_with_registry!(
701 "invalid_misbehavior_reports_by_authority",
702 "Cumulative count of invalid misbehavior reports received from each reporting authority in the current epoch. Bumped when a `MisbehaviorReport` consensus transaction fails sender/authority match or payload validation. Snapshot republished after each consensus commit.",
703 &["authority"],
704 registry,
705 ).unwrap(),
706 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
707 "consensus_handler_deferred_transactions",
708 "Number of transactions deferred by consensus handler",
709 registry,
710 ).unwrap(),
711 consensus_handler_congested_transactions: register_int_counter_with_registry!(
712 "consensus_handler_congested_transactions",
713 "Number of transactions deferred by consensus handler due to congestion",
714 registry,
715 ).unwrap(),
716 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
717 "consensus_handler_cancelled_transactions",
718 "Number of transactions cancelled by consensus handler",
719 registry,
720 ).unwrap(),
721 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
722 "consensus_handler_max_congestion_control_object_costs",
723 "Max object costs for congestion control in the current consensus commit",
724 &["commit_type"],
725 registry,
726 ).unwrap(),
727 consensus_committed_subdags: register_int_counter_vec_with_registry!(
728 "consensus_committed_subdags",
729 "Number of committed subdags, sliced by leader",
730 &["authority"],
731 registry,
732 ).unwrap(),
733 consensus_committed_messages: register_int_gauge_vec_with_registry!(
734 "consensus_committed_messages",
735 "Total number of committed consensus messages, sliced by author",
736 &["authority"],
737 registry,
738 ).unwrap(),
739 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
740 "consensus_committed_user_transactions",
741 "Number of committed user transactions, sliced by submitter",
742 &["authority"],
743 registry,
744 ).unwrap(),
745 consensus_handler_leader_round: register_int_gauge_with_registry!(
746 "consensus_handler_leader_round",
747 "The leader round of the current consensus output being processed in the consensus handler",
748 registry,
749 ).unwrap(),
750 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
751 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
752 multisig_sig_count: register_int_counter_with_registry!(
753 "multisig_sig_count",
754 "Count of multisig signatures",
755 registry,
756 )
757 .unwrap(),
758 consensus_calculated_throughput: register_int_gauge_with_registry!(
759 "consensus_calculated_throughput",
760 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
761 registry,
762 ).unwrap(),
763 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
764 "consensus_calculated_throughput_profile",
765 "The current active calculated throughput profile",
766 registry
767 ).unwrap(),
768 execution_queueing_latency: LatencyObserver::new(),
769 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
770 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
771 }
772 }
773
774 pub fn reset_on_reconfigure(&self) {
778 self.consensus_committed_messages.reset();
779 self.consensus_handler_scores.reset();
780 self.validator_scoreboard_scores.reset();
781 self.invalid_misbehavior_reports_by_authority.reset();
782 self.consensus_committed_user_transactions.reset();
783 }
784}
785
786pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
793
794pub struct AuthorityState {
795 pub name: AuthorityName,
798 pub secret: StableSyncAuthoritySigner,
800
801 input_loader: TransactionInputLoader,
803 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
804
805 epoch_store: ArcSwap<AuthorityPerEpochStore>,
806
807 execution_lock: RwLock<EpochId>,
813
814 pub indexes: Option<Arc<IndexStore>>,
815 pub grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
816
817 pub subscription_handler: Arc<SubscriptionHandler>,
818 pub checkpoint_store: Arc<CheckpointStore>,
819
820 committee_store: Arc<CommitteeStore>,
821
822 transaction_manager: Arc<TransactionManager>,
824
825 #[cfg_attr(not(test), expect(unused))]
827 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
828
829 pub metrics: Arc<AuthorityMetrics>,
830 _pruner: AuthorityStorePruner,
831 authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
832 checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
833
834 db_checkpoint_config: DBCheckpointConfig,
836
837 pub config: NodeConfig,
838
839 pub overload_info: AuthorityOverloadInfo,
841
842 pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
843
844 chain_identifier: ChainIdentifier,
847
848 pub(crate) congestion_tracker: Arc<CongestionTracker>,
849
850 pub traffic_controller: Option<Arc<TrafficController>>,
852}
853
854impl AuthorityState {
863 pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
864 epoch_store.committee().authority_exists(&self.name)
865 }
866
867 pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
868 epoch_store
869 .active_validators()
870 .iter()
871 .any(|a| AuthorityName::from(a) == self.name)
872 }
873
874 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
875 !self.is_committee_validator(epoch_store)
876 }
877
878 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
879 &self.committee_store
880 }
881
882 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
883 self.committee_store.clone()
884 }
885
886 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
887 &self.config.authority_overload_config
888 }
889
890 pub fn get_epoch_state_commitments(
891 &self,
892 epoch: EpochId,
893 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
894 self.checkpoint_store.get_epoch_state_commitments(epoch)
895 }
896
    /// Core of transaction signing.
    ///
    /// Runs the signing-time checks on `transaction` (deny-list check, input
    /// loading, input/gas checks, coin deny-list check), executes the
    /// pre-consensus Move authenticators if there are any, then signs the
    /// transaction with this authority's name and secret and tries to acquire
    /// the transaction locks for its owned input objects.
    ///
    /// # Errors
    ///
    /// Returns the first error raised by any of the checks or by lock
    /// acquisition. A failed authenticator run is reported as
    /// `IotaError::MoveAuthenticatorExecutionFailure`.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
    async fn handle_transaction_impl(
        &self,
        transaction: VerifiedTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<VerifiedSignedTransaction> {
        // Bound to a named local so the guard lives until the function returns.
        let _execution_lock = self.execution_lock_for_signing()?;

        let protocol_config = epoch_store.protocol_config();
        let reference_gas_price = epoch_store.reference_gas_price();

        let epoch = epoch_store.epoch();

        let tx_data = transaction.data().transaction_data();

        // Deny-list check against the node's `transaction_deny_config`; runs
        // before any input object is loaded.
        iota_transaction_checks::deny::check_transaction_for_signing(
            tx_data,
            transaction.tx_signatures(),
            &transaction.input_objects()?,
            &tx_data.receiving_objects(),
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        // Load the transaction's own inputs, its receiving objects and one
        // entry of inputs per Move authenticator.
        let (tx_input_objects, tx_receiving_objects, per_authenticator_inputs) =
            self.read_objects_for_signing(&transaction, epoch)?;

        let move_authenticators = transaction.move_authenticators();

        let (gas_status, tx_checked_input_objects, per_authenticator_checked_inputs) = self
            .check_transaction_inputs_for_signing(
                protocol_config,
                reference_gas_price,
                tx_data,
                tx_input_objects,
                &tx_receiving_objects,
                &move_authenticators,
                per_authenticator_inputs,
            )?;

        // Borrow only the checked-input half (`.0`) of every authenticator
        // entry; the coin deny-list check below takes references.
        let per_authenticator_checked_input_objects = per_authenticator_checked_inputs
            .iter()
            .map(|i| &i.0)
            .collect();

        check_coin_deny_list_v1_during_signing(
            tx_data.sender(),
            &tx_checked_input_objects,
            &tx_receiving_objects,
            &per_authenticator_checked_input_objects,
            &self.get_object_store(),
        )?;

        let (kind, signer, gas_data) = tx_data.execution_parts();

        // Resolve the sender/sponsor authenticator function refs from the
        // full authenticator list, i.e. before the pre-consensus filter below
        // narrows it: the closure returns the function ref of the first
        // authenticator whose address equals the requested one.
        let (sender_authenticator_function_ref, sponsor_authenticator_function_ref) =
            extract_auth_fun_refs(signer, gas_data.owner, |address| {
                move_authenticators
                    .iter()
                    .zip(per_authenticator_checked_inputs.iter())
                    .find(|(move_authenticator, _)| {
                        move_authenticator.address().ok().as_ref() == Some(&address)
                    })
                    .map(|(_, (_, auth_fun_ref))| auth_fun_ref.clone())
            });

        // The local binding shadows the free function of the same name from
        // here on. Both vectors are filtered together so they stay index
        // aligned; only authenticators contained in the returned collection
        // are kept.
        let pre_consensus_move_authenticators =
            pre_consensus_move_authenticators(&transaction, protocol_config);
        let (move_authenticators, per_authenticator_checked_inputs): (Vec<_>, Vec<_>) =
            move_authenticators
                .into_iter()
                .zip(per_authenticator_checked_inputs)
                .filter(|(a, _)| pre_consensus_move_authenticators.contains(a))
                .unzip();
        // Rebuilt because the previous reference vector pointed into the
        // unfiltered collection that was just consumed.
        let per_authenticator_checked_input_objects: Vec<_> = per_authenticator_checked_inputs
            .iter()
            .map(|i| &i.0)
            .collect();

        // Authenticators are executed here only when at least one survived
        // the filter above.
        if !move_authenticators.is_empty() {
            let aggregated_authenticator_input_objects =
                iota_transaction_checks::aggregate_authenticator_input_objects(
                    &per_authenticator_checked_input_objects,
                )?;

            debug_assert_eq!(
                move_authenticators.len(),
                per_authenticator_checked_inputs.len(),
                "Move authenticators amount must match the number of checked authenticator inputs"
            );

            // Reshape each pair into the
            // (authenticator, function ref, checked inputs) triple handed to
            // the executor.
            let move_authenticators = move_authenticators
                .into_iter()
                .zip(per_authenticator_checked_inputs)
                .map(
                    |(
                        move_authenticator,
                        (authenticator_checked_input_objects, authenticator_function_ref),
                    )| {
                        (
                            move_authenticator.to_owned(),
                            authenticator_function_ref,
                            authenticator_checked_input_objects,
                        )
                    },
                )
                .collect();

            let tx_data_bytes =
                bcs::to_bytes(&tx_data).expect("TransactionData serialization cannot fail");

            let (sender_auth_digest, sponsor_auth_digest) =
                transaction.data().compute_auth_digests()?;

            let auth_context_data = AuthContextData {
                transaction_data_bytes: tx_data_bytes,
                sender_auth_digest,
                sponsor_auth_digest,
                sender_authenticator_function_ref,
                sponsor_authenticator_function_ref,
            };

            let validation_result = epoch_store.executor().authenticate_transaction(
                self.get_backing_store().as_ref(),
                protocol_config,
                self.metrics.limits_metrics.clone(),
                &epoch_store.epoch_start_config().epoch_data().epoch_id(),
                epoch_store
                    .epoch_start_config()
                    .epoch_data()
                    .epoch_start_timestamp(),
                gas_data,
                gas_status,
                move_authenticators,
                aggregated_authenticator_input_objects,
                kind,
                signer,
                transaction.digest().to_owned(),
                auth_context_data,
                &mut None,
            );

            // The executor's error is flattened to its string form.
            if let Err(validation_error) = validation_result {
                return Err(IotaError::MoveAuthenticatorExecutionFailure {
                    error: validation_error.to_string(),
                });
            }
        }

        let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();

        let signed_transaction =
            VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);

        // Lock acquisition is the last fallible step; a clone of the signed
        // transaction is handed to the cache writer and the original is
        // returned to the caller.
        self.get_cache_writer().try_acquire_transaction_locks(
            epoch_store,
            &owned_objects,
            signed_transaction.clone(),
        )?;

        Ok(signed_transaction)
    }
1095
1096 #[instrument(name = "handle_transaction", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = transaction.data().transaction_data().gas_owner().to_string()))]
1098 pub async fn handle_transaction(
1099 &self,
1100 epoch_store: &Arc<AuthorityPerEpochStore>,
1101 transaction: VerifiedTransaction,
1102 ) -> IotaResult<HandleTransactionResponse> {
1103 let tx_digest = *transaction.digest();
1104 debug!("handle_transaction");
1105
1106 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1108 return Ok(HandleTransactionResponse { status });
1109 }
1110
1111 let _metrics_guard = self
1112 .metrics
1113 .authority_state_handle_transaction_latency
1114 .start_timer();
1115 self.metrics.tx_orders.inc();
1116
1117 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1118 match signed {
1119 Ok(s) => {
1120 if self.is_committee_validator(epoch_store) {
1121 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1122 let tx = s.clone();
1123 let validator_tx_finalizer = validator_tx_finalizer.clone();
1124 let cache_reader = self.get_transaction_cache_reader().clone();
1125 let epoch_store = epoch_store.clone();
1126 spawn_monitored_task!(epoch_store.within_alive_epoch(
1127 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1128 ));
1129 }
1130 }
1131 Ok(HandleTransactionResponse {
1132 status: TransactionStatus::Signed(s.into_inner().into_sig()),
1133 })
1134 }
1135 Err(err) => Ok(HandleTransactionResponse {
1139 status: self
1140 .get_transaction_status(&tx_digest, epoch_store)?
1141 .ok_or(err)?
1142 .1,
1143 }),
1144 }
1145 }
1146
1147 pub fn check_system_overload_at_signing(&self) -> bool {
1148 self.config
1149 .authority_overload_config
1150 .check_system_overload_at_signing
1151 }
1152
1153 pub fn check_system_overload_at_execution(&self) -> bool {
1154 self.config
1155 .authority_overload_config
1156 .check_system_overload_at_execution
1157 }
1158
1159 pub(crate) fn check_system_overload(
1160 &self,
1161 consensus_adapter: &Arc<ConsensusAdapter>,
1162 tx_data: &SenderSignedData,
1163 do_authority_overload_check: bool,
1164 ) -> IotaResult {
1165 if do_authority_overload_check {
1166 self.check_authority_overload(tx_data).tap_err(|_| {
1167 self.update_overload_metrics("execution_queue");
1168 })?;
1169 }
1170 self.transaction_manager
1171 .check_execution_overload(self.overload_config(), tx_data)
1172 .tap_err(|_| {
1173 self.update_overload_metrics("execution_pending");
1174 })?;
1175 consensus_adapter.check_consensus_overload().tap_err(|_| {
1176 self.update_overload_metrics("consensus");
1177 })?;
1178
1179 let pending_tx_count = self
1180 .get_cache_commit()
1181 .approximate_pending_transaction_count();
1182 if pending_tx_count
1183 > self
1184 .config
1185 .execution_cache_config
1186 .writeback_cache
1187 .backpressure_threshold_for_rpc()
1188 {
1189 return Err(IotaError::ValidatorOverloadedRetryAfter {
1190 retry_after_secs: 10,
1191 });
1192 }
1193
1194 Ok(())
1195 }
1196
1197 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1198 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1199 return Ok(());
1200 }
1201
1202 let load_shedding_percentage = self
1203 .overload_info
1204 .load_shedding_percentage
1205 .load(Ordering::Relaxed);
1206 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1207 }
1208
1209 fn update_overload_metrics(&self, source: &str) {
1210 self.metrics
1211 .transaction_overload_sources
1212 .with_label_values(&[source])
1213 .inc();
1214 }
1215
1216 #[instrument(level = "trace", skip_all)]
1218 pub async fn execute_certificate(
1219 &self,
1220 certificate: &VerifiedCertificate,
1221 epoch_store: &Arc<AuthorityPerEpochStore>,
1222 ) -> IotaResult<TransactionEffects> {
1223 let _metrics_guard = if certificate.contains_shared_object() {
1224 self.metrics
1225 .execute_certificate_latency_shared_object
1226 .start_timer()
1227 } else {
1228 self.metrics
1229 .execute_certificate_latency_single_writer
1230 .start_timer()
1231 };
1232 trace!("execute_certificate");
1233
1234 self.metrics.total_cert_attempts.inc();
1235
1236 if !certificate.contains_shared_object() {
1237 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1242 }
1243
1244 epoch_store
1247 .within_alive_epoch(self.notify_read_effects(certificate))
1248 .await
1249 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1250 .and_then(|r| r)
1251 }
1252
    /// Executes `certificate` synchronously and returns its effects together
    /// with the execution error, if any.
    ///
    /// If executed effects for the transaction are already available from the
    /// transaction cache reader they are returned as-is with no execution
    /// error. Otherwise the input objects are loaded and the work is handed
    /// to `process_certificate`.
    ///
    /// # Panics
    ///
    /// Panics if effects already exist and `expected_effects_digest` is
    /// `Some` but does not equal their digest.
    #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
    pub fn try_execute_immediately(
        &self,
        certificate: &VerifiedExecutableTransaction,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
        let _scope = monitored_scope("Execution::try_execute_immediately");
        let _metrics_guard = self.metrics.internal_execution_latency.start_timer();

        let tx_digest = certificate.digest();

        // The guard is acquired before the "already executed?" lookup below.
        let tx_guard = epoch_store.acquire_tx_guard(certificate)?;

        if let Some(effects) = self
            .get_transaction_cache_reader()
            .try_get_executed_effects(tx_digest)?
        {
            if let Some(expected_effects_digest_inner) = expected_effects_digest {
                assert_eq!(
                    effects.digest(),
                    expected_effects_digest_inner,
                    "Unexpected effects digest for transaction {tx_digest}"
                );
            }
            // Nothing left to execute: release the guard explicitly.
            tx_guard.release();
            return Ok((effects, None));
        }

        let (tx_input_objects, per_authenticator_inputs) =
            self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;

        // Fall back to the epoch store's signed effects digest when the
        // caller supplied none. `Option::or` takes its argument by value, so
        // the lookup (and its `?`) runs even when a digest was supplied.
        let expected_effects_digest =
            expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);

        self.process_certificate(
            tx_guard,
            certificate,
            tx_input_objects,
            per_authenticator_inputs,
            expected_effects_digest,
            epoch_store,
        )
        .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
        .tap_ok(
            |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
        )
    }
1323
1324 pub fn read_objects_for_execution(
1325 &self,
1326 tx_lock: &CertLockGuard,
1327 certificate: &VerifiedExecutableTransaction,
1328 epoch_store: &Arc<AuthorityPerEpochStore>,
1329 ) -> IotaResult<(InputObjects, Vec<(InputObjects, ObjectReadResult)>)> {
1330 let _scope = monitored_scope("Execution::load_input_objects");
1331 let _metrics_guard = self
1332 .metrics
1333 .execution_load_input_objects_latency
1334 .start_timer();
1335
1336 let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1337
1338 let input_objects = self.input_loader.read_objects_for_execution(
1339 epoch_store,
1340 &certificate.key(),
1341 tx_lock,
1342 &input_objects,
1343 epoch_store.epoch(),
1344 )?;
1345
1346 certificate.split_input_objects_into_groups_for_reading(input_objects)
1347 }
1348
1349 pub fn try_execute_for_test(
1353 &self,
1354 certificate: &VerifiedCertificate,
1355 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1356 let epoch_store = self.epoch_store_for_testing();
1357 let (effects, execution_error_opt) = self.try_execute_immediately(
1358 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1359 None,
1360 &epoch_store,
1361 )?;
1362 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1363 Ok((signed_effects, execution_error_opt))
1364 }
1365
1366 pub fn execute_for_test(
1368 &self,
1369 certificate: &VerifiedCertificate,
1370 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1371 self.try_execute_for_test(certificate)
1372 .expect("try_execute_for_test should not fail")
1373 }
1374
1375 pub async fn notify_read_effects(
1376 &self,
1377 certificate: &VerifiedCertificate,
1378 ) -> IotaResult<TransactionEffects> {
1379 self.get_transaction_cache_reader()
1380 .try_notify_read_executed_effects(&[*certificate.digest()])
1381 .await
1382 .map(|mut r| r.pop().expect("must return correct number of effects"))
1383 }
1384
1385 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1386 self.get_object_cache_reader()
1387 .try_check_owned_objects_are_live(owned_object_refs)
1388 }
1389
1390 pub(crate) fn debug_dump_transaction_state(
1395 &self,
1396 tx_digest: &TransactionDigest,
1397 effects: &TransactionEffects,
1398 expected_effects_digest: TransactionEffectsDigest,
1399 inner_temporary_store: &InnerTemporaryStore,
1400 certificate: &VerifiedExecutableTransaction,
1401 debug_dump_config: &StateDebugDumpConfig,
1402 ) -> IotaResult<PathBuf> {
1403 let dump_dir = debug_dump_config
1406 .dump_file_directory
1407 .as_ref()
1408 .cloned()
1409 .unwrap_or(std::env::temp_dir());
1410 let epoch_store = self.load_epoch_store_one_call_per_task();
1411
1412 NodeStateDump::new(
1413 tx_digest,
1414 effects,
1415 expected_effects_digest,
1416 self.get_object_store().as_ref(),
1417 &epoch_store,
1418 inner_temporary_store,
1419 certificate,
1420 )?
1421 .write_to_file(&dump_dir)
1422 .map_err(|e| IotaError::FileIO(e.to_string()))
1423 }
1424
    /// Executes `certificate` against the supplied input objects and commits
    /// the result.
    ///
    /// Steps, in order: take the execution lock for the transaction, check
    /// that the locked epoch equals the epoch of `epoch_store`, run
    /// `prepare_certificate`, compare the resulting effects digest with
    /// `expected_effects_digest` (when given), and finally call
    /// `commit_certificate`. On each early error return `tx_guard` is
    /// released explicitly before returning.
    ///
    /// # Errors
    ///
    /// Returns the error from acquiring the execution lock, from
    /// `prepare_certificate` or from `commit_certificate`, or
    /// `IotaError::WrongEpoch` when the locked epoch and the epoch store's
    /// epoch differ.
    ///
    /// # Panics
    ///
    /// Panics when `expected_effects_digest` is `Some` and differs from the
    /// digest of the locally computed effects. A state dump is attempted and
    /// an error is logged first.
    #[instrument(name = "process_certificate", level = "trace", skip_all, fields(tx_digest = ?certificate.digest(), sender = ?certificate.data().transaction_data().gas_owner().to_string()))]
    pub(crate) fn process_certificate(
        &self,
        tx_guard: CertTxGuard,
        certificate: &VerifiedExecutableTransaction,
        tx_input_objects: InputObjects,
        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
        expected_effects_digest: Option<TransactionEffectsDigest>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
        // Start of the interval used for the gas/latency ratio metric below.
        let process_certificate_start_time = tokio::time::Instant::now();
        let digest = *certificate.digest();

        let _scope = monitored_scope("Execution::process_certificate");

        fail_point_if!("correlated-crash-process-certificate", || {
            if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
                iota_simulator::task::kill_current_node(None);
            }
        });

        let execution_guard = self.execution_lock_for_executable_transaction(certificate);
        let execution_guard = match execution_guard {
            Ok(execution_guard) => execution_guard,
            Err(err) => {
                tx_guard.release();
                return Err(err);
            }
        };
        // The guard dereferences to an epoch; it must equal the epoch of the
        // store this call was given.
        if *execution_guard != epoch_store.epoch() {
            tx_guard.release();
            info!("The epoch of the execution_guard doesn't match the epoch store");
            return Err(IotaError::WrongEpoch {
                expected_epoch: epoch_store.epoch(),
                actual_epoch: *execution_guard,
            });
        }

        let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
            &execution_guard,
            certificate,
            tx_input_objects,
            per_authenticator_inputs,
            epoch_store,
        ) {
            Err(e) => {
                info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
                tx_guard.release();
                return Err(e);
            }
            Ok(res) => res,
        };

        if let Some(expected_effects_digest) = expected_effects_digest {
            if effects.digest() != expected_effects_digest {
                // Best-effort dump: a failure to write it is only logged and
                // does not prevent the panic below.
                match self.debug_dump_transaction_state(
                    &digest,
                    &effects,
                    expected_effects_digest,
                    &inner_temporary_store,
                    certificate,
                    &self.config.state_debug_dump_config,
                ) {
                    Ok(out_path) => {
                        info!(
                            "Dumped node state for transaction {} to {}",
                            digest,
                            out_path.as_path().display().to_string()
                        );
                    }
                    Err(e) => {
                        error!("Error dumping state for transaction {}: {e}", digest);
                    }
                }
                error!(
                    tx_digest = ?digest,
                    ?expected_effects_digest,
                    actual_effects = ?effects,
                    "fork detected!"
                );
                panic!(
                    "Transaction {} is expected to have effects digest {}, but got {}!",
                    digest,
                    expected_effects_digest,
                    effects.digest(),
                );
            }
        }

        fail_point!("crash");

        // Both guards are moved into the commit step.
        self.commit_certificate(
            certificate,
            inner_temporary_store,
            &effects,
            tx_guard,
            execution_guard,
            epoch_store,
        )?;

        // Record computation cost per elapsed microsecond; skipped when no
        // time was measured so the division is never by zero.
        let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics
                .execution_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
        };
        Ok((effects, execution_error_opt))
    }
1547
1548 pub async fn reconfigure_traffic_control(
1549 &self,
1550 params: TrafficControlReconfigParams,
1551 ) -> Result<TrafficControlReconfigParams, IotaError> {
1552 if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1553 traffic_controller.admin_reconfigure(params).await
1554 } else {
1555 Err(IotaError::InvalidAdminRequest(
1556 "Traffic controller is not configured on this node".to_string(),
1557 ))
1558 }
1559 }
1560
    /// Persists the outcome of an executed certificate.
    ///
    /// In order: runs post-processing, records the transaction key/digest
    /// pair in the epoch store, writes the transaction outputs through the
    /// cache writer, commits `tx_guard`, notifies the transaction manager and
    /// updates metrics. `_execution_guard` is not read; taking it by value
    /// keeps it alive until this function returns.
    ///
    /// # Errors
    ///
    /// Fails if recording the key/digest pair or writing the outputs fails.
    /// A post-processing failure is counted and logged but does not abort
    /// the commit.
    #[instrument(level = "trace", skip_all)]
    fn commit_certificate(
        &self,
        certificate: &VerifiedExecutableTransaction,
        inner_temporary_store: InnerTemporaryStore,
        effects: &TransactionEffects,
        tx_guard: CertTxGuard,
        _execution_guard: ExecutionLockReadGuard<'_>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        let tx_key = certificate.key();
        let tx_digest = certificate.digest();
        // Captured up front: `inner_temporary_store` is moved into the
        // transaction outputs further down.
        let input_object_count = inner_temporary_store.input_objects.len();
        let shared_object_count = effects.input_shared_objects().len();

        let output_keys = inner_temporary_store.get_output_keys(effects);

        // The result is deliberately discarded after counting and logging.
        let _ = self
            .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
            .tap_err(|e| {
                self.metrics.post_processing_total_failures.inc();
                error!(?tx_digest, "tx post processing failed: {e}");
            });

        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;

        fail_point!("crash");

        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
            certificate.clone().into_unsigned(),
            effects.clone(),
            inner_temporary_store,
        );
        self.get_cache_writer()
            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;

        // After an end-of-epoch transaction, force a reload of all built-in
        // framework packages in the object cache.
        if certificate.transaction_data().is_end_of_epoch_tx() {
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        // The guard is committed only once the outputs have been written.
        tx_guard.commit_tx();

        self.transaction_manager
            .notify_commit(tx_digest, output_keys, epoch_store);

        self.update_metrics(certificate, input_object_count, shared_object_count);

        Ok(())
    }
1626
1627 fn update_metrics(
1628 &self,
1629 certificate: &VerifiedExecutableTransaction,
1630 input_object_count: usize,
1631 shared_object_count: usize,
1632 ) {
1633 if certificate.has_upgraded_multisig() {
1635 self.metrics.multisig_sig_count.inc();
1636 }
1637
1638 self.metrics.total_effects.inc();
1639 self.metrics.total_certs.inc();
1640
1641 if shared_object_count > 0 {
1642 self.metrics.shared_obj_tx.inc();
1643 }
1644
1645 if certificate.is_sponsored_tx() {
1646 self.metrics.sponsored_tx.inc();
1647 }
1648
1649 self.metrics
1650 .num_input_objs
1651 .observe(input_object_count as f64);
1652 self.metrics
1653 .num_shared_objects
1654 .observe(shared_object_count as f64);
1655 self.metrics.batch_size.observe(
1656 certificate
1657 .data()
1658 .intent_message()
1659 .value
1660 .kind()
1661 .num_commands() as f64,
1662 );
1663 }
1664
    /// Executes `certificate` and returns the temporary store, the effects
    /// and the execution error (if any), without committing anything.
    ///
    /// Two paths exist, chosen by whether the certificate carries Move
    /// authenticators:
    ///
    /// * none: the certificate inputs are checked and the transaction is
    ///   executed with `execute_transaction_to_effects`;
    /// * some: every authenticator's account is checked, certificate and
    ///   authenticator inputs are checked together, and
    ///   `authenticate_then_execute_transaction_to_effects` is called.
    ///
    /// In both paths the owned input objects are verified with
    /// `check_owned_locks` before execution. `_execution_guard` is not read
    /// in this function.
    ///
    /// # Errors
    ///
    /// Returns any error from the validity check, the input checks, the
    /// owned-lock check, the authenticator account checks or the auth digest
    /// computation. An error produced by the execution itself is not an
    /// `Err` here: it is returned as the third tuple element.
    #[instrument(level = "trace", skip_all)]
    fn prepare_certificate(
        &self,
        _execution_guard: &ExecutionLockReadGuard<'_>,
        certificate: &VerifiedExecutableTransaction,
        tx_input_objects: InputObjects,
        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(
        InnerTemporaryStore,
        TransactionEffects,
        Option<ExecutionError>,
    )> {
        let _scope = monitored_scope("Execution::prepare_certificate");
        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
        let prepare_certificate_start_time = tokio::time::Instant::now();

        let protocol_config = epoch_store.protocol_config();

        let reference_gas_price = epoch_store.reference_gas_price();

        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
        let epoch_start_timestamp = epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp();

        let backing_store = self.get_backing_store().as_ref();

        let tx_digest = *certificate.digest();

        let tx_data = certificate.data().transaction_data();
        tx_data.validity_check(protocol_config)?;

        let (kind, signer, gas_data) = tx_data.execution_parts();

        let move_authenticators = certificate.move_authenticators();

        // `effects` is only mutated inside the fail point further down, hence
        // the conditional `expect(unused_mut)`. The second element of the
        // executor's tuple is ignored.
        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
        let (inner_temp_store, _, mut effects, execution_error_opt) = if move_authenticators
            .is_empty()
        {
            // Path without Move authenticators.
            let (tx_gas_status, tx_checked_input_objects) =
                iota_transaction_checks::check_certificate_input(
                    certificate,
                    tx_input_objects,
                    protocol_config,
                    reference_gas_price,
                )?;

            let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
            self.check_owned_locks(&owned_object_refs)?;
            epoch_store.executor().execute_transaction_to_effects(
                backing_store,
                protocol_config,
                self.metrics.limits_metrics.clone(),
                self.config
                    .expensive_safety_check_config
                    .enable_deep_per_tx_iota_conservation_check(),
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_id,
                epoch_start_timestamp,
                tx_checked_input_objects,
                gas_data,
                tx_gas_status,
                kind,
                signer,
                tx_digest,
                &mut None,
            )
        } else {
            // Path with Move authenticators. The zips below pair entries by
            // position, so the two lists are expected to have equal length.
            debug_assert_eq!(
                move_authenticators.len(),
                per_authenticator_inputs.len(),
                "Move authenticators amount must match the number of authenticator inputs"
            );

            // For each authenticator, check its account object and keep the
            // resulting function ref alongside that authenticator's inputs.
            let per_authenticator_inputs = move_authenticators
                .iter()
                .zip(per_authenticator_inputs)
                .map(
                    |(move_authenticator, (authenticator_input_objects, account_object))| {
                        let (
                            auth_account_object_id,
                            auth_account_object_seq_number,
                            auth_account_object_digest,
                        ) = move_authenticator.object_to_authenticate_components()?;

                        let signer = move_authenticator.address()?;

                        let authenticator_function_ref_for_execution = self.check_move_account(
                            auth_account_object_id,
                            auth_account_object_seq_number,
                            auth_account_object_digest,
                            account_object,
                            &signer,
                        )?;

                        Ok((
                            authenticator_input_objects,
                            authenticator_function_ref_for_execution,
                        ))
                    },
                )
                .collect::<IotaResult<Vec<_>>>()?;

            // Cloned because the input check below takes the inputs by value
            // while `per_authenticator_inputs` is still needed afterwards.
            let per_authenticator_input_objects = per_authenticator_inputs
                .iter()
                .map(|(authenticator_input_objects, _)| authenticator_input_objects.clone())
                .collect::<Vec<_>>();

            let tx_data_bytes =
                bcs::to_bytes(tx_data).expect("TransactionData serialization cannot fail");

            let (sender_auth_digest, sponsor_auth_digest) =
                certificate.data().compute_auth_digests()?;

            let authenticator_gas_budget = protocol_config.max_auth_gas();
            let (
                gas_status,
                per_authenticator_checked_input_objects,
                authenticator_and_tx_checked_input_objects,
            ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
                certificate,
                tx_input_objects,
                per_authenticator_input_objects,
                authenticator_gas_budget,
                protocol_config,
                reference_gas_price,
            )?;

            debug_assert_eq!(
                move_authenticators.len(),
                per_authenticator_checked_input_objects.len(),
                "Move authenticators amount must match the number of checked authenticator inputs"
            );

            // Build the (authenticator, function ref, checked inputs) triples
            // handed to the executor.
            let move_authenticators = move_authenticators
                .into_iter()
                .zip(per_authenticator_inputs)
                .zip(per_authenticator_checked_input_objects)
                .map(
                    |(
                        (move_authenticator, (_, authenticator_function_ref_for_execution)),
                        authenticator_checked_input_objects,
                    )| {
                        (
                            move_authenticator.to_owned(),
                            authenticator_function_ref_for_execution,
                            authenticator_checked_input_objects,
                        )
                    },
                )
                .collect::<Vec<_>>();

            // The owned-lock check covers the combined authenticator and
            // transaction inputs.
            let owned_object_refs = authenticator_and_tx_checked_input_objects
                .inner()
                .filter_owned_objects();
            self.check_owned_locks(&owned_object_refs)?;

            // The closure returns the function ref of the first authenticator
            // whose address equals the requested one.
            let (sender_authenticator_function_ref, sponsor_authenticator_function_ref) =
                extract_auth_fun_refs(signer, gas_data.owner, |address| {
                    move_authenticators
                        .iter()
                        .find(|t| t.0.address().ok().as_ref() == Some(&address))
                        .map(|t| t.1.authenticator_function_ref.clone())
                });

            let auth_context_data = AuthContextData {
                transaction_data_bytes: tx_data_bytes,
                sender_auth_digest,
                sponsor_auth_digest,
                sender_authenticator_function_ref,
                sponsor_authenticator_function_ref,
            };

            epoch_store
                .executor()
                .authenticate_then_execute_transaction_to_effects(
                    backing_store,
                    protocol_config,
                    self.metrics.limits_metrics.clone(),
                    self.config
                        .expensive_safety_check_config
                        .enable_deep_per_tx_iota_conservation_check(),
                    self.config.certificate_deny_config.certificate_deny_set(),
                    &epoch_id,
                    epoch_start_timestamp,
                    gas_data,
                    gas_status,
                    move_authenticators,
                    authenticator_and_tx_checked_input_objects,
                    kind,
                    signer,
                    tx_digest,
                    auth_context_data,
                    &mut None,
                )
        };

        fail_point_if!("cp_execution_nondeterminism", || {
            #[cfg(msim)]
            self.create_fail_state(certificate, epoch_store, &mut effects);
        });

        // Record computation cost per elapsed microsecond; skipped when no
        // time was measured so the division is never by zero.
        let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics
                .prepare_cert_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
        }

        // The executor's result is converted into an `Option` holding only
        // its error.
        Ok((inner_temp_store, effects, execution_error_opt.err()))
    }
1910
1911 pub fn prepare_certificate_for_benchmark(
1912 &self,
1913 certificate: &VerifiedExecutableTransaction,
1914 input_objects: InputObjects,
1915 epoch_store: &Arc<AuthorityPerEpochStore>,
1916 ) -> IotaResult<(
1917 InnerTemporaryStore,
1918 TransactionEffects,
1919 Option<ExecutionError>,
1920 )> {
1921 let lock = RwLock::new(epoch_store.epoch());
1922 let execution_guard = lock.try_read().unwrap();
1923
1924 self.prepare_certificate(
1925 &execution_guard,
1926 certificate,
1927 input_objects,
1928 vec![],
1929 epoch_store,
1930 )
1931 }
1932
1933 #[instrument("dry_exec_tx", level = "trace", skip_all)]
1936 #[allow(clippy::type_complexity)]
1937 pub fn dry_exec_transaction(
1938 &self,
1939 transaction: TransactionData,
1940 transaction_digest: TransactionDigest,
1941 ) -> IotaResult<(
1942 DryRunTransactionBlockResponse,
1943 BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
1944 TransactionEffects,
1945 Option<ObjectId>,
1946 )> {
1947 let epoch_store = self.load_epoch_store_one_call_per_task();
1948 if !self.is_fullnode(&epoch_store) {
1949 return Err(IotaError::UnsupportedFeature {
1950 error: "dry-exec is only supported on fullnodes".to_string(),
1951 });
1952 }
1953
1954 if transaction.kind().is_system() {
1955 return Err(IotaError::UnsupportedFeature {
1956 error: "dry-exec does not support system transactions".to_string(),
1957 });
1958 }
1959
1960 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1961 }
1962
1963 #[allow(clippy::type_complexity)]
1964 pub fn dry_exec_transaction_for_benchmark(
1965 &self,
1966 transaction: TransactionData,
1967 transaction_digest: TransactionDigest,
1968 ) -> IotaResult<(
1969 DryRunTransactionBlockResponse,
1970 BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
1971 TransactionEffects,
1972 Option<ObjectId>,
1973 )> {
1974 let epoch_store = self.load_epoch_store_one_call_per_task();
1975 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1976 }
1977
1978 #[instrument(level = "trace", skip_all)]
1979 #[allow(clippy::type_complexity)]
1980 fn dry_exec_transaction_impl(
1981 &self,
1982 epoch_store: &AuthorityPerEpochStore,
1983 transaction: TransactionData,
1984 transaction_digest: TransactionDigest,
1985 ) -> IotaResult<(
1986 DryRunTransactionBlockResponse,
1987 BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
1988 TransactionEffects,
1989 Option<ObjectId>,
1990 )> {
1991 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1993
1994 let input_object_kinds = transaction.input_objects()?;
1995 let receiving_object_refs = transaction.receiving_objects();
1996
1997 iota_transaction_checks::deny::check_transaction_for_signing(
1998 &transaction,
1999 &[],
2000 &input_object_kinds,
2001 &receiving_object_refs,
2002 &self.config.transaction_deny_config,
2003 self.get_backing_package_store().as_ref(),
2004 )?;
2005
2006 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2007 None,
2009 &input_object_kinds,
2010 &receiving_object_refs,
2011 epoch_store.epoch(),
2012 )?;
2013
2014 let mut transaction = transaction;
2016 let reference_gas_price = epoch_store.reference_gas_price();
2017 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
2018 let sender = transaction.gas_owner();
2019 let gas_object_id = ObjectId::random();
2020 let gas_object = Object::new_move(
2021 MoveObject::new_gas_coin(
2022 OBJECT_START_VERSION,
2023 gas_object_id,
2024 SIMULATION_GAS_COIN_VALUE,
2025 ),
2026 Owner::Address(sender),
2027 TransactionDigest::GENESIS_MARKER,
2028 );
2029 let gas_object_ref = gas_object.compute_object_reference();
2030 transaction.gas_data_mut().objects = vec![gas_object_ref];
2032 (
2033 iota_transaction_checks::check_transaction_input_with_given_gas(
2034 epoch_store.protocol_config(),
2035 reference_gas_price,
2036 &transaction,
2037 input_objects,
2038 receiving_objects,
2039 gas_object,
2040 &self.metrics.bytecode_verifier_metrics,
2041 &self.config.verifier_signing_config,
2042 )?,
2043 Some(gas_object_id),
2044 )
2045 } else {
2046 let authenticator_gas_budget = 0;
2049
2050 (
2051 iota_transaction_checks::check_transaction_input(
2052 epoch_store.protocol_config(),
2053 reference_gas_price,
2054 &transaction,
2055 input_objects,
2056 &receiving_objects,
2057 &self.metrics.bytecode_verifier_metrics,
2058 &self.config.verifier_signing_config,
2059 authenticator_gas_budget,
2060 )?,
2061 None,
2062 )
2063 };
2064
2065 let protocol_config = epoch_store.protocol_config();
2066 let (kind, signer, gas_data) = transaction.execution_parts();
2067
2068 let silent = true;
2069 let executor = iota_execution::executor(protocol_config, silent, None)
2070 .expect("Creating an executor should not fail here");
2071
2072 let expensive_checks = false;
2073 let (inner_temp_store, _, effects, execution_error) = executor
2074 .execute_transaction_to_effects(
2075 self.get_backing_store().as_ref(),
2076 protocol_config,
2077 self.metrics.limits_metrics.clone(),
2078 expensive_checks,
2079 self.config.certificate_deny_config.certificate_deny_set(),
2080 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2081 epoch_store
2082 .epoch_start_config()
2083 .epoch_data()
2084 .epoch_start_timestamp(),
2085 checked_input_objects,
2086 gas_data,
2087 gas_status,
2088 kind,
2089 signer,
2090 transaction_digest,
2091 &mut None,
2092 );
2093 let tx_digest = *effects.transaction_digest();
2094
2095 let module_cache =
2096 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2097
2098 let mut layout_resolver =
2099 epoch_store
2100 .executor()
2101 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2102 &inner_temp_store,
2103 self.get_backing_package_store(),
2104 )));
2105 let object_changes = Vec::new();
2107
2108 let balance_changes = Vec::new();
2110
2111 let written_with_kind = effects
2112 .created()
2113 .into_iter()
2114 .map(|(oref, _)| (oref, WriteKind::Create))
2115 .chain(
2116 effects
2117 .unwrapped()
2118 .into_iter()
2119 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2120 )
2121 .chain(
2122 effects
2123 .mutated()
2124 .into_iter()
2125 .map(|(oref, _)| (oref, WriteKind::Mutate)),
2126 )
2127 .map(|(oref, kind)| {
2128 let obj = inner_temp_store.written.get(&oref.object_id).unwrap();
2129 (oref.object_id, (oref, obj.clone(), kind))
2131 })
2132 .collect();
2133
2134 let execution_error_source = execution_error
2135 .as_ref()
2136 .err()
2137 .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2138
2139 Ok((
2140 DryRunTransactionBlockResponse {
2141 suggested_gas_price: self
2143 .congestion_tracker
2144 .get_prediction_suggested_gas_price(&transaction),
2145 input: IotaTransactionBlockData::try_from_with_module_cache(
2146 transaction,
2147 &module_cache,
2148 tx_digest,
2149 )
2150 .map_err(|e| IotaError::TransactionSerialization {
2151 error: format!(
2152 "Failed to convert transaction to IotaTransactionBlockData: {e}",
2153 ),
2154 })?, effects: effects.clone().try_into()?,
2156 events: IotaTransactionBlockEvents::try_from(
2157 inner_temp_store.events.clone(),
2158 tx_digest,
2159 None,
2160 layout_resolver.as_mut(),
2161 )?,
2162 object_changes,
2163 balance_changes,
2164 execution_error_source,
2165 },
2166 written_with_kind,
2167 effects,
2168 mock_gas,
2169 ))
2170 }
2171
2172 pub fn simulate_transaction(
2173 &self,
2174 mut transaction: TransactionData,
2175 checks: VmChecks,
2176 ) -> IotaResult<SimulateTransactionResult> {
2177 if transaction.kind().is_system() {
2178 return Err(IotaError::UnsupportedFeature {
2179 error: "simulate does not support system transactions".to_string(),
2180 });
2181 }
2182
2183 let epoch_store = self.load_epoch_store_one_call_per_task();
2184 if !self.is_fullnode(&epoch_store) {
2185 return Err(IotaError::UnsupportedFeature {
2186 error: "simulate is only supported on fullnodes".to_string(),
2187 });
2188 }
2189
2190 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2194
2195 let input_object_kinds = transaction.input_objects()?;
2196 let receiving_object_refs = transaction.receiving_objects();
2197
2198 iota_transaction_checks::deny::check_transaction_for_signing(
2201 &transaction,
2202 &[],
2203 &input_object_kinds,
2204 &receiving_object_refs,
2205 &self.config.transaction_deny_config,
2206 self.get_backing_package_store().as_ref(),
2207 )?;
2208
2209 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2211 None,
2213 &input_object_kinds,
2214 &receiving_object_refs,
2215 epoch_store.epoch(),
2216 )?;
2217
2218 let mock_gas_id = if transaction.gas().is_empty() {
2220 let mock_gas_object = Object::new_move(
2221 MoveObject::new_gas_coin(
2222 OBJECT_START_VERSION,
2223 ObjectId::MAX,
2224 SIMULATION_GAS_COIN_VALUE,
2225 ),
2226 Owner::Address(transaction.gas_data().owner),
2227 TransactionDigest::GENESIS_MARKER,
2228 );
2229 let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2230 transaction.gas_data_mut().objects = vec![mock_gas_object_ref];
2231 input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2232 Some(mock_gas_object.id())
2233 } else {
2234 None
2235 };
2236
2237 let protocol_config = epoch_store.protocol_config();
2238
2239 let authenticator_gas_budget = 0;
2242
2243 let (gas_status, checked_input_objects) = if checks.enabled() {
2246 iota_transaction_checks::check_transaction_input(
2247 protocol_config,
2248 epoch_store.reference_gas_price(),
2249 &transaction,
2250 input_objects,
2251 &receiving_objects,
2252 &self.metrics.bytecode_verifier_metrics,
2253 &self.config.verifier_signing_config,
2254 authenticator_gas_budget,
2255 )?
2256 } else {
2257 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2258 protocol_config,
2259 transaction.kind(),
2260 input_objects,
2261 receiving_objects,
2262 )?;
2263 let gas_status = IotaGasStatus::new(
2264 transaction.gas_budget(),
2265 transaction.gas_price(),
2266 epoch_store.reference_gas_price(),
2267 protocol_config,
2268 )?;
2269
2270 (gas_status, checked_input_objects)
2271 };
2272
2273 let executor = iota_execution::executor(
2275 protocol_config,
2276 true, None,
2278 )
2279 .expect("Creating an executor should not fail here");
2280
2281 let (kind, signer, gas_data) = transaction.execution_parts();
2283 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2284 self.get_backing_store().as_ref(),
2285 protocol_config,
2286 self.metrics.limits_metrics.clone(),
2287 false, self.config.certificate_deny_config.certificate_deny_set(),
2289 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2290 epoch_store
2291 .epoch_start_config()
2292 .epoch_data()
2293 .epoch_start_timestamp(),
2294 checked_input_objects,
2295 gas_data,
2296 gas_status,
2297 kind,
2298 signer,
2299 transaction.digest(),
2300 checks.disabled(),
2301 );
2302
2303 Ok(SimulateTransactionResult {
2306 input_objects: inner_temp_store.input_objects,
2307 output_objects: inner_temp_store.written,
2308 events: effects.events_digest().map(|_| inner_temp_store.events),
2309 effects,
2310 execution_result,
2311 suggested_gas_price: self
2312 .congestion_tracker
2313 .get_prediction_suggested_gas_price(&transaction),
2314 mock_gas_id,
2315 })
2316 }
2317
2318 #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2323 pub async fn dev_inspect_transaction_block(
2324 &self,
2325 sender: IotaAddress,
2326 transaction_kind: TransactionKind,
2327 gas_price: Option<u64>,
2328 gas_budget: Option<u64>,
2329 gas_sponsor: Option<IotaAddress>,
2330 gas_objects: Option<Vec<ObjectRef>>,
2331 show_raw_txn_data_and_effects: Option<bool>,
2332 skip_checks: Option<bool>,
2333 ) -> IotaResult<DevInspectResults> {
2334 let epoch_store = self.load_epoch_store_one_call_per_task();
2335
2336 if !self.is_fullnode(&epoch_store) {
2337 return Err(IotaError::UnsupportedFeature {
2338 error: "dev-inspect is only supported on fullnodes".to_string(),
2339 });
2340 }
2341
2342 if transaction_kind.is_system() {
2343 return Err(IotaError::UnsupportedFeature {
2344 error: "system transactions are not supported".to_string(),
2345 });
2346 }
2347
2348 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2349 let skip_checks = skip_checks.unwrap_or(true);
2350 let reference_gas_price = epoch_store.reference_gas_price();
2351 let protocol_config = epoch_store.protocol_config();
2352 let max_tx_gas = protocol_config.max_tx_gas();
2353
2354 let price = gas_price.unwrap_or(reference_gas_price);
2355 let budget = gas_budget.unwrap_or(max_tx_gas);
2356 let owner = gas_sponsor.unwrap_or(sender);
2357 let payment = gas_objects.unwrap_or_default();
2360 let mut transaction = TransactionData::V1(TransactionDataV1 {
2361 kind: transaction_kind.clone(),
2362 sender,
2363 gas_payment: GasData {
2364 objects: payment,
2365 owner,
2366 price,
2367 budget,
2368 },
2369 expiration: TransactionExpiration::None,
2370 });
2371
2372 let raw_txn_data = if show_raw_txn_data_and_effects {
2373 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2374 error: "Failed to serialize transaction during dev inspect".to_string(),
2375 })?
2376 } else {
2377 vec![]
2378 };
2379
2380 transaction.validity_check_no_gas_check(protocol_config)?;
2381
2382 let input_object_kinds = transaction.input_objects()?;
2383 let receiving_object_refs = transaction.receiving_objects();
2384
2385 iota_transaction_checks::deny::check_transaction_for_signing(
2386 &transaction,
2387 &[],
2388 &input_object_kinds,
2389 &receiving_object_refs,
2390 &self.config.transaction_deny_config,
2391 self.get_backing_package_store().as_ref(),
2392 )?;
2393
2394 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2395 None,
2397 &input_object_kinds,
2398 &receiving_object_refs,
2399 epoch_store.epoch(),
2400 )?;
2401
2402 let (gas_status, checked_input_objects) = if skip_checks {
2403 if transaction.gas().is_empty() {
2408 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2410 SIMULATION_GAS_COIN_VALUE,
2411 transaction.gas_owner(),
2412 );
2413 let gas_object_ref = dummy_gas_object.compute_object_reference();
2414 transaction.gas_data_mut().objects = vec![gas_object_ref];
2415 input_objects.push(ObjectReadResult::new(
2416 InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2417 dummy_gas_object.into(),
2418 ));
2419 }
2420 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2421 protocol_config,
2422 &transaction_kind,
2423 input_objects,
2424 receiving_objects,
2425 )?;
2426 let gas_status = IotaGasStatus::new(
2427 max_tx_gas,
2428 transaction.gas_price(),
2429 reference_gas_price,
2430 protocol_config,
2431 )?;
2432
2433 (gas_status, checked_input_objects)
2434 } else {
2435 if transaction.gas().is_empty() {
2439 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2441 SIMULATION_GAS_COIN_VALUE,
2442 transaction.gas_owner(),
2443 );
2444 let gas_object_ref = dummy_gas_object.compute_object_reference();
2445 transaction.gas_data_mut().objects = vec![gas_object_ref];
2446 iota_transaction_checks::check_transaction_input_with_given_gas(
2447 epoch_store.protocol_config(),
2448 reference_gas_price,
2449 &transaction,
2450 input_objects,
2451 receiving_objects,
2452 dummy_gas_object,
2453 &self.metrics.bytecode_verifier_metrics,
2454 &self.config.verifier_signing_config,
2455 )?
2456 } else {
2457 let authenticator_gas_budget = 0;
2460
2461 iota_transaction_checks::check_transaction_input(
2462 epoch_store.protocol_config(),
2463 reference_gas_price,
2464 &transaction,
2465 input_objects,
2466 &receiving_objects,
2467 &self.metrics.bytecode_verifier_metrics,
2468 &self.config.verifier_signing_config,
2469 authenticator_gas_budget,
2470 )?
2471 }
2472 };
2473
2474 let executor = iota_execution::executor(protocol_config, true, None)
2475 .expect("Creating an executor should not fail here");
2476 let gas_data = transaction.gas_data().clone();
2477 let intent_msg = IntentMessage::new(
2478 Intent {
2479 version: IntentVersion::V0,
2480 scope: IntentScope::TransactionData,
2481 app_id: IntentAppId::Iota,
2482 },
2483 transaction,
2484 );
2485 let transaction_digest = TransactionDigest::new(intent_msg.value.digest().into_inner());
2486 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2487 self.get_backing_store().as_ref(),
2488 protocol_config,
2489 self.metrics.limits_metrics.clone(),
2490 false,
2492 self.config.certificate_deny_config.certificate_deny_set(),
2493 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2494 epoch_store
2495 .epoch_start_config()
2496 .epoch_data()
2497 .epoch_start_timestamp(),
2498 checked_input_objects,
2499 gas_data,
2500 gas_status,
2501 transaction_kind,
2502 sender,
2503 transaction_digest,
2504 skip_checks,
2505 );
2506
2507 let raw_effects = if show_raw_txn_data_and_effects {
2508 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2509 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2510 })?
2511 } else {
2512 vec![]
2513 };
2514
2515 let mut layout_resolver =
2516 epoch_store
2517 .executor()
2518 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2519 &inner_temp_store,
2520 self.get_backing_package_store(),
2521 )));
2522
2523 DevInspectResults::new(
2524 effects,
2525 inner_temp_store.events.clone(),
2526 execution_result,
2527 raw_txn_data,
2528 raw_effects,
2529 layout_resolver.as_mut(),
2530 )
2531 }
2532
2533 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2535 let epoch_store = self.epoch_store_for_testing();
2536 Ok(epoch_store.reference_gas_price())
2537 }
2538
2539 #[instrument(level = "trace", skip_all)]
2540 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2541 self.get_transaction_cache_reader()
2542 .try_is_tx_already_executed(digest)
2543 }
2544
2545 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2547 self.try_is_tx_already_executed(digest)
2548 .expect("storage access failed")
2549 }
2550
2551 #[instrument(level = "debug", skip_all, err)]
2553 fn index_tx(
2554 &self,
2555 indexes: &IndexStore,
2556 digest: &TransactionDigest,
2557 cert: &VerifiedExecutableTransaction,
2559 effects: &TransactionEffects,
2560 events: &TransactionEvents,
2561 timestamp_ms: u64,
2562 tx_coins: Option<TxCoins>,
2563 written: &WrittenObjects,
2564 inner_temporary_store: &InnerTemporaryStore,
2565 ) -> IotaResult<u64> {
2566 let changes = self
2567 .process_object_index(effects, written, inner_temporary_store)
2568 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2569
2570 indexes.index_tx(
2571 cert.data().intent_message().value.sender(),
2572 cert.data()
2573 .intent_message()
2574 .value
2575 .input_objects()?
2576 .iter()
2577 .map(|o| o.object_id()),
2578 effects
2579 .all_changed_objects()
2580 .into_iter()
2581 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2582 cert.data()
2583 .intent_message()
2584 .value
2585 .move_calls()
2586 .into_iter()
2587 .map(|(package, module, function)| {
2588 (*package, module.to_owned(), function.to_owned())
2589 }),
2590 events,
2591 changes,
2592 digest,
2593 timestamp_ms,
2594 tx_coins,
2595 )
2596 }
2597
/// Simulator-only hook that perturbs `effects` for non-system transactions.
///
/// A thread-local pair of (accumulated stake, authority set) is kept. While
/// the accumulated stake is below the committee's validity threshold, this
/// authority's stake is added and the authority is recorded. Every recorded
/// authority then bumps the computation cost in `effects` by one.
/// Authorities with zero stake and system transactions are left untouched.
#[cfg(msim)]
fn create_fail_state(
    &self,
    certificate: &VerifiedExecutableTransaction,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    effects: &mut TransactionEffects,
) {
    use std::cell::RefCell;

    use iota_types::effects::TransactionEffectsAPIForTesting;
    thread_local! {
        static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
    }

    if certificate.data().intent_message().value.is_system_tx() {
        return;
    }

    let committee = epoch_store.committee();
    let cur_stake = (**committee).weight(&self.name);
    if cur_stake == 0 {
        return;
    }

    FAIL_STATE.with_borrow_mut(|(accumulated_stake, failing_authorities)| {
        if *accumulated_stake < committee.validity_threshold() {
            *accumulated_stake += cur_stake;
            failing_authorities.insert(self.name);
        }

        if failing_authorities.contains(&self.name) {
            info!("cp_exec failing tx");
            effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
        }
    });
}
2630
2631 fn process_object_index(
2632 &self,
2633 effects: &TransactionEffects,
2634 written: &WrittenObjects,
2635 inner_temporary_store: &InnerTemporaryStore,
2636 ) -> IotaResult<ObjectIndexChanges> {
2637 let epoch_store = self.load_epoch_store_one_call_per_task();
2638 let mut layout_resolver =
2639 epoch_store
2640 .executor()
2641 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2642 inner_temporary_store,
2643 self.get_backing_package_store(),
2644 )));
2645
2646 let modified_at_version = effects
2647 .modified_at_versions()
2648 .into_iter()
2649 .collect::<HashMap<_, _>>();
2650
2651 let tx_digest = effects.transaction_digest();
2652 let mut deleted_owners = vec![];
2653 let mut deleted_dynamic_fields = vec![];
2654 for object_ref in effects.deleted().into_iter().chain(effects.wrapped()) {
2655 let old_version = modified_at_version.get(&object_ref.object_id).unwrap();
2656 match self.get_owner_at_version(&object_ref.object_id, *old_version).unwrap_or_else(
2659 |e| panic!("tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {} at version {old_version:?}. Err: {e:?}",object_ref.object_id)
2660 ) {
2661 Owner::Address(addr) => deleted_owners.push((addr, object_ref.object_id)),
2662 Owner::Object(object_id) => {
2663 deleted_dynamic_fields.push((object_id, object_ref.object_id))
2664 }
2665 _ => {}
2666 }
2667 }
2668
2669 let mut new_owners = vec![];
2670 let mut new_dynamic_fields = vec![];
2671
2672 for (oref, owner, kind) in effects.all_changed_objects() {
2673 let id = &oref.object_id;
2674 if let WriteKind::Mutate = kind {
2677 let Some(old_version) = modified_at_version.get(id) else {
2678 panic!(
2679 "tx_digest={tx_digest}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2680 );
2681 };
2682 let Some(old_object) = self
2685 .get_object_store()
2686 .try_get_object_by_key(id, *old_version)?
2687 else {
2688 panic!(
2689 "tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {id} at version {old_version:?}"
2690 );
2691 };
2692 if old_object.owner != owner {
2693 match old_object.owner {
2694 Owner::Address(addr) => {
2695 deleted_owners.push((addr, *id));
2696 }
2697 Owner::Object(object_id) => deleted_dynamic_fields.push((object_id, *id)),
2698 _ => {}
2699 }
2700 }
2701 }
2702
2703 match owner {
2704 Owner::Address(addr) => {
2705 let new_object = written.get(id).unwrap_or_else(
2708 || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2709 );
2710 assert_eq!(
2711 new_object.version(),
2712 oref.version,
2713 "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2714 tx_digest,
2715 id,
2716 new_object.version(),
2717 oref.version
2718 );
2719
2720 let type_ = new_object
2721 .type_()
2722 .map(|type_| ObjectType::Struct(type_.clone()))
2723 .unwrap_or(ObjectType::Package);
2724
2725 new_owners.push((
2726 (addr, *id),
2727 ObjectInfo {
2728 object_id: *id,
2729 version: oref.version,
2730 digest: oref.digest,
2731 type_,
2732 owner,
2733 previous_transaction: *effects.transaction_digest(),
2734 },
2735 ));
2736 }
2737 Owner::Object(owner) => {
2738 let new_object = written.get(id).unwrap_or_else(
2739 || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2740 );
2741 assert_eq!(
2742 new_object.version(),
2743 oref.version,
2744 "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2745 tx_digest,
2746 id,
2747 new_object.version(),
2748 oref.version
2749 );
2750
2751 let Some(df_info) = self
2752 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2753 .unwrap_or_else(|e| {
2754 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2755 None
2756 }
2757 )
2758 else {
2759 continue;
2761 };
2762 new_dynamic_fields.push(((owner, *id), df_info))
2763 }
2764 _ => {}
2765 }
2766 }
2767
2768 Ok(ObjectIndexChanges {
2769 deleted_owners,
2770 deleted_dynamic_fields,
2771 new_owners,
2772 new_dynamic_fields,
2773 })
2774 }
2775
2776 fn try_create_dynamic_field_info(
2777 &self,
2778 o: &Object,
2779 written: &WrittenObjects,
2780 resolver: &mut dyn LayoutResolver,
2781 ) -> IotaResult<Option<DynamicFieldInfo>> {
2782 let Some(move_object) = o.data.as_struct_opt().cloned() else {
2784 return Ok(None);
2785 };
2786
2787 if !move_object.struct_tag().is_dynamic_field() {
2789 return Ok(None);
2790 }
2791
2792 let layout = resolver
2793 .get_annotated_layout(move_object.struct_tag())?
2794 .into_layout();
2795
2796 let field =
2797 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2798 IotaError::ObjectDeserialization {
2799 error: e.to_string(),
2800 }
2801 })?;
2802
2803 let type_ = field.kind;
2804 let name_type: TypeTag = type_tag_core_to_sdk(&field.name_layout.into());
2805 let bcs_name = field.name_bytes.to_owned();
2806
2807 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2808 .map_err(|e| {
2809 warn!("{e}");
2810 IotaError::ObjectDeserialization {
2811 error: e.to_string(),
2812 }
2813 })?;
2814
2815 let name = DynamicFieldName {
2816 type_: name_type,
2817 value: IotaMoveValue::from(name_value).to_json_value(),
2818 };
2819
2820 let value_metadata = field.value_metadata().map_err(|e| {
2821 warn!("{e}");
2822 IotaError::ObjectDeserialization {
2823 error: e.to_string(),
2824 }
2825 })?;
2826
2827 Ok(Some(match value_metadata {
2828 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2829 name,
2830 bcs_name,
2831 type_,
2832 object_type: object_type.to_canonical_string(true),
2833 object_id: o.id(),
2834 version: o.version(),
2835 digest: o.digest(),
2836 },
2837
2838 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2839 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2844 (
2845 object.version(),
2846 object.digest(),
2847 object.data.object_type().unwrap().clone(),
2848 )
2849 } else {
2850 let object = self
2852 .get_object_store()
2853 .try_get_object_by_key(&object_id, o.version())?
2854 .ok_or_else(|| UserInputError::ObjectNotFound {
2855 object_id,
2856 version: Some(o.version()),
2857 })?;
2858 let version = object.version();
2859 let digest = object.digest();
2860 let object_type = object.data.object_type().unwrap().clone();
2861 (version, digest, object_type)
2862 };
2863
2864 DynamicFieldInfo {
2865 name,
2866 bcs_name,
2867 type_,
2868 object_type: object_type.to_string(),
2869 object_id,
2870 version,
2871 digest,
2872 }
2873 }
2874 }))
2875 }
2876
2877 #[instrument(level = "trace", skip_all, err)]
2878 fn post_process_one_tx(
2879 &self,
2880 certificate: &VerifiedExecutableTransaction,
2881 effects: &TransactionEffects,
2882 inner_temporary_store: &InnerTemporaryStore,
2883 epoch_store: &Arc<AuthorityPerEpochStore>,
2884 ) -> IotaResult {
2885 if self.indexes.is_none() {
2886 return Ok(());
2887 }
2888
2889 let _scope = monitored_scope("Execution::post_process_one_tx");
2890
2891 let tx_digest = certificate.digest();
2892 let timestamp_ms = Self::unixtime_now_ms();
2893 let events = &inner_temporary_store.events;
2894 let written = &inner_temporary_store.written;
2895 let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
2896 effects,
2897 inner_temporary_store,
2898 epoch_store,
2899 );
2900
2901 if let Some(indexes) = &self.indexes {
2903 let _ = self
2904 .index_tx(
2905 indexes.as_ref(),
2906 tx_digest,
2907 certificate,
2908 effects,
2909 events,
2910 timestamp_ms,
2911 tx_coins,
2912 written,
2913 inner_temporary_store,
2914 )
2915 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2916 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2917 .expect("Indexing tx should not fail");
2918
2919 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2920 let events = self.make_transaction_block_events(
2921 events.clone(),
2922 *tx_digest,
2923 timestamp_ms,
2924 epoch_store,
2925 inner_temporary_store,
2926 )?;
2927 self.subscription_handler
2929 .process_tx(certificate.data().transaction_data(), &effects, &events)
2930 .tap_ok(|_| {
2931 self.metrics
2932 .post_processing_total_tx_had_event_processed
2933 .inc()
2934 })
2935 .tap_err(|e| {
2936 warn!(
2937 ?tx_digest,
2938 "Post processing - Couldn't process events for tx: {}", e
2939 )
2940 })?;
2941
2942 self.metrics
2943 .post_processing_total_events_emitted
2944 .inc_by(events.data.len() as u64);
2945 };
2946 Ok(())
2947 }
2948
2949 fn make_transaction_block_events(
2950 &self,
2951 transaction_events: TransactionEvents,
2952 digest: TransactionDigest,
2953 timestamp_ms: u64,
2954 epoch_store: &Arc<AuthorityPerEpochStore>,
2955 inner_temporary_store: &InnerTemporaryStore,
2956 ) -> IotaResult<IotaTransactionBlockEvents> {
2957 let mut layout_resolver =
2958 epoch_store
2959 .executor()
2960 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2961 inner_temporary_store,
2962 self.get_backing_package_store(),
2963 )));
2964 IotaTransactionBlockEvents::try_from(
2965 transaction_events,
2966 digest,
2967 Some(timestamp_ms),
2968 layout_resolver.as_mut(),
2969 )
2970 }
2971
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock is set before the Unix epoch, or if the
/// millisecond count does not fit in a `u64`.
pub fn unixtime_now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let millis = since_epoch.as_millis();
    u64::try_from(millis).expect("Travelling in time machine")
}
2979
2980 #[instrument(level = "trace", skip_all)]
2981 pub async fn handle_transaction_info_request(
2982 &self,
2983 request: TransactionInfoRequest,
2984 ) -> IotaResult<TransactionInfoResponse> {
2985 let epoch_store = self.load_epoch_store_one_call_per_task();
2986 let (transaction, status) = self
2987 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2988 .ok_or(IotaError::TransactionNotFound {
2989 digest: request.transaction_digest,
2990 })?;
2991 Ok(TransactionInfoResponse {
2992 transaction,
2993 status,
2994 })
2995 }
2996
2997 #[instrument(level = "trace", skip_all)]
2998 pub async fn handle_object_info_request(
2999 &self,
3000 request: ObjectInfoRequest,
3001 ) -> IotaResult<ObjectInfoResponse> {
3002 let epoch_store = self.load_epoch_store_one_call_per_task();
3003
3004 let requested_object_seq = match request.request_kind {
3005 ObjectInfoRequestKind::LatestObjectInfo => {
3006 self.try_get_object_or_tombstone(request.object_id)
3007 .await?
3008 .ok_or_else(|| {
3009 IotaError::from(UserInputError::ObjectNotFound {
3010 object_id: request.object_id,
3011 version: None,
3012 })
3013 })?
3014 .version
3015 }
3016 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3017 };
3018
3019 let object = self
3020 .get_object_store()
3021 .try_get_object_by_key(&request.object_id, requested_object_seq)?
3022 .ok_or_else(|| {
3023 IotaError::from(UserInputError::ObjectNotFound {
3024 object_id: request.object_id,
3025 version: Some(requested_object_seq),
3026 })
3027 })?;
3028
3029 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3030 (request.generate_layout, object.data.as_struct_opt())
3031 {
3032 Some(into_struct_layout(
3033 epoch_store
3034 .executor()
3035 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3036 .get_annotated_layout(move_obj.struct_tag())?,
3037 )?)
3038 } else {
3039 None
3040 };
3041
3042 let lock = if !object.is_address_owned() {
3043 None
3045 } else {
3046 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
3047 .await?
3048 .map(|s| s.into_inner())
3049 };
3050
3051 Ok(ObjectInfoResponse {
3052 object,
3053 layout,
3054 lock_for_debugging: lock,
3055 })
3056 }
3057
3058 #[instrument(level = "trace", skip_all)]
3059 pub fn handle_checkpoint_request(
3060 &self,
3061 request: &CheckpointRequest,
3062 ) -> IotaResult<CheckpointResponse> {
3063 let summary = if request.certified {
3064 let summary = match request.sequence_number {
3065 Some(seq) => self
3066 .checkpoint_store
3067 .get_checkpoint_by_sequence_number(seq)?,
3068 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3069 }
3070 .map(|v| v.into_inner());
3071 summary.map(CheckpointSummaryResponse::Certified)
3072 } else {
3073 let summary = match request.sequence_number {
3074 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3075 None => self
3076 .checkpoint_store
3077 .get_latest_locally_computed_checkpoint()?,
3078 };
3079 summary.map(CheckpointSummaryResponse::Pending)
3080 };
3081 let contents = match &summary {
3082 Some(s) => self
3083 .checkpoint_store
3084 .get_checkpoint_contents(&s.content_digest())?,
3085 None => None,
3086 };
3087 Ok(CheckpointResponse {
3088 checkpoint: summary,
3089 contents,
3090 })
3091 }
3092
3093 fn check_protocol_version(
3094 supported_protocol_versions: SupportedProtocolVersions,
3095 current_version: ProtocolVersion,
3096 ) {
3097 info!("current protocol version is now {:?}", current_version);
3098 info!("supported versions are: {:?}", supported_protocol_versions);
3099 if !supported_protocol_versions.is_version_supported(current_version) {
3100 let msg = format!(
3101 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
3102 );
3103
3104 error!("{}", msg);
3105 eprintln!("{msg}");
3106
3107 #[cfg(not(msim))]
3108 std::process::exit(1);
3109
3110 #[cfg(msim)]
3111 iota_simulator::task::shutdown_current_node();
3112 }
3113 }
3114
/// Constructs the `AuthorityState`, starts its background tasks and indexes
/// the genesis objects.
///
/// Steps, in order:
/// 1. Verify that the epoch's protocol version is supported (see
///    `check_protocol_version`, which may terminate the node).
/// 2. Create the metrics, the transaction manager (fed through an unbounded
///    channel of ready certificates), the per-epoch and store pruners, the
///    input loader and, when `policy_config` is given, the traffic controller.
/// 3. Assemble the state and spawn the `execution_process` task (holding only
///    a weak reference to the state) and the `migrate_events` migration task.
/// 4. Call `create_owner_index_if_empty` with `genesis_objects`.
///
/// # Panics
/// Panics with "Error indexing genesis objects." if step 4 fails.
// NOTE(review): `disallowed_methods` is presumably expected because of the
// `unbounded_channel()` call below — confirm against the clippy config.
#[expect(clippy::disallowed_methods)] #[expect(clippy::too_many_arguments)]
pub async fn new(
    name: AuthorityName,
    secret: StableSyncAuthoritySigner,
    supported_protocol_versions: SupportedProtocolVersions,
    store: Arc<AuthorityStore>,
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
    epoch_store: Arc<AuthorityPerEpochStore>,
    committee_store: Arc<CommitteeStore>,
    indexes: Option<Arc<IndexStore>>,
    grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
    checkpoint_store: Arc<CheckpointStore>,
    prometheus_registry: &Registry,
    genesis_objects: &[Object],
    db_checkpoint_config: &DBCheckpointConfig,
    config: NodeConfig,
    archive_readers: ArchiveReaderBalancer,
    validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
    chain_identifier: ChainIdentifier,
    pruner_db: Option<Arc<AuthorityPrunerTables>>,
    checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
    policy_config: Option<PolicyConfig>,
    firewall_config: Option<RemoteFirewallConfig>,
) -> Arc<Self> {
    // May shut the node down if the network's protocol version is unsupported.
    Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

    let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
    // Ready certificates flow from the transaction manager (sender side) to
    // the execution task spawned below (receiver side).
    let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
    let transaction_manager = Arc::new(TransactionManager::new(
        execution_cache_trait_pointers.object_cache_reader.clone(),
        execution_cache_trait_pointers
            .transaction_cache_reader
            .clone(),
        &epoch_store,
        tx_ready_certificates,
        metrics.clone(),
    ));
    // The sender half is stored in the state; the receiver half is handed to
    // the execution task.
    let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

    let authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
        epoch_store.get_parent_path(),
        config
            .authority_store_pruning_config
            .num_latest_epoch_dbs_to_retain,
    )
    .await;
    // Stored in the state as `_pruner`, which keeps it alive with the state.
    let _pruner = AuthorityStorePruner::new(
        store.perpetual_tables.clone(),
        checkpoint_store.clone(),
        grpc_indexes_store.clone(),
        indexes.clone(),
        config.authority_store_pruning_config.clone(),
        epoch_store.committee().authority_exists(&name),
        epoch_store.epoch_start_state().epoch_duration_ms(),
        prometheus_registry,
        archive_readers,
        pruner_db,
        checkpoint_progress_tracker.clone(),
    );
    let input_loader =
        TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
    let epoch = epoch_store.epoch();
    let rgp = epoch_store.reference_gas_price();
    let traffic_controller_metrics =
        Arc::new(TrafficControllerMetrics::new(prometheus_registry));
    // The traffic controller only exists when a policy config was supplied.
    let traffic_controller = if let Some(policy_config) = policy_config {
        Some(Arc::new(
            TrafficController::init(
                policy_config,
                traffic_controller_metrics,
                firewall_config.clone(),
            )
            .await,
        ))
    } else {
        None
    };
    let state = Arc::new(AuthorityState {
        name,
        secret,
        // The execution lock is initialized with the current epoch.
        execution_lock: RwLock::new(epoch),
        epoch_store: ArcSwap::new(epoch_store.clone()),
        input_loader,
        execution_cache_trait_pointers,
        indexes,
        grpc_indexes_store,
        subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
        checkpoint_store,
        committee_store,
        transaction_manager,
        tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
        metrics,
        _pruner,
        authority_per_epoch_pruner,
        checkpoint_progress_tracker,
        db_checkpoint_config: db_checkpoint_config.clone(),
        config,
        overload_info: AuthorityOverloadInfo::default(),
        validator_tx_finalizer,
        chain_identifier,
        // The congestion tracker is seeded with the reference gas price.
        congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
        traffic_controller,
    });

    // The execution task gets a weak handle, so the task alone does not keep
    // the state alive.
    let authority_state = Arc::downgrade(&state);
    spawn_monitored_task!(execution_process(
        authority_state,
        rx_ready_certificates,
        rx_execution_shutdown,
    ));
    spawn_monitored_task!(authority_store_migrations::migrate_events(store));

    // Index the genesis objects; failure here is treated as fatal.
    state
        .create_owner_index_if_empty(genesis_objects, &epoch_store)
        .expect("Error indexing genesis objects.");

    state
}
3236
3237 pub fn epoch_db_pruner(&self) -> &AuthorityPerEpochStorePruner {
3238 &self.authority_per_epoch_pruner
3239 }
3240
3241 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3243 &self.execution_cache_trait_pointers.object_cache_reader
3244 }
3245
3246 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3247 &self.execution_cache_trait_pointers.transaction_cache_reader
3248 }
3249
3250 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3251 &self.execution_cache_trait_pointers.cache_writer
3252 }
3253
3254 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3255 &self.execution_cache_trait_pointers.backing_store
3256 }
3257
3258 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3259 &self.execution_cache_trait_pointers.backing_package_store
3260 }
3261
3262 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3263 &self.execution_cache_trait_pointers.object_store
3264 }
3265
3266 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3267 &self.execution_cache_trait_pointers.reconfig_api
3268 }
3269
3270 pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3271 &self.execution_cache_trait_pointers.global_state_hash_store
3272 }
3273
3274 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3275 &self.execution_cache_trait_pointers.checkpoint_cache
3276 }
3277
3278 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3279 &self.execution_cache_trait_pointers.state_sync_store
3280 }
3281
3282 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3283 &self.execution_cache_trait_pointers.cache_commit
3284 }
3285
3286 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3287 self.execution_cache_trait_pointers
3288 .testing_api
3289 .database_for_testing()
3290 }
3291
3292 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3293 &self,
3294 config: NodeConfig,
3295 metrics: Arc<AuthorityStorePruningMetrics>,
3296 ) -> anyhow::Result<()> {
3297 let archive_readers =
3298 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3299 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3300 &self.database_for_testing().perpetual_tables,
3301 &self.checkpoint_store,
3302 self.grpc_indexes_store.as_deref(),
3303 None,
3304 config.authority_store_pruning_config,
3305 metrics,
3306 archive_readers,
3307 EPOCH_DURATION_MS_FOR_TESTING,
3308 self.checkpoint_progress_tracker.as_ref(),
3309 )
3310 .await
3311 }
3312
3313 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3314 &self.transaction_manager
3315 }
3316
3317 pub fn enqueue_transactions_for_execution(
3320 &self,
3321 txns: Vec<VerifiedExecutableTransaction>,
3322 epoch_store: &Arc<AuthorityPerEpochStore>,
3323 ) {
3324 self.transaction_manager.enqueue(txns, epoch_store)
3325 }
3326
3327 pub fn enqueue_certificates_for_execution(
3329 &self,
3330 certs: Vec<VerifiedCertificate>,
3331 epoch_store: &Arc<AuthorityPerEpochStore>,
3332 ) {
3333 self.transaction_manager
3334 .enqueue_certificates(certs, epoch_store)
3335 }
3336
3337 pub fn enqueue_with_expected_effects_digest(
3338 &self,
3339 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3340 epoch_store: &AuthorityPerEpochStore,
3341 ) {
3342 self.transaction_manager
3343 .enqueue_with_expected_effects_digest(certs, epoch_store)
3344 }
3345
3346 fn create_owner_index_if_empty(
3347 &self,
3348 genesis_objects: &[Object],
3349 epoch_store: &Arc<AuthorityPerEpochStore>,
3350 ) -> IotaResult {
3351 let Some(index_store) = &self.indexes else {
3352 return Ok(());
3353 };
3354 if !index_store.is_empty() {
3355 return Ok(());
3356 }
3357
3358 let mut new_owners = vec![];
3359 let mut new_dynamic_fields = vec![];
3360 let mut layout_resolver = epoch_store
3361 .executor()
3362 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3363 for o in genesis_objects.iter() {
3364 match o.owner {
3365 Owner::Address(addr) => new_owners.push((
3366 (addr, o.id()),
3367 ObjectInfo::new(&o.compute_object_reference(), o),
3368 )),
3369 Owner::Object(object_id) => {
3370 let id = o.id();
3371 let info = match self.try_create_dynamic_field_info(
3372 o,
3373 &BTreeMap::new(),
3374 layout_resolver.as_mut(),
3375 ) {
3376 Ok(Some(info)) => info,
3377 Ok(None) => continue,
3378 Err(IotaError::UserInput {
3379 error:
3380 UserInputError::ObjectNotFound {
3381 object_id: not_found_id,
3382 version,
3383 },
3384 }) => {
3385 warn!(
3386 ?not_found_id,
3387 ?version,
3388 object_owner=?object_id,
3389 field=?id,
3390 "Skipping dynamic field: referenced genesis object not found"
3391 );
3392 continue;
3393 }
3394 Err(e) => return Err(e),
3395 };
3396 new_dynamic_fields.push(((object_id, id), info));
3397 }
3398 _ => {}
3399 }
3400 }
3401
3402 index_store.insert_genesis_objects(ObjectIndexChanges {
3403 deleted_owners: vec![],
3404 deleted_dynamic_fields: vec![],
3405 new_owners,
3406 new_dynamic_fields,
3407 })
3408 }
3409
3410 pub fn execution_lock_for_executable_transaction(
3414 &self,
3415 transaction: &VerifiedExecutableTransaction,
3416 ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3417 let lock = self
3418 .execution_lock
3419 .try_read()
3420 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3421 if *lock == transaction.auth_sig().epoch() {
3422 Ok(lock)
3423 } else {
3424 Err(IotaError::WrongEpoch {
3425 expected_epoch: *lock,
3426 actual_epoch: transaction.auth_sig().epoch(),
3427 })
3428 }
3429 }
3430
3431 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3437 self.execution_lock
3438 .try_read()
3439 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3440 }
3441
3442 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3443 self.execution_lock.write().await
3444 }
3445
3446 #[instrument(level = "error", skip_all)]
3447 pub async fn reconfigure(
3448 &self,
3449 cur_epoch_store: &AuthorityPerEpochStore,
3450 supported_protocol_versions: SupportedProtocolVersions,
3451 new_committee: Committee,
3452 epoch_start_configuration: EpochStartConfiguration,
3453 state_hasher: Arc<GlobalStateHasher>,
3454 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3455 epoch_supply_change: i64,
3456 epoch_last_checkpoint: CheckpointSequenceNumber,
3457 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3458 Self::check_protocol_version(
3459 supported_protocol_versions,
3460 epoch_start_configuration
3461 .epoch_start_state()
3462 .protocol_version(),
3463 );
3464 self.metrics.reset_on_reconfigure();
3465 self.committee_store.insert_new_committee(&new_committee)?;
3466
3467 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3469
3470 cur_epoch_store.epoch_terminated().await;
3472
3473 let highest_locally_built_checkpoint_seq = self
3474 .checkpoint_store
3475 .get_latest_locally_computed_checkpoint()?
3476 .map(|c| *c.sequence_number())
3477 .unwrap_or(0);
3478
3479 assert!(
3480 epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3481 "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3482 );
3483 if highest_locally_built_checkpoint_seq == epoch_last_checkpoint
3484 || self.is_fullnode(cur_epoch_store)
3485 {
3486 let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3490 if num_shared_version_assignments > 10 {
3497 debug_fatal!(
3499 "all shared_version_assignments should have been removed \
3500 (num_shared_version_assignments: {num_shared_version_assignments})"
3501 );
3502 }
3503 }
3504
3505 self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3511 .await?;
3512 self.get_reconfig_api()
3513 .clear_state_end_of_epoch(&execution_lock);
3514 self.check_system_consistency(
3515 cur_epoch_store,
3516 state_hasher,
3517 expensive_safety_check_config,
3518 epoch_supply_change,
3519 )?;
3520 self.get_reconfig_api()
3521 .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3522 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3523 if self
3524 .db_checkpoint_config
3525 .perform_db_checkpoints_at_epoch_end
3526 {
3527 let checkpoint_indexes = self
3528 .db_checkpoint_config
3529 .perform_index_db_checkpoints_at_epoch_end
3530 .unwrap_or(false);
3531 let current_epoch = cur_epoch_store.epoch();
3532 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3533 self.checkpoint_all_dbs(
3534 &epoch_checkpoint_path,
3535 cur_epoch_store,
3536 checkpoint_indexes,
3537 )?;
3538 }
3539 }
3540
3541 let new_epoch = new_committee.epoch;
3542 let new_epoch_store = self
3543 .reopen_epoch_db(
3544 cur_epoch_store,
3545 new_committee,
3546 epoch_start_configuration,
3547 expensive_safety_check_config,
3548 epoch_last_checkpoint,
3549 )
3550 .await?;
3551 assert_eq!(new_epoch_store.epoch(), new_epoch);
3552 self.transaction_manager.reconfigure(new_epoch);
3553 *execution_lock = new_epoch;
3554 Ok(new_epoch_store)
3558 }
3559
3560 pub async fn reconfigure_for_testing(&self) {
3565 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3566 let epoch_store = self.epoch_store_for_testing().clone();
3567 let protocol_config = epoch_store.protocol_config().clone();
3568 let _guard =
3576 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3577 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3578 self.get_backing_package_store().clone(),
3579 &self.config.expensive_safety_check_config,
3580 self.checkpoint_store
3581 .get_epoch_last_checkpoint(epoch_store.epoch())
3582 .unwrap()
3583 .map(|c| *c.sequence_number())
3584 .unwrap_or_default(),
3585 );
3586 let new_epoch = new_epoch_store.epoch();
3587 self.transaction_manager.reconfigure(new_epoch);
3588 self.epoch_store.store(new_epoch_store);
3589 epoch_store.epoch_terminated().await;
3590 *execution_lock = new_epoch;
3591 }
3592
3593 #[instrument(level = "error", skip_all)]
3594 fn check_system_consistency(
3595 &self,
3596 cur_epoch_store: &AuthorityPerEpochStore,
3597 state_hasher: Arc<GlobalStateHasher>,
3598 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3599 epoch_supply_change: i64,
3600 ) -> IotaResult<()> {
3601 info!(
3602 "Performing iota conservation consistency check for epoch {}",
3603 cur_epoch_store.epoch()
3604 );
3605
3606 if cfg!(debug_assertions) {
3607 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3608 }
3609
3610 self.get_reconfig_api()
3611 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3612
3613 if expensive_safety_check_config.enable_state_consistency_check() {
3615 info!(
3616 "Performing state consistency check for epoch {}",
3617 cur_epoch_store.epoch()
3618 );
3619 self.expensive_check_is_consistent_state(
3620 state_hasher,
3621 cur_epoch_store,
3622 cfg!(debug_assertions), );
3624 }
3625
3626 if expensive_safety_check_config.enable_secondary_index_checks() {
3627 if let Some(indexes) = self.indexes.clone() {
3628 verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
3629 .expect("secondary indexes are inconsistent");
3630 }
3631 }
3632
3633 Ok(())
3634 }
3635
3636 fn expensive_check_is_consistent_state(
3637 &self,
3638 state_hasher: Arc<GlobalStateHasher>,
3639 cur_epoch_store: &AuthorityPerEpochStore,
3640 panic: bool,
3641 ) {
3642 let live_object_set_hash = state_hasher.digest_live_object_set();
3643
3644 let root_state_hash: ECMHLiveObjectSetDigest = self
3645 .get_global_state_hash_store()
3646 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
3647 .expect("Retrieving root state hash cannot fail")
3648 .expect("Root state hash for epoch must exist")
3649 .1
3650 .digest()
3651 .into();
3652
3653 let is_inconsistent = root_state_hash != live_object_set_hash;
3654 if is_inconsistent {
3655 if panic {
3656 panic!(
3657 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3658 );
3659 } else {
3660 error!(
3661 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3662 root_state_hash, live_object_set_hash
3663 );
3664 }
3665 } else {
3666 info!("State consistency check passed");
3667 }
3668
3669 if !panic {
3670 state_hasher.set_inconsistent_state(is_inconsistent);
3671 }
3672 }
3673
3674 pub fn current_epoch_for_testing(&self) -> EpochId {
3675 self.epoch_store_for_testing().epoch()
3676 }
3677
3678 #[instrument(level = "error", skip_all)]
3679 pub fn checkpoint_all_dbs(
3680 &self,
3681 checkpoint_path: &Path,
3682 cur_epoch_store: &AuthorityPerEpochStore,
3683 checkpoint_indexes: bool,
3684 ) -> IotaResult {
3685 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3686 let current_epoch = cur_epoch_store.epoch();
3687
3688 if checkpoint_path.exists() {
3689 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3690 return Ok(());
3691 }
3692
3693 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3694 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3695
3696 if checkpoint_path_tmp.exists() {
3697 fs::remove_dir_all(&checkpoint_path_tmp)
3698 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3699 }
3700
3701 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3702 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3703
3704 self.checkpoint_store
3707 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3708
3709 self.get_reconfig_api()
3710 .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3711
3712 self.committee_store
3713 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3714
3715 if checkpoint_indexes {
3716 if let Some(indexes) = self.indexes.as_ref() {
3717 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3718 }
3719 if let Some(grpc_indexes_store) = self.grpc_indexes_store.as_ref() {
3720 grpc_indexes_store.checkpoint_db(&checkpoint_path_tmp.join(GRPC_INDEXES_DIR))?;
3721 }
3722 }
3723
3724 fs::rename(checkpoint_path_tmp, checkpoint_path)
3725 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3726 Ok(())
3727 }
3728
3729 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3736 self.epoch_store.load()
3737 }
3738
3739 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3741 self.load_epoch_store_one_call_per_task()
3742 }
3743
3744 pub fn clone_committee_for_testing(&self) -> Committee {
3745 Committee::clone(self.epoch_store_for_testing().committee())
3746 }
3747
3748 #[instrument(level = "trace", skip_all)]
3749 pub async fn try_get_object(&self, object_id: &ObjectId) -> IotaResult<Option<Object>> {
3750 self.get_object_store()
3751 .try_get_object(object_id)
3752 .map_err(Into::into)
3753 }
3754
3755 pub async fn get_object(&self, object_id: &ObjectId) -> Option<Object> {
3757 self.try_get_object(object_id)
3758 .await
3759 .expect("storage access failed")
3760 }
3761
3762 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3763 Ok(self
3764 .try_get_object(&ObjectId::SYSTEM)
3765 .await?
3766 .expect("system package should always exist")
3767 .compute_object_reference())
3768 }
3769
3770 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3772 self.get_object_cache_reader()
3773 .try_get_iota_system_state_object_unsafe()
3774 }
3775
3776 #[instrument(level = "trace", skip_all)]
3777 pub fn get_checkpoint_by_sequence_number(
3778 &self,
3779 sequence_number: CheckpointSequenceNumber,
3780 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3781 Ok(self
3782 .checkpoint_store
3783 .get_checkpoint_by_sequence_number(sequence_number)?)
3784 }
3785
3786 pub async fn wait_for_checkpoint_inclusion(
3793 &self,
3794 digests: &[TransactionDigest],
3795 timeout: Duration,
3796 ) -> IotaResult<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>> {
3797 let epoch_store = self.load_epoch_store_one_call_per_task();
3798
3799 let mut checkpoint_timestamp_cache = HashMap::<CheckpointSequenceNumber, u64>::new();
3802
3803 let results = epoch_store
3804 .wait_for_transactions_in_checkpoint_with_timeout(digests, timeout, |seq| {
3805 *checkpoint_timestamp_cache.entry(seq).or_insert_with(|| {
3806 self.get_checkpoint_by_sequence_number(seq)
3807 .ok()
3808 .flatten()
3809 .map(|c| c.timestamp_ms)
3810 .unwrap_or(0)
3811 })
3812 })
3813 .await?;
3814
3815 Ok(digests
3816 .iter()
3817 .copied()
3818 .zip(results)
3819 .filter_map(|(digest, opt)| opt.map(|seq_and_ts| (digest, seq_and_ts)))
3820 .collect())
3821 }
3822
3823 #[instrument(level = "trace", skip_all)]
3824 pub fn get_transaction_checkpoint_for_tests(
3825 &self,
3826 digest: &TransactionDigest,
3827 epoch_store: &AuthorityPerEpochStore,
3828 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3829 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3830 let Some(checkpoint) = checkpoint else {
3831 return Ok(None);
3832 };
3833 let checkpoint = self
3834 .checkpoint_store
3835 .get_checkpoint_by_sequence_number(checkpoint)?;
3836 Ok(checkpoint)
3837 }
3838
3839 #[instrument(level = "trace", skip_all)]
3840 pub fn get_object_read(&self, object_id: &ObjectId) -> IotaResult<ObjectRead> {
3841 Ok(
3842 match self
3843 .get_object_cache_reader()
3844 .try_get_latest_object_or_tombstone(*object_id)?
3845 {
3846 Some((_, ObjectOrTombstone::Object(object))) => {
3847 let layout = self.get_object_layout(&object)?;
3848 ObjectRead::Exists(object.compute_object_reference(), object, layout)
3849 }
3850 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3851 None => ObjectRead::NotExists(*object_id),
3852 },
3853 )
3854 }
3855
3856 pub fn get_chain_identifier(&self) -> ChainIdentifier {
3858 self.chain_identifier
3859 }
3860
3861 #[instrument(level = "trace", skip_all)]
3862 pub fn get_move_object<T>(&self, object_id: &ObjectId) -> IotaResult<T>
3863 where
3864 T: DeserializeOwned,
3865 {
3866 let o = self.get_object_read(object_id)?.into_object()?;
3867 if let Some(move_object) = o.data.as_struct_opt() {
3868 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3869 IotaError::ObjectDeserialization {
3870 error: format!("{e}"),
3871 }
3872 })?)
3873 } else {
3874 Err(IotaError::ObjectDeserialization {
3875 error: format!("Provided object : [{object_id}] is not a Move object."),
3876 })
3877 }
3878 }
3879
3880 #[instrument(level = "trace", skip_all)]
3886 pub fn get_past_object_read(
3887 &self,
3888 object_id: &ObjectId,
3889 version: SequenceNumber,
3890 ) -> IotaResult<PastObjectRead> {
3891 let Some(obj_ref) = self
3893 .get_object_cache_reader()
3894 .try_get_latest_object_ref_or_tombstone(*object_id)?
3895 else {
3896 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3897 };
3898
3899 if version > obj_ref.version {
3900 return Ok(PastObjectRead::VersionTooHigh {
3901 object_id: *object_id,
3902 asked_version: version,
3903 latest_version: obj_ref.version,
3904 });
3905 }
3906
3907 if version < obj_ref.version {
3908 return Ok(match self.read_object_at_version(object_id, version)? {
3910 Some((object, layout)) => {
3911 let obj_ref = object.compute_object_reference();
3912 PastObjectRead::VersionFound(obj_ref, object, layout)
3913 }
3914
3915 None => PastObjectRead::VersionNotFound(*object_id, version),
3916 });
3917 }
3918
3919 if !obj_ref.digest.is_object_alive() {
3920 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3921 }
3922
3923 match self.read_object_at_version(object_id, obj_ref.version)? {
3924 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3925 None => {
3926 error!(
3927 "Object with in parent_entry is missing from object store, datastore is \
3928 inconsistent",
3929 );
3930 Err(UserInputError::ObjectNotFound {
3931 object_id: *object_id,
3932 version: Some(obj_ref.version),
3933 }
3934 .into())
3935 }
3936 }
3937 }
3938
3939 #[instrument(level = "trace", skip_all)]
3940 fn read_object_at_version(
3941 &self,
3942 object_id: &ObjectId,
3943 version: SequenceNumber,
3944 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3945 let Some(object) = self
3946 .get_object_cache_reader()
3947 .try_get_object_by_key(object_id, version)?
3948 else {
3949 return Ok(None);
3950 };
3951
3952 let layout = self.get_object_layout(&object)?;
3953 Ok(Some((object, layout)))
3954 }
3955
3956 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3957 let layout = object
3958 .data
3959 .as_struct_opt()
3960 .map(|object| {
3961 into_struct_layout(
3962 self.load_epoch_store_one_call_per_task()
3963 .executor()
3964 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3966 .get_annotated_layout(object.struct_tag())?,
3967 )
3968 })
3969 .transpose()?;
3970 Ok(layout)
3971 }
3972
3973 fn get_owner_at_version(
3974 &self,
3975 object_id: &ObjectId,
3976 version: SequenceNumber,
3977 ) -> IotaResult<Owner> {
3978 self.get_object_store()
3979 .try_get_object_by_key(object_id, version)?
3980 .ok_or_else(|| {
3981 IotaError::from(UserInputError::ObjectNotFound {
3982 object_id: *object_id,
3983 version: Some(version),
3984 })
3985 })
3986 .map(|o| o.owner)
3987 }
3988
3989 #[instrument(level = "trace", skip_all)]
3990 pub fn get_owner_objects(
3991 &self,
3992 owner: IotaAddress,
3993 cursor: Option<ObjectId>,
3995 limit: usize,
3996 filter: Option<IotaObjectDataFilter>,
3997 ) -> IotaResult<Vec<ObjectInfo>> {
3998 if let Some(indexes) = &self.indexes {
3999 indexes.get_owner_objects(owner, cursor, limit, filter)
4000 } else {
4001 Err(IotaError::IndexStoreNotAvailable)
4002 }
4003 }
4004
4005 #[instrument(level = "trace", skip_all)]
4006 pub fn get_owned_coins_iterator_with_cursor(
4007 &self,
4008 owner: IotaAddress,
4009 cursor: (String, ObjectId),
4011 limit: usize,
4012 one_coin_type_only: bool,
4013 ) -> IotaResult<impl Iterator<Item = (String, ObjectId, CoinInfo)> + '_> {
4014 if let Some(indexes) = &self.indexes {
4015 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4016 } else {
4017 Err(IotaError::IndexStoreNotAvailable)
4018 }
4019 }
4020
4021 #[instrument(level = "trace", skip_all)]
4022 pub fn get_owner_objects_iterator(
4023 &self,
4024 owner: IotaAddress,
4025 cursor: Option<ObjectId>,
4027 filter: Option<IotaObjectDataFilter>,
4028 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
4029 let cursor_u = cursor.unwrap_or(ObjectId::ZERO);
4030 if let Some(indexes) = &self.indexes {
4031 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4032 } else {
4033 Err(IotaError::IndexStoreNotAvailable)
4034 }
4035 }
4036
4037 #[instrument(level = "trace", skip_all)]
4038 pub async fn get_move_objects<T>(
4039 &self,
4040 owner: IotaAddress,
4041 tag: StructTag,
4042 ) -> IotaResult<Vec<T>>
4043 where
4044 T: DeserializeOwned,
4045 {
4046 let object_ids = self
4047 .get_owner_objects_iterator(owner, None, None)?
4048 .filter(|o| match &o.type_ {
4049 ObjectType::Struct(s) => *s == tag,
4050 ObjectType::Package => false,
4051 })
4052 .map(|info| ObjectKey(info.object_id, info.version))
4053 .collect::<Vec<_>>();
4054 let mut move_objects = vec![];
4055
4056 let objects = self
4057 .get_object_store()
4058 .try_multi_get_objects_by_key(&object_ids)?;
4059
4060 for (o, id) in objects.into_iter().zip(object_ids) {
4061 let object = o.ok_or_else(|| {
4062 IotaError::from(UserInputError::ObjectNotFound {
4063 object_id: id.0,
4064 version: Some(id.1),
4065 })
4066 })?;
4067 let move_object = object.data.as_struct_opt().ok_or_else(|| {
4068 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4069 })?;
4070 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4071 IotaError::ObjectDeserialization {
4072 error: format!("{e}"),
4073 }
4074 })?);
4075 }
4076 Ok(move_objects)
4077 }
4078
4079 #[instrument(level = "trace", skip_all)]
4080 pub fn get_dynamic_fields(
4081 &self,
4082 owner: ObjectId,
4083 cursor: Option<ObjectId>,
4085 limit: usize,
4086 ) -> IotaResult<Vec<(ObjectId, DynamicFieldInfo)>> {
4087 Ok(self
4088 .get_dynamic_fields_iterator(owner, cursor)?
4089 .take(limit)
4090 .collect::<Result<Vec<_>, _>>()?)
4091 }
4092
4093 fn get_dynamic_fields_iterator(
4094 &self,
4095 owner: ObjectId,
4096 cursor: Option<ObjectId>,
4098 ) -> IotaResult<impl Iterator<Item = Result<(ObjectId, DynamicFieldInfo), TypedStoreError>> + '_>
4099 {
4100 if let Some(indexes) = &self.indexes {
4101 indexes.get_dynamic_fields_iterator(owner, cursor)
4102 } else {
4103 Err(IotaError::IndexStoreNotAvailable)
4104 }
4105 }
4106
4107 #[instrument(level = "trace", skip_all)]
4108 pub fn get_dynamic_field_object_id(
4109 &self,
4110 owner: ObjectId,
4111 name_type: TypeTag,
4112 name_bcs_bytes: &[u8],
4113 ) -> IotaResult<Option<ObjectId>> {
4114 if let Some(indexes) = &self.indexes {
4115 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4116 } else {
4117 Err(IotaError::IndexStoreNotAvailable)
4118 }
4119 }
4120
4121 #[instrument(level = "trace", skip_all)]
4122 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
4123 Ok(self.get_indexes()?.next_sequence_number())
4124 }
4125
4126 #[instrument(level = "trace", skip_all)]
4127 pub async fn get_executed_transaction_and_effects(
4128 &self,
4129 digest: TransactionDigest,
4130 kv_store: Arc<TransactionKeyValueStore>,
4131 ) -> IotaResult<(Transaction, TransactionEffects)> {
4132 let transaction = kv_store.get_tx(digest).await?;
4133 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4134 Ok((transaction, effects))
4135 }
4136
4137 #[instrument(level = "trace", skip_all)]
4138 pub fn multi_get_checkpoint_by_sequence_number(
4139 &self,
4140 sequence_numbers: &[CheckpointSequenceNumber],
4141 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
4142 Ok(self
4143 .checkpoint_store
4144 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4145 }
4146
4147 #[instrument(level = "trace", skip_all)]
4148 pub fn get_transaction_events(
4149 &self,
4150 digest: &TransactionDigest,
4151 ) -> IotaResult<TransactionEvents> {
4152 self.get_transaction_cache_reader()
4153 .try_get_events(digest)?
4154 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
4155 }
4156
4157 pub fn get_transaction_input_objects(
4158 &self,
4159 effects: &TransactionEffects,
4160 ) -> anyhow::Result<Vec<Object>> {
4161 iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4162 .map_err(Into::into)
4163 }
4164
4165 pub fn get_transaction_output_objects(
4166 &self,
4167 effects: &TransactionEffects,
4168 ) -> anyhow::Result<Vec<Object>> {
4169 iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4170 .map_err(Into::into)
4171 }
4172
4173 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
4174 match &self.indexes {
4175 Some(i) => Ok(i.clone()),
4176 None => Err(IotaError::UnsupportedFeature {
4177 error: "extended object indexing is not enabled on this server".into(),
4178 }),
4179 }
4180 }
4181
4182 pub async fn get_transactions_for_tests(
4183 self: &Arc<Self>,
4184 filter: Option<TransactionFilter>,
4185 cursor: Option<TransactionDigest>,
4186 limit: Option<usize>,
4187 reverse: bool,
4188 ) -> IotaResult<Vec<TransactionDigest>> {
4189 let metrics = KeyValueStoreMetrics::new_for_tests();
4190 let kv_store = Arc::new(TransactionKeyValueStore::new(
4191 "rocksdb",
4192 metrics,
4193 self.clone(),
4194 ));
4195 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4196 .await
4197 }
4198
4199 #[instrument(level = "trace", skip_all)]
4200 pub async fn get_transactions(
4201 &self,
4202 kv_store: &Arc<TransactionKeyValueStore>,
4203 filter: Option<TransactionFilter>,
4204 cursor: Option<TransactionDigest>,
4206 limit: Option<usize>,
4207 reverse: bool,
4208 ) -> IotaResult<Vec<TransactionDigest>> {
4209 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4210 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4211 let iter = checkpoint_contents.iter().map(|c| c.transaction);
4212 if reverse {
4213 let iter = iter
4214 .rev()
4215 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4216 .skip(usize::from(cursor.is_some()));
4217 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4218 } else {
4219 let iter = iter
4220 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4221 .skip(usize::from(cursor.is_some()));
4222 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4223 }
4224 }
4225 self.get_indexes()?
4226 .get_transactions(filter, cursor, limit, reverse)
4227 }
4228
4229 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4230 &self.checkpoint_store
4231 }
4232
4233 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4234 self.get_checkpoint_store()
4235 .get_highest_executed_checkpoint_seq_number()?
4236 .ok_or(IotaError::UserInput {
4237 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4238 })
4239 }
4240
/// Test helper (only compiled under `cfg(msim)`) returning the highest pruned
/// checkpoint recorded in the perpetual tables, or `0` if none is recorded.
///
/// # Errors
/// Returns the table's error converted into the `IotaResult` error type.
#[cfg(msim)]
pub fn get_highest_pruned_checkpoint_for_testing(
    &self,
) -> IotaResult<CheckpointSequenceNumber> {
    let database = self.database_for_testing();
    match database.perpetual_tables.get_highest_pruned_checkpoint() {
        Ok(highest_pruned) => Ok(highest_pruned.unwrap_or(0)),
        Err(err) => Err(err.into()),
    }
}
4251
4252 #[instrument(level = "trace", skip_all)]
4253 pub fn get_checkpoint_summary_by_sequence_number(
4254 &self,
4255 sequence_number: CheckpointSequenceNumber,
4256 ) -> IotaResult<CheckpointSummary> {
4257 let verified_checkpoint = self
4258 .get_checkpoint_store()
4259 .get_checkpoint_by_sequence_number(sequence_number)?;
4260 match verified_checkpoint {
4261 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4262 None => Err(IotaError::UserInput {
4263 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4264 }),
4265 }
4266 }
4267
4268 #[instrument(level = "trace", skip_all)]
4269 pub fn get_checkpoint_summary_by_digest(
4270 &self,
4271 digest: CheckpointDigest,
4272 ) -> IotaResult<CheckpointSummary> {
4273 let verified_checkpoint = self
4274 .get_checkpoint_store()
4275 .get_checkpoint_by_digest(&digest)?;
4276 match verified_checkpoint {
4277 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4278 None => Err(IotaError::UserInput {
4279 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4280 }),
4281 }
4282 }
4283
4284 #[instrument(level = "trace", skip_all)]
4285 pub fn find_publish_txn_digest(&self, package_id: ObjectId) -> IotaResult<TransactionDigest> {
4286 if package_id.is_system_package() {
4287 return self.find_genesis_txn_digest();
4288 }
4289 Ok(self
4290 .get_object_read(&package_id)?
4291 .into_object()?
4292 .previous_transaction)
4293 }
4294
4295 #[instrument(level = "trace", skip_all)]
4296 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4297 let summary = self
4298 .get_verified_checkpoint_by_sequence_number(0)?
4299 .into_message();
4300 let content = self.get_checkpoint_contents(summary.content_digest)?;
4301 let genesis_transaction = content.enumerate_transactions(&summary).next();
4302 Ok(genesis_transaction
4303 .ok_or(IotaError::UserInput {
4304 error: UserInputError::GenesisTransactionNotFound,
4305 })?
4306 .1
4307 .transaction)
4308 }
4309
4310 #[instrument(level = "trace", skip_all)]
4311 pub fn get_verified_checkpoint_by_sequence_number(
4312 &self,
4313 sequence_number: CheckpointSequenceNumber,
4314 ) -> IotaResult<VerifiedCheckpoint> {
4315 let verified_checkpoint = self
4316 .get_checkpoint_store()
4317 .get_checkpoint_by_sequence_number(sequence_number)?;
4318 match verified_checkpoint {
4319 Some(verified_checkpoint) => Ok(verified_checkpoint),
4320 None => Err(IotaError::UserInput {
4321 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4322 }),
4323 }
4324 }
4325
4326 #[instrument(level = "trace", skip_all)]
4327 pub fn get_verified_checkpoint_summary_by_digest(
4328 &self,
4329 digest: CheckpointDigest,
4330 ) -> IotaResult<VerifiedCheckpoint> {
4331 let verified_checkpoint = self
4332 .get_checkpoint_store()
4333 .get_checkpoint_by_digest(&digest)?;
4334 match verified_checkpoint {
4335 Some(verified_checkpoint) => Ok(verified_checkpoint),
4336 None => Err(IotaError::UserInput {
4337 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4338 }),
4339 }
4340 }
4341
4342 #[instrument(level = "trace", skip_all)]
4343 pub fn get_checkpoint_contents(
4344 &self,
4345 digest: CheckpointContentsDigest,
4346 ) -> IotaResult<CheckpointContents> {
4347 self.get_checkpoint_store()
4348 .get_checkpoint_contents(&digest)?
4349 .ok_or(IotaError::UserInput {
4350 error: UserInputError::CheckpointContentsNotFound(digest),
4351 })
4352 }
4353
4354 #[instrument(level = "trace", skip_all)]
4355 pub fn get_checkpoint_contents_by_sequence_number(
4356 &self,
4357 sequence_number: CheckpointSequenceNumber,
4358 ) -> IotaResult<CheckpointContents> {
4359 let verified_checkpoint = self
4360 .get_checkpoint_store()
4361 .get_checkpoint_by_sequence_number(sequence_number)?;
4362 match verified_checkpoint {
4363 Some(verified_checkpoint) => {
4364 let content_digest = verified_checkpoint.into_inner().content_digest;
4365 self.get_checkpoint_contents(content_digest)
4366 }
4367 None => Err(IotaError::UserInput {
4368 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4369 }),
4370 }
4371 }
4372
/// Queries the event index for events matching `query` and returns them as
/// `IotaEvent`s with their annotated layouts resolved.
///
/// * `kv_store` - store used to fetch the stored events of the matching
///   transactions.
/// * `query` - the event filter; only a subset of filter variants is handled
///   here (see Errors).
/// * `cursor` - optional event id to continue from.
/// * `limit` - maximum number of events requested by the caller.
/// * `descending` - passed through to the index lookups; also selects the
///   starting position when no cursor is given.
///
/// # Errors
///
/// * Propagates the error from `get_indexes` and from the index lookups.
/// * `IotaError::TransactionNotFound` when the cursor's transaction digest has
///   no transaction sequence number in the index.
/// * `UserInputError::Unsupported` for a non-empty `EventFilter::All` and for
///   the `Package`, `MoveEventField`, `Any`, `And` and `Or` filters.
/// * `IotaError::TransactionEventsNotFound` when an indexed event cannot be
///   found among the events fetched for its transaction.
/// * Errors from layout resolution or from the `IotaEvent` conversion.
///
/// # Panics
///
/// Panics if a module name fails `Identifier::new` (`unwrap`), or if a
/// requested digest is missing from the fetched-events map (`expect`).
#[instrument(level = "trace", skip_all)]
pub async fn query_events(
    &self,
    kv_store: &Arc<TransactionKeyValueStore>,
    query: EventFilter,
    cursor: Option<EventID>,
    limit: usize,
    descending: bool,
) -> IotaResult<Vec<IotaEvent>> {
    let index_store = self.get_indexes()?;

    // Translate the cursor into an index position (transaction sequence
    // number, event sequence). Without a cursor, start from the maximum
    // position when descending and from the minimum when ascending.
    let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
        let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
            IotaError::TransactionNotFound {
                digest: cursor.tx_digest,
            },
        )?;
        (tx_seq, cursor.event_seq as usize)
    } else if descending {
        (u64::MAX, usize::MAX)
    } else {
        (0, 0)
    };

    // Ask the index for one more key than requested; the extra entry is
    // dropped again below (first entry with a cursor, last entry without).
    let limit = limit + 1;
    let mut event_keys = match query {
        EventFilter::All(filters) => {
            if filters.is_empty() {
                index_store.all_events(tx_num, event_num, limit, descending)?
            } else {
                return Err(IotaError::UserInput {
                    error: UserInputError::Unsupported(
                        "This query type does not currently support filter combinations"
                            .to_string(),
                    ),
                });
            }
        }
        EventFilter::Transaction(digest) => {
            index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
        }
        EventFilter::MoveModule { package, module } => {
            let module_id = ModuleId::new(
                AccountAddress::new(package.into_bytes()),
                move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
            );
            index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
        }
        EventFilter::MoveEventType(struct_name) => index_store
            .events_by_move_event_struct_name(
                &struct_name,
                tx_num,
                event_num,
                limit,
                descending,
            )?,
        EventFilter::Sender(sender) => {
            index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
        }
        EventFilter::TimeRange {
            start_time,
            end_time,
        } => index_store
            .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
        EventFilter::MoveEventModule { package, module } => index_store
            .events_by_move_event_module(
                &ModuleId::new(
                    AccountAddress::new(package.into_bytes()),
                    move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
                ),
                tx_num,
                event_num,
                limit,
                descending,
            )?,
        EventFilter::Package(_)
        | EventFilter::MoveEventField { .. }
        | EventFilter::Any(_)
        | EventFilter::And(_, _)
        | EventFilter::Or(_, _) => {
            return Err(IotaError::UserInput {
                error: UserInputError::Unsupported(
                    "This query type is not supported by the full node.".to_string(),
                ),
            });
        }
    };

    // With a cursor, drop the first returned key; without one, cut the
    // result back to the caller's original limit.
    // NOTE(review): presumably the index lookup is inclusive of the cursor
    // position, which is why the first key is skipped — confirm.
    if cursor.is_some() {
        if !event_keys.is_empty() {
            event_keys.remove(0);
        }
    } else {
        event_keys.truncate(limit - 1);
    }

    // Deduplicate the transaction digests so each transaction's events are
    // requested only once.
    let transaction_digests = event_keys
        .iter()
        .map(|(_, digest, _, _)| *digest)
        .collect::<HashSet<_>>()
        .into_iter()
        .collect::<Vec<_>>();

    let events = kv_store
        .multi_get_events_by_tx_digests(&transaction_digests)
        .await?;

    // Pair every requested digest with its fetched result by position.
    // NOTE(review): assumes the store returns results in request order —
    // confirm.
    let events_map: HashMap<_, _> =
        transaction_digests.iter().zip(events.into_iter()).collect();

    // Resolve each index key to the concrete event at position `k.2` inside
    // its transaction's events; a missing event becomes an error.
    let stored_events = event_keys
        .into_iter()
        .map(|k| {
            (
                k,
                events_map
                    .get(&k.1)
                    .expect("fetched digest is missing")
                    .clone()
                    .and_then(|e| e.get(k.2).cloned()),
            )
        })
        .map(
            |((_event_digest, tx_digest, event_seq, timestamp), event)| {
                event
                    .map(|e| (e, tx_digest, event_seq, timestamp))
                    .ok_or(IotaError::TransactionEventsNotFound { digest: tx_digest })
            },
        )
        .collect::<Result<Vec<_>, _>>()?;

    // Convert the stored events into `IotaEvent`s, resolving the annotated
    // layout of each event type through the current epoch's executor.
    let epoch_store = self.load_epoch_store_one_call_per_task();
    let backing_store = self.get_backing_package_store().as_ref();
    let mut layout_resolver = epoch_store
        .executor()
        .type_layout_resolver(Box::new(backing_store));
    let mut events = vec![];
    for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
        events.push(IotaEvent::try_from(
            e.clone(),
            tx_digest,
            event_seq as u64,
            Some(timestamp),
            layout_resolver.get_annotated_layout(&e.type_)?,
        )?)
    }
    Ok(events)
}
4527
4528 pub async fn insert_genesis_object(&self, object: Object) {
4529 self.get_reconfig_api()
4530 .try_insert_genesis_object(object)
4531 .expect("Cannot insert genesis object")
4532 }
4533
4534 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4535 futures::future::join_all(
4536 objects
4537 .iter()
4538 .map(|o| self.insert_genesis_object(o.clone())),
4539 )
4540 .await;
4541 }
4542
4543 #[instrument(level = "trace", skip_all)]
4545 pub fn get_transaction_status(
4546 &self,
4547 transaction_digest: &TransactionDigest,
4548 epoch_store: &Arc<AuthorityPerEpochStore>,
4549 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4550 if let Some(effects) =
4552 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4553 {
4554 if let Some(transaction) = self
4555 .get_transaction_cache_reader()
4556 .try_get_transaction_block(transaction_digest)?
4557 {
4558 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4559 let events = if effects.events_digest().is_some() {
4560 self.get_transaction_events(effects.transaction_digest())?
4561 } else {
4562 TransactionEvents::default()
4563 };
4564 return Ok(Some((
4565 (*transaction).clone().into_message(),
4566 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4567 )));
4568 } else {
4569 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4574 }
4575 }
4576 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4577 self.metrics.tx_already_processed.inc();
4578 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4579 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4580 } else {
4581 Ok(None)
4582 }
4583 }
4584
4585 #[instrument(level = "trace", skip_all)]
4589 pub fn get_signed_effects_and_maybe_resign(
4590 &self,
4591 transaction_digest: &TransactionDigest,
4592 epoch_store: &Arc<AuthorityPerEpochStore>,
4593 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4594 let effects = self
4595 .get_transaction_cache_reader()
4596 .try_get_executed_effects(transaction_digest)?;
4597 match effects {
4598 Some(effects) => {
4599 if effects.epoch() != epoch_store.epoch() {
4622 debug!(
4623 tx_digest=?transaction_digest,
4624 effects_epoch=?effects.epoch(),
4625 epoch=?epoch_store.epoch(),
4626 "Re-signing the effects with the current epoch"
4627 );
4628 }
4629 Ok(Some(self.sign_effects(effects, epoch_store)?))
4630 }
4631 None => Ok(None),
4632 }
4633 }
4634
4635 #[instrument(level = "trace", skip_all)]
4636 pub(crate) fn sign_effects(
4637 &self,
4638 effects: TransactionEffects,
4639 epoch_store: &Arc<AuthorityPerEpochStore>,
4640 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4641 let tx_digest = *effects.transaction_digest();
4642 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4643 Some(sig) => {
4644 debug_assert!(sig.epoch == epoch_store.epoch());
4645 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4646 }
4647 _ => {
4648 let sig = AuthoritySignInfo::new(
4649 epoch_store.epoch(),
4650 &effects,
4651 Intent::iota_app(IntentScope::TransactionEffects),
4652 self.name,
4653 &*self.secret,
4654 );
4655
4656 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4657
4658 epoch_store.insert_effects_digest_and_signature(
4659 &tx_digest,
4660 effects.digest(),
4661 &sig,
4662 )?;
4663
4664 effects
4665 }
4666 };
4667
4668 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4669 signed_effects,
4670 ))
4671 }
4672
4673 #[instrument(level = "trace", skip_all)]
4675 fn fullnode_only_get_tx_coins_for_indexing(
4676 &self,
4677 effects: &TransactionEffects,
4678 inner_temporary_store: &InnerTemporaryStore,
4679 epoch_store: &Arc<AuthorityPerEpochStore>,
4680 ) -> Option<TxCoins> {
4681 if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4682 return None;
4683 }
4684 let written_coin_objects = inner_temporary_store
4685 .written
4686 .iter()
4687 .filter_map(|(k, v)| {
4688 if v.is_coin() {
4689 Some((*k, v.clone()))
4690 } else {
4691 None
4692 }
4693 })
4694 .collect();
4695 let mut input_coin_objects = inner_temporary_store
4696 .input_objects
4697 .iter()
4698 .filter_map(|(k, v)| {
4699 if v.is_coin() {
4700 Some((*k, v.clone()))
4701 } else {
4702 None
4703 }
4704 })
4705 .collect::<ObjectMap>();
4706
4707 for (object_id, version) in effects.modified_at_versions() {
4712 if inner_temporary_store
4713 .loaded_runtime_objects
4714 .contains_key(&object_id)
4715 {
4716 if let Some(object) = self
4717 .get_object_store()
4718 .get_object_by_key(&object_id, version)
4719 {
4720 if object.is_coin() {
4721 input_coin_objects.insert(object_id, object);
4722 }
4723 }
4724 }
4725 }
4726
4727 Some((input_coin_objects, written_coin_objects))
4728 }
4729
4730 #[instrument(level = "trace", skip_all)]
4742 pub async fn get_transaction_lock(
4743 &self,
4744 object_ref: &ObjectRef,
4745 epoch_store: &AuthorityPerEpochStore,
4746 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4747 let lock_info = self
4748 .get_object_cache_reader()
4749 .try_get_lock(*object_ref, epoch_store)?;
4750 let lock_info = match lock_info {
4751 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4752 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4753 provided_obj_ref: *object_ref,
4754 current_version: locked_ref.version,
4755 }
4756 .into());
4757 }
4758 ObjectLockStatus::Initialized => {
4759 return Ok(None);
4760 }
4761 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4762 };
4763
4764 epoch_store.get_signed_transaction(&lock_info)
4765 }
4766
/// Looks up the given object ids through the object cache reader and returns
/// its result unchanged.
///
/// Presumably the returned vector holds one entry per requested id, with
/// `None` for ids that could not be found — verify against the cache reader.
pub async fn try_get_objects(&self, objects: &[ObjectId]) -> IotaResult<Vec<Option<Object>>> {
    self.get_object_cache_reader().try_get_objects(objects)
}
4770
4771 pub async fn get_objects(&self, objects: &[ObjectId]) -> Vec<Option<Object>> {
4773 self.try_get_objects(objects)
4774 .await
4775 .expect("storage access failed")
4776 }
4777
/// Asks the object cache reader for the latest object reference or tombstone
/// recorded for `object_id`, returning the reader's result unchanged.
pub async fn try_get_object_or_tombstone(
    &self,
    object_id: ObjectId,
) -> IotaResult<Option<ObjectRef>> {
    self.get_object_cache_reader()
        .try_get_latest_object_ref_or_tombstone(object_id)
}
4785
4786 pub async fn get_object_or_tombstone(&self, object_id: ObjectId) -> Option<ObjectRef> {
4788 self.try_get_object_or_tombstone(object_id)
4789 .await
4790 .expect("storage access failed")
4791 }
4792
4793 pub fn set_override_protocol_upgrade_buffer_stake(
4803 &self,
4804 expected_epoch: EpochId,
4805 buffer_stake_bps: u64,
4806 ) -> IotaResult {
4807 let epoch_store = self.load_epoch_store_one_call_per_task();
4808 let actual_epoch = epoch_store.epoch();
4809 if actual_epoch != expected_epoch {
4810 return Err(IotaError::WrongEpoch {
4811 expected_epoch,
4812 actual_epoch,
4813 });
4814 }
4815
4816 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4817 }
4818
4819 pub fn clear_override_protocol_upgrade_buffer_stake(
4820 &self,
4821 expected_epoch: EpochId,
4822 ) -> IotaResult {
4823 let epoch_store = self.load_epoch_store_one_call_per_task();
4824 let actual_epoch = epoch_store.epoch();
4825 if actual_epoch != expected_epoch {
4826 return Err(IotaError::WrongEpoch {
4827 expected_epoch,
4828 actual_epoch,
4829 });
4830 }
4831
4832 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4833 }
4834
/// Collects an object reference for every system package known to this
/// binary.
///
/// Each built-in framework package is handed to
/// `iota_framework::compare_system_package` together with the object store,
/// and the reference it returns is appended to the result. If the comparison
/// yields `None` for any package, an empty vector is returned rather than a
/// partial list.
///
/// Under `cfg(msim)`, extra packages and module overrides supplied by
/// `framework_injection` for this authority are used as well.
pub async fn get_available_system_packages(
    &self,
    binary_config: &BinaryConfig,
) -> Vec<ObjectRef> {
    let mut results = vec![];

    let system_packages = BuiltInFramework::iter_system_packages();

    // Simulator builds append injected packages to the built-in ones.
    #[cfg(msim)]
    let extra_packages = framework_injection::get_extra_packages(self.name);
    #[cfg(msim)]
    let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());

    for system_package in system_packages {
        let modules = system_package.modules().to_vec();
        // Simulator builds may replace the modules of an individual package.
        #[cfg(msim)]
        let modules = framework_injection::get_override_modules(&system_package.id, self.name)
            .unwrap_or(modules);

        // A single `None` aborts the whole collection with an empty result.
        let Some(obj_ref) = iota_framework::compare_system_package(
            &self.get_object_store(),
            &system_package.id,
            &modules,
            system_package.dependencies.to_vec(),
            binary_config,
        )
        .await
        else {
            return vec![];
        };
        results.push(obj_ref);
    }

    results
}
4875
/// Produces the module bytes and dependencies for each system package in
/// `system_packages` that differs from what is currently stored.
///
/// For every requested reference:
/// * if the stored object already has exactly that reference, the package is
///   skipped (nothing is emitted for it);
/// * otherwise the package is rebuilt from the framework bytes bundled with
///   this binary (or a `framework_injection` override under `cfg(msim)`) at
///   the requested version, and the rebuilt object's reference must equal the
///   requested one.
///
/// Returns `None` (after logging an error) as soon as a rebuilt reference
/// does not match the requested reference; otherwise returns the collected
/// `SystemPackage` entries.
///
/// # Panics
///
/// Panics if a bundled module fails to deserialize (`unwrap`), or if the
/// object lookup fails (`get_objects` expects storage access to succeed).
async fn get_system_package_bytes(
    &self,
    system_packages: Vec<ObjectRef>,
    binary_config: &BinaryConfig,
) -> Option<Vec<SystemPackage>> {
    let ids: Vec<_> = system_packages
        .iter()
        .map(|object_ref| object_ref.object_id)
        .collect();
    let objects = self.get_objects(&ids).await;

    let mut res = Vec::with_capacity(system_packages.len());
    for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
        // The previous transaction of the rebuilt object is taken from the
        // currently stored object, or the genesis marker if none is stored.
        let prev_transaction = match object {
            Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
                info!(
                    "Framework {} does not need updating",
                    system_package_ref.object_id
                );
                continue;
            }

            Some(cur_object) => cur_object.previous_transaction,
            None => TransactionDigest::GENESIS_MARKER,
        };

        // Simulator builds prefer an injected package over the built-in one.
        #[cfg(msim)]
        let FrameworkSystemPackage {
            id: _,
            bytes,
            dependencies,
        } = framework_injection::get_override_system_package(
            &system_package_ref.object_id,
            self.name,
        )
        .unwrap_or_else(|| {
            BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone()
        });

        #[cfg(not(msim))]
        let FrameworkSystemPackage {
            id: _,
            bytes,
            dependencies,
        } = BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone();

        let modules: Vec<_> = bytes
            .iter()
            .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
            .collect();

        let new_object = Object::new_system_package(
            &modules,
            system_package_ref.version,
            dependencies.clone(),
            prev_transaction,
        );

        // The locally rebuilt package must reproduce the requested reference
        // exactly; otherwise the local bytes are not the requested package.
        let new_ref = new_object.compute_object_reference();
        if new_ref != system_package_ref {
            error!(
                "Framework mismatch -- binary: {new_ref:?}\n  upgrade: {system_package_ref:?}"
            );
            return None;
        }

        res.push(SystemPackage {
            version: system_package_ref.version,
            modules: bytes,
            dependencies,
        });
    }

    Some(res)
}
4968
/// Checks whether `proposed_protocol_version` has enough stake behind a
/// single (config digest, system packages) combination.
///
/// * `committee` - used for vote weights and thresholds.
/// * `capabilities` - the capabilities announced by the authorities.
/// * `buffer_stake_bps` - buffer stake in basis points; values above `10000`
///   are clamped to `10000` with a warning.
///
/// Capabilities with no available system packages are ignored, as are those
/// that report no digest for the proposed version. The remaining entries are
/// grouped by identical `(digest, packages)`, and the first group whose total
/// votes reach `committee.effective_threshold(buffer_stake_bps)` is returned
/// as `(proposed_protocol_version, digest, packages)`. Returns `None` when no
/// group reaches the threshold.
fn is_protocol_version_supported_v1(
    proposed_protocol_version: ProtocolVersion,
    committee: &Committee,
    capabilities: Vec<AuthorityCapabilitiesV1>,
    mut buffer_stake_bps: u64,
) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
    if buffer_stake_bps > 10000 {
        warn!("clamping buffer_stake_bps to 10000");
        buffer_stake_bps = 10000;
    }

    let mut desired_upgrades: Vec<_> = capabilities
        .into_iter()
        .filter_map(|mut cap| {
            if cap.available_system_packages.is_empty() {
                return None;
            }

            // Sort the package list so that it forms an order-independent
            // grouping key below.
            cap.available_system_packages.sort();

            info!(
                "validator {:?} supports {:?} with system packages: {:?}",
                cap.authority.concise(),
                cap.supported_protocol_versions,
                cap.available_system_packages,
            );

            cap.supported_protocol_versions
                .get_version_digest(proposed_protocol_version)
                .map(|digest| (digest, cap.available_system_packages, cap.authority))
        })
        .collect();

    // Sorting places entries with equal (digest, packages) next to each
    // other, which the `chunk_by` grouping below depends on.
    desired_upgrades.sort();
    desired_upgrades
        .into_iter()
        .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
        .into_iter()
        .find_map(|((digest, packages), group)| {
            // Guaranteed by the empty-package filter above.
            assert!(!packages.is_empty());

            let mut stake_aggregator: StakeAggregator<(), true> =
                StakeAggregator::new(Arc::new(committee.clone()));

            for (_, _, authority) in group {
                stake_aggregator.insert_generic(authority, ());
            }

            let total_votes = stake_aggregator.total_votes();
            let quorum_threshold = committee.quorum_threshold();
            let effective_threshold = committee.effective_threshold(buffer_stake_bps);

            info!(
                protocol_config_digest = ?digest,
                ?total_votes,
                ?quorum_threshold,
                ?buffer_stake_bps,
                ?effective_threshold,
                ?proposed_protocol_version,
                ?packages,
                "support for upgrade"
            );

            let has_support = total_votes >= effective_threshold;
            has_support.then_some((proposed_protocol_version, digest, packages))
        })
}
5048
5049 fn choose_protocol_version_and_system_packages_v1(
5053 current_protocol_version: ProtocolVersion,
5054 current_protocol_digest: Digest,
5055 committee: &Committee,
5056 capabilities: Vec<AuthorityCapabilitiesV1>,
5057 buffer_stake_bps: u64,
5058 ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
5059 let mut next_protocol_version = current_protocol_version;
5060 let mut system_packages = vec![];
5061 let mut protocol_version_digest = current_protocol_digest;
5062
5063 while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
5067 next_protocol_version + 1,
5068 committee,
5069 capabilities.clone(),
5070 buffer_stake_bps,
5071 ) {
5072 next_protocol_version = version;
5073 protocol_version_digest = digest;
5074 system_packages = packages;
5075 }
5076
5077 (
5078 next_protocol_version,
5079 protocol_version_digest,
5080 system_packages,
5081 )
5082 }
5083
5084 fn get_validators_supporting_protocol_version(
5089 target_protocol_version: ProtocolVersion,
5090 target_digest: Digest,
5091 active_validators: &[AuthorityPublicKey],
5092 capabilities: &[AuthorityCapabilitiesV1],
5093 ) -> Vec<u64> {
5094 let mut eligible_validators = Vec::new();
5095
5096 for capability in capabilities {
5097 if let Some(digest) = capability
5099 .supported_protocol_versions
5100 .get_version_digest(target_protocol_version)
5101 {
5102 if digest == target_digest {
5103 if let Some(index) = active_validators
5105 .iter()
5106 .position(|name| AuthorityName::from(name) == capability.authority)
5107 {
5108 eligible_validators.push(index as u64);
5109 }
5110 }
5111 }
5112 }
5113
5114 eligible_validators.sort();
5116 eligible_validators
5117 }
5118
5119 fn calculate_eligible_validators_weight(
5124 eligible_validator_indices: &[u64],
5125 active_validators: &[AuthorityPublicKey],
5126 committee: &Committee,
5127 ) -> u64 {
5128 let mut total_weight = 0u64;
5129
5130 for &index in eligible_validator_indices {
5131 let authority_pubkey = &active_validators[index as usize];
5132 if let Some((_, weight)) = committee
5134 .members()
5135 .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
5136 {
5137 total_weight += weight;
5138 }
5139 }
5140
5141 total_weight
5142 }
5143
/// Builds the end-of-epoch (change epoch) transaction for the epoch of
/// `epoch_store`, executes it locally, and stores the transaction with its
/// effects in the state sync store.
///
/// * `gas_cost_summary` - the cost figures embedded in the change-epoch
///   transaction.
/// * `checkpoint` - the checkpoint sequence number the executable transaction
///   is created from.
/// * `epoch_start_timestamp_ms` - timestamp passed to the change-epoch
///   transaction.
/// * `scores` - validator scores; only used by the v4 transaction variant.
///
/// Returns the system state written by the transaction, the system epoch
/// info event if one was emitted, and the transaction effects.
///
/// # Errors
///
/// * Fails when the system packages chosen for the next epoch cannot be
///   produced locally.
/// * Fails when the transaction is already recorded as executed.
/// * Propagates errors from locking, shared-version assignment, input
///   loading, certificate preparation and the state sync store insertion.
///
/// # Panics
///
/// Panics if capabilities cannot be read, if the transaction did not write
/// the system state object, if no epoch info event was emitted while the
/// system state is not in safe mode, or if the effects status is not success.
#[instrument(level = "error", skip_all)]
pub async fn create_and_execute_advance_epoch_tx(
    &self,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    gas_cost_summary: &GasCostSummary,
    checkpoint: CheckpointSequenceNumber,
    epoch_start_timestamp_ms: CheckpointTimestamp,
    scores: Vec<u64>,
) -> anyhow::Result<(
    IotaSystemState,
    Option<SystemEpochInfoEvent>,
    TransactionEffects,
)> {
    let mut txns = Vec::new();

    let next_epoch = epoch_store.epoch() + 1;

    // Decide the next epoch's protocol version, config digest and system
    // packages from the capabilities recorded in the epoch store.
    let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
    let authority_capabilities = epoch_store
        .get_capabilities_v1()
        .expect("read capabilities from db cannot fail");
    let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
        Self::choose_protocol_version_and_system_packages_v1(
            epoch_store.protocol_version(),
            SupportedProtocolVersionsWithHashes::protocol_config_digest(
                epoch_store.protocol_config(),
            ),
            epoch_store.committee(),
            authority_capabilities.clone(),
            buffer_stake_bps,
        );

    // The package bytes are produced with the current epoch's protocol
    // config; without them the change-epoch transaction cannot be formed.
    let config = epoch_store.protocol_config();
    let binary_config = to_binary_config(config);
    let Some(next_epoch_system_package_bytes) = self
        .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
        .await
    else {
        error!(
            "upgraded system packages {:?} are not locally available, cannot create \
            ChangeEpochTx. validator binary must be upgraded to the correct version!",
            next_epoch_system_packages
        );
        bail!("missing system packages: cannot form ChangeEpochTx");
    };

    // Choose the change-epoch transaction variant from the protocol config
    // flags: v4/v3 when the committee is selected from eligible validators,
    // v2 when a protocol-defined base fee and a max committee size are set,
    // and the original variant otherwise.
    if config.select_committee_from_eligible_validators() {
        let active_validators = epoch_store.epoch_start_state().get_active_validators();

        // By default every active validator (by index) is eligible.
        let mut eligible_active_validators = (0..active_validators.len() as u64).collect();

        if config.select_committee_supporting_next_epoch_version() {
            // Restrict eligibility to validators that support the chosen
            // next-epoch protocol version and digest.
            eligible_active_validators = Self::get_validators_supporting_protocol_version(
                next_epoch_protocol_version,
                next_epoch_protocol_digest,
                &active_validators,
                &authority_capabilities,
            );

            let eligible_validators_weight = Self::calculate_eligible_validators_weight(
                &eligible_active_validators,
                &active_validators,
                epoch_store.committee(),
            );

            let committee = epoch_store.committee();
            let effective_threshold = committee.effective_threshold(buffer_stake_bps);

            // If the restricted set carries too little weight, log the
            // inconsistency and fall back to all active validators.
            if eligible_validators_weight < effective_threshold {
                error!(
                    "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
                    This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
                );
                eligible_active_validators = (0..active_validators.len() as u64).collect();
            }
        }

        if config.pass_validator_scores_to_advance_epoch() {
            txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
                next_epoch,
                next_epoch_protocol_version.as_u64(),
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
                eligible_active_validators,
                scores,
                config.adjust_rewards_by_score(),
            ));
        } else {
            txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
                next_epoch,
                next_epoch_protocol_version.as_u64(),
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
                eligible_active_validators,
            ));
        }
    } else if config.protocol_defined_base_fee()
        && config.max_committee_members_count_as_option().is_some()
    {
        txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
            next_epoch,
            next_epoch_protocol_version.as_u64(),
            gas_cost_summary.storage_cost,
            gas_cost_summary.computation_cost,
            gas_cost_summary.computation_cost_burned,
            gas_cost_summary.storage_rebate,
            gas_cost_summary.non_refundable_storage_fee,
            epoch_start_timestamp_ms,
            next_epoch_system_package_bytes,
        ));
    } else {
        txns.push(EndOfEpochTransactionKind::new_change_epoch(
            next_epoch,
            next_epoch_protocol_version.as_u64(),
            gas_cost_summary.storage_cost,
            gas_cost_summary.computation_cost,
            gas_cost_summary.storage_rebate,
            gas_cost_summary.non_refundable_storage_fee,
            epoch_start_timestamp_ms,
            next_epoch_system_package_bytes,
        ));
    }

    let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);

    let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
        tx.clone(),
        epoch_store.epoch(),
        checkpoint,
    );

    let tx_digest = executable_tx.digest();

    info!(
        ?next_epoch,
        ?next_epoch_protocol_version,
        ?next_epoch_system_packages,
        computation_cost=?gas_cost_summary.computation_cost,
        computation_cost_burned=?gas_cost_summary.computation_cost_burned,
        storage_cost=?gas_cost_summary.storage_cost,
        storage_rebate=?gas_cost_summary.storage_rebate,
        non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
        ?tx_digest,
        "Creating advance epoch transaction"
    );

    fail_point_async!("change_epoch_tx_delay");
    let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

    // With the transaction lock held, bail out if the transaction is already
    // recorded as executed.
    if self
        .get_transaction_cache_reader()
        .try_is_tx_already_executed(tx_digest)?
    {
        warn!("change epoch tx has already been executed via state sync");
        bail!("change epoch tx has already been executed via state sync",);
    }

    let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;

    epoch_store.assign_shared_object_versions_idempotent(
        self.get_object_cache_reader().as_ref(),
        std::slice::from_ref(&executable_tx),
    )?;

    let (input_objects, _) =
        self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;

    let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
        &execution_guard,
        &executable_tx,
        input_objects,
        vec![],
        epoch_store,
    )?;
    let system_obj = get_iota_system_state(&temporary_store.written)
        .expect("change epoch tx must write to system object");
    let system_epoch_info_event = temporary_store
        .events
        .0
        .into_iter()
        .find(|event| event.is_system_epoch_info_event())
        .map(SystemEpochInfoEvent::from);
    // The epoch info event may only be absent when the system state is in
    // safe mode.
    assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());

    self.get_state_sync_store()
        .try_insert_transaction_and_effects(&tx, &effects)
        .map_err(|err| {
            let err: anyhow::Error = err.into();
            err
        })?;

    info!(
        "Effects summary of the change epoch transaction: {:?}",
        effects.summary_for_debug()
    );
    epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
    assert!(effects.status().is_success());
    Ok((system_obj, system_epoch_info_event, effects))
}
5404
5405 #[instrument(level = "error", skip_all)]
5409 async fn revert_uncommitted_epoch_transactions(
5410 &self,
5411 epoch_store: &AuthorityPerEpochStore,
5412 ) -> IotaResult {
5413 {
5414 let state = epoch_store.get_reconfig_state_write_lock_guard();
5415 if state.should_accept_user_certs() {
5416 epoch_store.close_user_certs(state);
5425 }
5426 }
5428 let pending_certificates = epoch_store.pending_consensus_certificates();
5429 info!(
5430 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5431 pending_certificates.len(),
5432 pending_certificates,
5433 );
5434 for digest in pending_certificates {
5435 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5436 info!(
5437 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5438 digest
5439 );
5440 continue;
5441 }
5442 info!("Reverting {:?} at the end of epoch", digest);
5443 epoch_store.revert_executed_transaction(&digest)?;
5444 self.get_reconfig_api().try_revert_state_update(&digest)?;
5445 }
5446 info!("All uncommitted local transactions reverted");
5447 Ok(())
5448 }
5449
5450 #[instrument(level = "error", skip_all)]
5451 async fn reopen_epoch_db(
5452 &self,
5453 cur_epoch_store: &AuthorityPerEpochStore,
5454 new_committee: Committee,
5455 epoch_start_configuration: EpochStartConfiguration,
5456 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5457 epoch_last_checkpoint: CheckpointSequenceNumber,
5458 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5459 let new_epoch = new_committee.epoch;
5460 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5461 assert_eq!(
5462 epoch_start_configuration.epoch_start_state().epoch(),
5463 new_committee.epoch
5464 );
5465 fail_point!("before-open-new-epoch-store");
5466 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5467 self.name,
5468 new_committee,
5469 epoch_start_configuration,
5470 self.get_backing_package_store().clone(),
5471 expensive_safety_check_config,
5472 epoch_last_checkpoint,
5473 )?;
5474 self.epoch_store.store(new_epoch_store.clone());
5475 Ok(new_epoch_store)
5476 }
5477
5478 fn check_move_account(
5481 &self,
5482 auth_account_object_id: ObjectId,
5483 auth_account_object_seq_number: Option<SequenceNumber>,
5484 auth_account_object_digest: Option<ObjectDigest>,
5485 account_object: ObjectReadResult,
5486 signer: &IotaAddress,
5487 ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5488 let account_object = match account_object.object {
5489 ObjectReadResultKind::Object(object) => Ok(object),
5490 ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5491 Err(UserInputError::AccountObjectDeleted {
5492 account_id: account_object.id(),
5493 account_version: version,
5494 transaction_digest: digest,
5495 })
5496 }
5497 ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5500 Err(UserInputError::AccountObjectInCanceledTransaction {
5501 account_id: account_object.id(),
5502 account_version: version,
5503 })
5504 }
5505 }?;
5506
5507 let account_object_addr = IotaAddress::from(auth_account_object_id);
5508
5509 fp_ensure!(
5510 signer == &account_object_addr,
5511 UserInputError::IncorrectUserSignature {
5512 error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5513 }
5514 .into()
5515 );
5516
5517 fp_ensure!(
5518 account_object.is_shared() || account_object.is_immutable(),
5519 UserInputError::AccountObjectNotSupported {
5520 object_id: auth_account_object_id
5521 }
5522 .into()
5523 );
5524
5525 let auth_account_object_seq_number =
5526 if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5527 let account_object_version = account_object.version();
5528
5529 fp_ensure!(
5530 account_object_version == auth_account_object_seq_number,
5531 UserInputError::AccountObjectVersionMismatch {
5532 object_id: auth_account_object_id,
5533 expected_version: auth_account_object_seq_number,
5534 actual_version: account_object_version,
5535 }
5536 .into()
5537 );
5538
5539 auth_account_object_seq_number
5540 } else {
5541 account_object.version()
5542 };
5543
5544 if let Some(auth_account_object_digest) = auth_account_object_digest {
5545 let expected_digest = account_object.digest();
5546 fp_ensure!(
5547 expected_digest == auth_account_object_digest,
5548 UserInputError::InvalidAccountObjectDigest {
5549 object_id: auth_account_object_id,
5550 expected_digest,
5551 actual_digest: auth_account_object_digest,
5552 }
5553 .into()
5554 );
5555 }
5556
5557 let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5558 auth_account_object_id,
5559 &AuthenticatorFunctionRefV1Key::tag().into(),
5560 &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5561 )
5562 .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5563 account_object_id: auth_account_object_id,
5564 })?;
5565
5566 let authenticator_function_ref_field = self
5567 .get_object_cache_reader()
5568 .try_find_object_lt_or_eq_version(
5569 authenticator_function_ref_field_id,
5570 auth_account_object_seq_number,
5571 )?;
5572
5573 if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5574 let field_move_object = authenticator_function_ref_field_obj
5575 .data
5576 .as_struct_opt()
5577 .expect("dynamic field should never be a package object");
5578
5579 let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5580 field_move_object.to_rust().map_err(|_| {
5581 UserInputError::InvalidAuthenticatorFunctionRefField {
5582 account_object_id: auth_account_object_id,
5583 }
5584 })?;
5585
5586 Ok(AuthenticatorFunctionRefForExecution::new_v1(
5587 field.value,
5588 authenticator_function_ref_field_obj.compute_object_reference(),
5589 authenticator_function_ref_field_obj.owner,
5590 authenticator_function_ref_field_obj.storage_rebate,
5591 authenticator_function_ref_field_obj.previous_transaction,
5592 ))
5593 } else {
5594 Err(UserInputError::MoveAuthenticatorNotFound {
5595 authenticator_function_ref_id: authenticator_function_ref_field_id,
5596 account_object_id: auth_account_object_id,
5597 account_object_version: auth_account_object_seq_number,
5598 }
5599 .into())
5600 }
5601 }
5602
5603 #[allow(clippy::type_complexity)]
5604 fn read_objects_for_signing(
5605 &self,
5606 transaction: &VerifiedTransaction,
5607 epoch: u64,
5608 ) -> IotaResult<(
5609 InputObjects,
5610 ReceivingObjects,
5611 Vec<(InputObjects, ObjectReadResult)>,
5612 )> {
5613 let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5614 Some(transaction.digest()),
5615 &transaction.collect_all_input_object_kind_for_reading()?,
5616 &transaction.data().transaction_data().receiving_objects(),
5617 epoch,
5618 )?;
5619
5620 transaction
5621 .split_input_objects_into_groups_for_reading(input_objects)
5622 .map(|(tx_input_objects, per_authenticator_inputs)| {
5623 (
5624 tx_input_objects,
5625 tx_receiving_objects,
5626 per_authenticator_inputs,
5627 )
5628 })
5629 }
5630
5631 #[allow(clippy::type_complexity)]
5632 fn check_transaction_inputs_for_signing(
5633 &self,
5634 protocol_config: &ProtocolConfig,
5635 reference_gas_price: u64,
5636 tx_data: &TransactionData,
5637 tx_input_objects: InputObjects,
5638 tx_receiving_objects: &ReceivingObjects,
5639 move_authenticators: &Vec<&MoveAuthenticator>,
5640 per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
5641 ) -> IotaResult<(
5642 IotaGasStatus,
5643 CheckedInputObjects,
5644 Vec<(CheckedInputObjects, AuthenticatorFunctionRef)>,
5645 )> {
5646 let authenticator_gas_budget = if move_authenticators.is_empty() {
5647 0
5648 } else {
5649 protocol_config.max_auth_gas()
5652 };
5653
5654 debug_assert_eq!(
5655 move_authenticators.len(),
5656 per_authenticator_inputs.len(),
5657 "Move authenticators amount must match the number of authenticator inputs"
5658 );
5659
5660 let per_authenticator_checked_inputs = move_authenticators
5661 .iter()
5662 .zip(per_authenticator_inputs)
5663 .map(
5664 |(move_authenticator, (authenticator_input_objects, account_object))| {
5665 let (
5667 auth_account_object_id,
5668 auth_account_object_seq_number,
5669 auth_account_object_digest,
5670 ) = move_authenticator.object_to_authenticate_components()?;
5671
5672 let signer = move_authenticator.address()?;
5673
5674 let AuthenticatorFunctionRefForExecution {
5676 authenticator_function_ref,
5677 ..
5678 } = self.check_move_account(
5679 auth_account_object_id,
5680 auth_account_object_seq_number,
5681 auth_account_object_digest,
5682 account_object,
5683 &signer,
5684 )?;
5685
5686 let authenticator_checked_input_objects =
5688 iota_transaction_checks::check_move_authenticator_input_for_signing(
5689 authenticator_input_objects,
5690 )?;
5691
5692 Ok((
5693 authenticator_checked_input_objects,
5694 authenticator_function_ref,
5695 ))
5696 },
5697 )
5698 .collect::<IotaResult<Vec<_>>>()?;
5699
5700 let (gas_status, tx_checked_input_objects) =
5702 iota_transaction_checks::check_transaction_input(
5703 protocol_config,
5704 reference_gas_price,
5705 tx_data,
5706 tx_input_objects,
5707 tx_receiving_objects,
5708 &self.metrics.bytecode_verifier_metrics,
5709 &self.config.verifier_signing_config,
5710 authenticator_gas_budget,
5711 )?;
5712
5713 Ok((
5714 gas_status,
5715 tx_checked_input_objects,
5716 per_authenticator_checked_inputs,
5717 ))
5718 }
5719
/// Test-only: iterates over the cached live object set exposed by the global state hash store.
#[cfg(test)]
pub(crate) fn iter_live_object_set_for_testing(
    &self,
) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
    let state_hash_store = self.get_global_state_hash_store();
    state_hash_store.iter_cached_live_object_set_for_testing()
}
5727
/// Test-only: takes the stored execution shutdown sender and sends the shutdown signal.
///
/// # Panics
///
/// Panics if the sender was already taken or if sending the signal fails.
#[cfg(test)]
pub(crate) fn shutdown_execution_for_test(&self) {
    // The guard stays alive until the end of the function, i.e. across the send.
    let mut shutdown_slot = self.tx_execution_shutdown.lock();
    let shutdown_tx = shutdown_slot.take().unwrap();
    shutdown_tx.send(()).unwrap();
}
5737
5738 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5741 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5742 self.get_object_cache_reader()
5743 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5744 self.get_reconfig_api()
5745 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5746 }
5747}
5748
5749pub struct RandomnessRoundReceiver {
5750 authority_state: Arc<AuthorityState>,
5751 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5752}
5753
5754impl RandomnessRoundReceiver {
5755 pub fn spawn(
5756 authority_state: Arc<AuthorityState>,
5757 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5758 ) -> JoinHandle<()> {
5759 let rrr = RandomnessRoundReceiver {
5760 authority_state,
5761 randomness_rx,
5762 };
5763 spawn_monitored_task!(rrr.run())
5764 }
5765
5766 async fn run(mut self) {
5767 info!("RandomnessRoundReceiver event loop started");
5768
5769 loop {
5770 tokio::select! {
5771 maybe_recv = self.randomness_rx.recv() => {
5772 if let Some((epoch, round, bytes)) = maybe_recv {
5773 self.handle_new_randomness(epoch, round, bytes);
5774 } else {
5775 break;
5776 }
5777 },
5778 }
5779 }
5780
5781 info!("RandomnessRoundReceiver event loop ended");
5782 }
5783
5784 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5785 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5786 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5787 if epoch_store.epoch() != epoch {
5788 warn!(
5789 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5790 epoch_store.epoch()
5791 );
5792 return;
5793 }
5794 let transaction = VerifiedTransaction::new_randomness_state_update(
5795 epoch,
5796 round,
5797 bytes,
5798 epoch_store
5799 .epoch_start_config()
5800 .randomness_obj_initial_shared_version(),
5801 );
5802 debug!(
5803 "created randomness state update transaction with digest: {:?}",
5804 transaction.digest()
5805 );
5806 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5807 let digest = *transaction.digest();
5808
5809 self.authority_state
5814 .get_cache_commit()
5815 .persist_transaction(&transaction);
5816
5817 self.authority_state
5819 .transaction_manager()
5820 .enqueue(vec![transaction], &epoch_store);
5821
5822 let authority_state = self.authority_state.clone();
5823 spawn_monitored_task!(async move {
5824 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5833 let result = tokio::time::timeout(
5834 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5835 authority_state
5836 .get_transaction_cache_reader()
5837 .try_notify_read_executed_effects(&[digest]),
5838 )
5839 .await;
5840 let result = match result {
5841 Ok(result) => result,
5842 Err(_) => {
5843 if cfg!(debug_assertions) {
5844 panic!(
5846 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5847 );
5848 }
5849 warn!(
5850 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5851 );
5852 authority_state
5854 .get_transaction_cache_reader()
5855 .try_notify_read_executed_effects(&[digest])
5856 .await
5857 }
5858 };
5859
5860 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5861 let effects = effects.pop().expect("should return effects");
5862 if *effects.status() != ExecutionStatus::Success {
5863 fatal!(
5864 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5865 );
5866 }
5867 debug!(
5868 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5869 );
5870 });
5871 }
5872}
5873
5874#[async_trait]
5875impl TransactionKeyValueStoreTrait for AuthorityState {
5876 async fn multi_get(
5877 &self,
5878 transaction_keys: &[TransactionDigest],
5879 effects_keys: &[TransactionDigest],
5880 ) -> IotaResult<KVStoreTransactionData> {
5881 let txns = if !transaction_keys.is_empty() {
5882 self.get_transaction_cache_reader()
5883 .try_multi_get_transaction_blocks(transaction_keys)?
5884 .into_iter()
5885 .map(|t| t.map(|t| (*t).clone().into_inner()))
5886 .collect()
5887 } else {
5888 vec![]
5889 };
5890
5891 let fx = if !effects_keys.is_empty() {
5892 self.get_transaction_cache_reader()
5893 .try_multi_get_executed_effects(effects_keys)?
5894 } else {
5895 vec![]
5896 };
5897
5898 Ok((txns, fx))
5899 }
5900
5901 async fn multi_get_checkpoints(
5902 &self,
5903 checkpoint_summaries: &[CheckpointSequenceNumber],
5904 checkpoint_contents: &[CheckpointSequenceNumber],
5905 checkpoint_summaries_by_digest: &[CheckpointDigest],
5906 ) -> IotaResult<(
5907 Vec<Option<CertifiedCheckpointSummary>>,
5908 Vec<Option<CheckpointContents>>,
5909 Vec<Option<CertifiedCheckpointSummary>>,
5910 )> {
5911 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5913 let store = self.get_checkpoint_store();
5914 for seq in checkpoint_summaries {
5915 let checkpoint = store
5916 .get_checkpoint_by_sequence_number(*seq)?
5917 .map(|c| c.into_inner());
5918
5919 summaries.push(checkpoint);
5920 }
5921
5922 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5923 for seq in checkpoint_contents {
5924 let checkpoint = store
5925 .get_checkpoint_by_sequence_number(*seq)?
5926 .and_then(|summary| {
5927 store
5928 .get_checkpoint_contents(&summary.content_digest)
5929 .expect("db read cannot fail")
5930 });
5931 contents.push(checkpoint);
5932 }
5933
5934 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5935 for digest in checkpoint_summaries_by_digest {
5936 let checkpoint = store
5937 .get_checkpoint_by_digest(digest)?
5938 .map(|c| c.into_inner());
5939 summaries_by_digest.push(checkpoint);
5940 }
5941
5942 Ok((summaries, contents, summaries_by_digest))
5943 }
5944
5945 async fn get_transaction_perpetual_checkpoint(
5946 &self,
5947 digest: TransactionDigest,
5948 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5949 self.get_checkpoint_cache()
5950 .try_get_transaction_perpetual_checkpoint(&digest)
5951 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5952 }
5953
5954 async fn get_object(
5955 &self,
5956 object_id: ObjectId,
5957 version: VersionNumber,
5958 ) -> IotaResult<Option<Object>> {
5959 self.get_object_cache_reader()
5960 .try_get_object_by_key(&object_id, version)
5961 }
5962
5963 #[instrument(skip_all)]
5964 async fn multi_get_objects(
5965 &self,
5966 object_keys: &[ObjectKey],
5967 ) -> IotaResult<Vec<Option<Object>>> {
5968 Ok(self
5969 .get_object_cache_reader()
5970 .multi_get_objects_by_key(object_keys))
5971 }
5972
5973 async fn multi_get_transactions_perpetual_checkpoints(
5974 &self,
5975 digests: &[TransactionDigest],
5976 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5977 let res = self
5978 .get_checkpoint_cache()
5979 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5980
5981 Ok(res
5982 .into_iter()
5983 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5984 .collect())
5985 }
5986
5987 #[instrument(skip(self, digests), fields(digests = digests.iter().map(|d| d.to_string()).collect::<Vec<String>>().join(", ")))]
5988 async fn multi_get_events_by_tx_digests(
5989 &self,
5990 digests: &[TransactionDigest],
5991 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5992 if digests.is_empty() {
5993 return Ok(vec![]);
5994 }
5995
5996 Ok(self
5997 .get_transaction_cache_reader()
5998 .multi_get_events(digests))
5999 }
6000}
6001
/// Simulation-only (`cfg(msim)`) registry of package overrides.
///
/// Overrides live in a thread-local map keyed by package id. An entry either holds one set of
/// modules used for every validator, or a callback that decides per validator.
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_sdk_types::ObjectId;
    use iota_types::base_types::AuthorityName;
    use move_binary_format::CompiledModule;

    type FrameworkOverrideConfig = BTreeMap<ObjectId, PackageOverrideConfig>;

    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    /// Callback returning the modules to use for a given validator, or `None` for no override.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    enum PackageOverrideConfig {
        /// The same modules for every validator.
        Global(Framework),
        /// Modules chosen per validator by a callback.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module with its own version into a separate byte vector.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Stores `config` for `package_id`, replacing any previous entry.
    fn install_override(package_id: ObjectId, config: PackageOverrideConfig) {
        OVERRIDE.with(|overrides| {
            overrides.borrow_mut().insert(package_id, config);
        });
    }

    /// Registers `modules` as the override of `package_id` for every validator.
    pub fn set_override(package_id: ObjectId, modules: Vec<CompiledModule>) {
        install_override(package_id, PackageOverrideConfig::Global(modules));
    }

    /// Registers a per-validator callback as the override of `package_id`.
    pub fn set_override_cb(package_id: ObjectId, func: PackageUpgradeCallback) {
        install_override(package_id, PackageOverrideConfig::PerValidator(func));
    }

    /// Returns the serialized override modules of `package_id` for validator `name`, if any.
    pub fn get_override_bytes(package_id: &ObjectId, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|overrides| {
            let overrides = overrides.borrow();
            let entry = overrides.get(package_id)?;
            match entry {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    let framework = func(name)?;
                    Some(compiled_modules_to_bytes(&framework))
                }
            }
        })
    }

    /// Returns the override modules of `package_id` for validator `name`, if any.
    pub fn get_override_modules(
        package_id: &ObjectId,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|overrides| {
            let overrides = overrides.borrow();
            let entry = overrides.get(package_id)?;
            match entry {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            }
        })
    }

    /// Builds a `SystemPackage` from the override of `package_id` for validator `name`, if any.
    ///
    /// System packages keep the dependencies of their built-in counterpart; any other package
    /// depends on all built-in package ids.
    pub fn get_override_system_package(
        package_id: &ObjectId,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        get_override_bytes(package_id, name).map(|bytes| {
            let dependencies = if package_id.is_system_package() {
                BuiltInFramework::get_package_by_id(package_id)
                    .dependencies
                    .to_vec()
            } else {
                BuiltInFramework::all_package_ids()
            };
            SystemPackage {
                id: *package_id,
                bytes,
                dependencies,
            }
        })
    }

    /// Returns a `SystemPackage` for every overridden package that is not a built-in package.
    ///
    /// # Panics
    ///
    /// Panics if an overridden package yields no bytes for validator `name`.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectId> =
            BuiltInFramework::all_package_ids().into_iter().collect();
        let extra: Vec<ObjectId> = OVERRIDE.with(|overrides| {
            overrides
                .borrow()
                .keys()
                .filter(|package| !built_in.contains(*package))
                .copied()
                .collect()
        });

        let mut packages = Vec::with_capacity(extra.len());
        for package in extra {
            packages.push(SystemPackage {
                id: package,
                bytes: get_override_bytes(&package, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        packages
    }
}
6121
6122#[derive(Debug, Serialize, Deserialize, Clone)]
6123pub struct ObjDumpFormat {
6124 pub id: ObjectId,
6125 pub version: VersionNumber,
6126 pub digest: ObjectDigest,
6127 pub object: Object,
6128}
6129
6130impl ObjDumpFormat {
6131 fn new(object: Object) -> Self {
6132 let oref = object.compute_object_reference();
6133 Self {
6134 id: oref.object_id,
6135 version: oref.version,
6136 digest: oref.digest,
6137 object,
6138 }
6139 }
6140}
6141
6142#[derive(Debug, Serialize, Deserialize, Clone)]
6143pub struct NodeStateDump {
6144 pub tx_digest: TransactionDigest,
6145 pub sender_signed_data: SenderSignedData,
6146 pub executed_epoch: u64,
6147 pub reference_gas_price: u64,
6148 pub protocol_version: u64,
6149 pub epoch_start_timestamp_ms: u64,
6150 pub computed_effects: TransactionEffects,
6151 pub expected_effects_digest: TransactionEffectsDigest,
6152 pub relevant_system_packages: Vec<ObjDumpFormat>,
6153 pub shared_objects: Vec<ObjDumpFormat>,
6154 pub loaded_child_objects: Vec<ObjDumpFormat>,
6155 pub modified_at_versions: Vec<ObjDumpFormat>,
6156 pub runtime_reads: Vec<ObjDumpFormat>,
6157 pub input_objects: Vec<ObjDumpFormat>,
6158}
6159
6160impl NodeStateDump {
6161 pub fn new(
6162 tx_digest: &TransactionDigest,
6163 effects: &TransactionEffects,
6164 expected_effects_digest: TransactionEffectsDigest,
6165 object_store: &dyn ObjectStore,
6166 epoch_store: &Arc<AuthorityPerEpochStore>,
6167 inner_temporary_store: &InnerTemporaryStore,
6168 certificate: &VerifiedExecutableTransaction,
6169 ) -> IotaResult<Self> {
6170 let executed_epoch = epoch_store.epoch();
6172 let reference_gas_price = epoch_store.reference_gas_price();
6173 let epoch_start_config = epoch_store.epoch_start_config();
6174 let protocol_version = epoch_store.protocol_version().as_u64();
6175 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6176
6177 let mut relevant_system_packages = Vec::new();
6179 for sys_package_id in BuiltInFramework::all_package_ids() {
6180 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
6181 relevant_system_packages.push(ObjDumpFormat::new(w))
6182 }
6183 }
6184
6185 let mut shared_objects = Vec::new();
6187 for kind in effects.input_shared_objects() {
6188 match kind {
6189 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
6190 if let Some(w) =
6191 object_store.try_get_object_by_key(&obj_ref.object_id, obj_ref.version)?
6192 {
6193 shared_objects.push(ObjDumpFormat::new(w))
6194 }
6195 }
6196 InputSharedObject::ReadDeleted(..)
6197 | InputSharedObject::MutateDeleted(..)
6198 | InputSharedObject::Cancelled(..) => (), }
6201 }
6202
6203 let mut loaded_child_objects = Vec::new();
6206 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6207 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
6208 loaded_child_objects.push(ObjDumpFormat::new(w))
6209 }
6210 }
6211
6212 let mut modified_at_versions = Vec::new();
6214 for (id, ver) in effects.modified_at_versions() {
6215 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
6216 modified_at_versions.push(ObjDumpFormat::new(w))
6217 }
6218 }
6219
6220 let mut runtime_reads = Vec::new();
6224 for obj in inner_temporary_store
6225 .runtime_packages_loaded_from_db
6226 .values()
6227 {
6228 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6229 }
6230
6231 Ok(Self {
6234 tx_digest: *tx_digest,
6235 executed_epoch,
6236 reference_gas_price,
6237 epoch_start_timestamp_ms,
6238 protocol_version,
6239 relevant_system_packages,
6240 shared_objects,
6241 loaded_child_objects,
6242 modified_at_versions,
6243 runtime_reads,
6244 sender_signed_data: certificate.clone().into_message(),
6245 input_objects: inner_temporary_store
6246 .input_objects
6247 .values()
6248 .map(|o| ObjDumpFormat::new(o.clone()))
6249 .collect(),
6250 computed_effects: effects.clone(),
6251 expected_effects_digest,
6252 })
6253 }
6254
6255 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6256 let mut objects = Vec::new();
6257 objects.extend(self.relevant_system_packages.clone());
6258 objects.extend(self.shared_objects.clone());
6259 objects.extend(self.loaded_child_objects.clone());
6260 objects.extend(self.modified_at_versions.clone());
6261 objects.extend(self.runtime_reads.clone());
6262 objects.extend(self.input_objects.clone());
6263 objects
6264 }
6265
6266 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6267 let file_name = format!(
6268 "{}_{}_NODE_DUMP.json",
6269 self.tx_digest,
6270 AuthorityState::unixtime_now_ms()
6271 );
6272 let mut path = path.to_path_buf();
6273 path.push(&file_name);
6274 let mut file = File::create(path.clone())?;
6275 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6276 Ok(path)
6277 }
6278
6279 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6280 let file = File::open(path)?;
6281 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6282 }
6283}
6284
6285fn pre_consensus_move_authenticators<'a>(
6297 tx: &'a VerifiedTransaction,
6298 protocol_config: &ProtocolConfig,
6299) -> Vec<&'a MoveAuthenticator> {
6300 if protocol_config.pre_consensus_sponsor_only_move_authentication() {
6301 if tx.transaction_data().is_sponsored_tx() {
6302 if let Some(sponsor_move_authenticator) = tx.sponsor_move_authenticator() {
6303 vec![sponsor_move_authenticator]
6304 } else {
6305 vec![]
6306 }
6307 } else {
6308 tx.move_authenticators()
6309 }
6310 } else {
6311 tx.move_authenticators()
6312 }
6313}