1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::TxLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage as FrameworkSystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::{
48 Address, EndOfEpochTransactionKind, Event, ExecutionStatus, ObjectId, Owner, RandomnessRound,
49 StructTag, TransactionExpiration, TransactionKind, TypeTag,
50 crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion},
51 gas::GasCostSummary,
52};
53use iota_storage::{
54 key_value_store::{
55 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
56 },
57 key_value_store_metrics::KeyValueStoreMetrics,
58};
59#[cfg(msim)]
60use iota_types::committee::CommitteeTrait;
61use iota_types::{
62 account_abstraction::{
63 account::AuthenticatorFunctionRefV1Key,
64 authenticator_function::{
65 AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
66 AuthenticatorFunctionRefV1, extract_auth_fun_refs,
67 },
68 },
69 auth_context::AuthContextData,
70 base_types::{
71 AuthorityName, ConciseableName, ObjectInfo, ObjectRef, ObjectType, SequenceNumber,
72 VersionNumber,
73 },
74 committee::{Committee, EpochId, ProtocolVersion},
75 crypto::{AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, Signer},
76 deny_list_v1::check_coin_deny_list_v1,
77 digests::{ChainIdentifier, Digest, ObjectDigest, TransactionDigest, TransactionEffectsDigest},
78 dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
79 effects::{
80 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
81 TransactionEffectsExt, TransactionEvents, VerifiedSignedTransactionEffects,
82 },
83 error::{ExecutionError, IotaError, IotaResult, UserInputError},
84 event::{EventID, SystemEpochInfoEvent},
85 executable_transaction::VerifiedExecutableTransaction,
86 execution_config_utils::to_binary_config,
87 fp_ensure,
88 gas::IotaGasStatus,
89 gas_coin::NANOS_PER_IOTA,
90 inner_temporary_store::{
91 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
92 WrittenObjects,
93 },
94 iota_sdk_types_conversions::type_tag_core_to_sdk,
95 iota_system_state::{
96 IotaSystemState, IotaSystemStateTrait,
97 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
98 },
99 layout_resolver::{LayoutResolver, into_struct_layout},
100 message_envelope::Message,
101 messages_checkpoint::{
102 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
103 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
104 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
105 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
106 },
107 messages_consensus::AuthorityCapabilitiesV1,
108 messages_grpc::{
109 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
110 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
111 TransactionStatus,
112 },
113 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
114 move_authenticator::MoveAuthenticator,
115 object::{
116 MoveObject, MoveObjectExt, OBJECT_START_VERSION, Object, ObjectRead, PastObjectRead,
117 bounded_visitor::BoundedVisitor,
118 },
119 storage::{
120 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
121 },
122 supported_protocol_versions::{
123 ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
124 },
125 traffic_control::{PolicyConfig, RemoteFirewallConfig, TrafficControlReconfigParams},
126 transaction::*,
127 transaction_executor::{SimulateTransactionResult, VmChecks},
128};
129use itertools::Itertools;
130use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
131use move_core_types::{
132 account_address::AccountAddress, annotated_value::MoveStructLayout, language_storage::ModuleId,
133};
134use parking_lot::Mutex;
135use prometheus::{
136 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
137 register_histogram_vec_with_registry, register_histogram_with_registry,
138 register_int_counter_vec_with_registry, register_int_counter_with_registry,
139 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
140};
141use serde::{Deserialize, Serialize, de::DeserializeOwned};
142use tap::TapFallible;
143use tokio::{
144 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
145 task::JoinHandle,
146};
147use tracing::{debug, error, info, instrument, trace, warn};
148use typed_store::TypedStoreError;
149
150use self::{
151 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
152};
153#[cfg(msim)]
154pub use crate::checkpoints::checkpoint_executor::utils::{
155 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
156};
157use crate::{
158 authority::{
159 authority_per_epoch_store::{AuthorityPerEpochStore, TxGuard},
160 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
161 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
162 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
163 authority_store_tables::AuthorityPrunerTables,
164 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
165 },
166 authority_client::NetworkAuthorityClient,
167 checkpoint_progress_tracker::CheckpointProgressTracker,
168 checkpoints::CheckpointStore,
169 congestion_tracker::CongestionTracker,
170 consensus_adapter::ConsensusAdapter,
171 epoch::committee_store::CommitteeStore,
172 execution_cache::{
173 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
174 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
175 TransactionCacheRead,
176 },
177 execution_driver::execution_process,
178 global_state_hasher::{GlobalStateHashStore, GlobalStateHasher},
179 grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
180 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
181 metrics::{LatencyObserver, RateTracker},
182 module_cache_metrics::ResolverMetrics,
183 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
184 stake_aggregator::StakeAggregator,
185 subscription_handler::SubscriptionHandler,
186 traffic_controller::{TrafficController, metrics::TrafficControllerMetrics},
187 transaction_input_loader::TransactionInputLoader,
188 transaction_manager::TransactionManager,
189 transaction_outputs::TransactionOutputs,
190 validator_tx_finalizer::ValidatorTxFinalizer,
191 verify_indexes::verify_indexes,
192};
193
// Unit-test modules. Each one is compiled only under `cfg(test)` and takes its
// source from the `unit_tests/` directory via `#[path]`.
#[cfg(test)]
#[path = "unit_tests/authority_tests.rs"]
pub mod authority_tests;

#[cfg(test)]
#[path = "unit_tests/transaction_tests.rs"]
pub mod transaction_tests;

#[cfg(test)]
#[path = "unit_tests/batch_transaction_tests.rs"]
mod batch_transaction_tests;

#[cfg(test)]
#[path = "unit_tests/move_integration_tests.rs"]
pub mod move_integration_tests;

#[cfg(test)]
#[path = "unit_tests/gas_tests.rs"]
mod gas_tests;

#[cfg(test)]
#[path = "unit_tests/batch_verification_tests.rs"]
mod batch_verification_tests;

#[cfg(test)]
#[path = "unit_tests/coin_deny_list_tests.rs"]
mod coin_deny_list_tests;

#[cfg(test)]
#[path = "unit_tests/auth_unit_test_utils.rs"]
pub mod auth_unit_test_utils;

// Unlike the modules above, this one is NOT gated behind `cfg(test)`.
pub mod authority_test_utils;

// Per-epoch store and its pruner.
pub mod authority_per_epoch_store;
pub mod authority_per_epoch_store_pruner;

// Remaining public submodules; `authority_store_migrations` is private to this module.
mod authority_store_migrations;
pub mod authority_store_pruner;
pub mod authority_store_tables;
pub mod authority_store_types;
pub mod epoch_start_configuration;
pub mod shared_object_congestion_tracker;
pub mod shared_object_version_manager;
pub mod suggested_gas_price_calculator;
pub mod test_authority_builder;
pub mod transaction_deferral;

// `authority_store` and `dropped_tx_status_cache` are visible crate-wide only;
// selected `authority_store` items are re-exported by the `pub use` near the top of the file.
pub(crate) mod authority_store;
pub mod backpressure;
pub(crate) mod dropped_tx_status_cache;
245
/// Prometheus metrics owned by the authority.
///
/// All fields are populated in [`AuthorityMetrics::new`], which is also where
/// the exported metric names and help strings are defined. Field comments below
/// only call out the exported name when it differs from the field name.
pub struct AuthorityMetrics {
    // ---- Transaction / certificate counters and size distributions ----
    /// Exported as `total_transaction_orders`.
    tx_orders: IntCounter,
    /// Exported as `total_transaction_certificates`.
    total_certs: IntCounter,
    /// Exported as `total_handle_certificate_attempts`.
    total_cert_attempts: IntCounter,
    /// Exported as `total_transaction_effects`.
    total_effects: IntCounter,
    /// Exported as `num_shared_obj_tx`.
    pub shared_obj_tx: IntCounter,
    /// Exported as `num_sponsored_tx`.
    sponsored_tx: IntCounter,
    /// Exported as `num_tx_already_processed`.
    tx_already_processed: IntCounter,
    /// Exported as `num_input_objects`; uses `POSITIVE_INT_BUCKETS`.
    num_input_objs: Histogram,
    /// Shared input objects per transaction; uses `POSITIVE_INT_BUCKETS`.
    num_shared_objects: Histogram,
    /// Size of transaction batches; uses `POSITIVE_INT_BUCKETS`.
    batch_size: Histogram,

    /// Latency of handling transactions; uses `LATENCY_SEC_BUCKETS`.
    authority_state_handle_transaction_latency: Histogram,

    /// Child of the `authority_state_execute_certificate_latency` histogram vec
    /// with `tx_type = TX_TYPE_SINGLE_WRITER_TX`.
    execute_certificate_latency_single_writer: Histogram,
    /// Child of the `authority_state_execute_certificate_latency` histogram vec
    /// with `tx_type = TX_TYPE_SHARED_OBJ_TX`.
    execute_certificate_latency_shared_object: Histogram,

    // ---- Execution stage latencies (exported with an `authority_state_` prefix,
    // except `db_checkpoint_latency`) ----
    internal_execution_latency: Histogram,
    /// The only latency histogram here that uses `LOW_LATENCY_SEC_BUCKETS`.
    execution_load_input_objects_latency: Histogram,
    prepare_certificate_latency: Histogram,
    commit_certificate_latency: Histogram,
    db_checkpoint_latency: Histogram,

    // ---- TransactionManager metrics ----
    /// Counter vec labelled by `result`.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    pub(crate) transaction_manager_num_missing_objects: IntGauge,
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    pub(crate) transaction_manager_num_ready: IntGauge,
    pub(crate) transaction_manager_object_cache_size: IntGauge,
    pub(crate) transaction_manager_object_cache_hits: IntCounter,
    pub(crate) transaction_manager_object_cache_misses: IntCounter,
    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
    pub(crate) transaction_manager_package_cache_size: IntGauge,
    pub(crate) transaction_manager_package_cache_hits: IntCounter,
    pub(crate) transaction_manager_package_cache_misses: IntCounter,
    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    // ---- Execution driver metrics ----
    pub(crate) execution_driver_executed_transactions: IntCounter,
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    pub(crate) execution_queueing_delay_s: Histogram,
    /// Computation gas divided by VM execution latency; uses `GAS_LATENCY_RATIO_BUCKETS`.
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    /// Computation gas divided by certificate execution latency (commit included);
    /// uses `GAS_LATENCY_RATIO_BUCKETS`.
    pub(crate) execution_gas_latency_ratio: Histogram,

    // ---- Skipped consensus transactions ----
    pub(crate) skipped_consensus_txns: IntCounter,
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,

    // ---- Overload / load shedding ----
    pub(crate) authority_overload_status: IntGauge,
    pub(crate) authority_load_shedding_percentage: IntGauge,

    /// Counter vec labelled by `source`.
    pub(crate) transaction_overload_sources: IntCounterVec,

    // ---- Post-processing counters ----
    post_processing_total_events_emitted: IntCounter,
    post_processing_total_tx_indexed: IntCounter,
    post_processing_total_tx_had_event_processed: IntCounter,
    post_processing_total_failures: IntCounter,

    // ---- Consensus handler metrics ----
    /// Counter vec labelled by `class`.
    pub consensus_handler_processed: IntCounterVec,
    /// Histogram vec labelled by `class`; uses `POSITIVE_INT_BUCKETS`.
    pub consensus_handler_transaction_sizes: HistogramVec,
    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    /// Gauge vec labelled by `authority`; cleared by `reset_on_reconfigure`.
    pub consensus_handler_scores: IntGaugeVec,
    pub consensus_handler_deferred_transactions: IntCounter,
    pub consensus_handler_congested_transactions: IntCounter,
    pub consensus_handler_cancelled_transactions: IntCounter,
    pub consensus_handler_validation_dropped_transactions: IntCounter,
    /// Exported as `consensus_handler_max_congestion_control_object_costs`,
    /// labelled by `commit_type`.
    pub consensus_handler_max_object_costs: IntGaugeVec,
    /// Counter vec labelled by `authority`.
    pub consensus_committed_subdags: IntCounterVec,
    /// Gauge vec labelled by `authority`; cleared by `reset_on_reconfigure`.
    pub consensus_committed_messages: IntGaugeVec,
    /// Gauge vec labelled by `authority`; cleared by `reset_on_reconfigure`.
    pub consensus_committed_user_transactions: IntGaugeVec,
    pub consensus_handler_leader_round: IntGauge,
    pub consensus_calculated_throughput: IntGauge,
    pub consensus_calculated_throughput_profile: IntGauge,

    /// Gauge vec labelled by `authority`; cleared by `reset_on_reconfigure`.
    pub validator_scoreboard_scores: IntGaugeVec,
    /// Gauge vec labelled by `authority`; cleared by `reset_on_reconfigure`.
    pub invalid_misbehavior_reports_by_authority: IntGaugeVec,

    /// Built from the same registry via `LimitsMetrics::new`.
    pub limits_metrics: Arc<LimitsMetrics>,

    /// Built from the same registry via `BytecodeVerifierMetrics::new`.
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of multisig signatures.
    pub multisig_sig_count: IntCounter,

    /// Created with `LatencyObserver::new()`; `new` does not pass it the registry.
    pub execution_queueing_latency: LatencyObserver,

    /// Rate tracker constructed with a 10-second `Duration`
    /// (presumably its averaging window — confirm against `RateTracker`).
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    /// Rate tracker constructed with a 10-second `Duration`
    /// (presumably its averaging window — confirm against `RateTracker`).
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
349
/// Histogram buckets for positive integer quantities (object counts, batch
/// sizes, transaction sizes): a 1-2-5-7 progression per decade from 1 up to
/// 10,000,000.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
    7000000., 10000000.,
];

/// Histogram buckets for general latency metrics, ranging from 0.0005 to 90
/// (seconds, going by the constant's name).
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];

/// Finer-grained latency buckets starting at 0.00001 and reaching 100
/// (seconds, going by the constant's name); used for input-object loading latency.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];

/// Buckets for the gas-to-latency ratio histograms, from 10 up to 1,000,000.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];
372
373pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
377 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
378 let execute_certificate_latency = register_histogram_vec_with_registry!(
379 "authority_state_execute_certificate_latency",
380 "Latency of executing certificates, including waiting for inputs",
381 &["tx_type"],
382 LATENCY_SEC_BUCKETS.to_vec(),
383 registry,
384 )
385 .unwrap();
386
387 let execute_certificate_latency_single_writer =
388 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
389 let execute_certificate_latency_shared_object =
390 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
391
392 Self {
393 tx_orders: register_int_counter_with_registry!(
394 "total_transaction_orders",
395 "Total number of transaction orders",
396 registry,
397 )
398 .unwrap(),
399 total_certs: register_int_counter_with_registry!(
400 "total_transaction_certificates",
401 "Total number of transaction certificates handled",
402 registry,
403 )
404 .unwrap(),
405 total_cert_attempts: register_int_counter_with_registry!(
406 "total_handle_certificate_attempts",
407 "Number of calls to handle_certificate",
408 registry,
409 )
410 .unwrap(),
411 total_effects: register_int_counter_with_registry!(
413 "total_transaction_effects",
414 "Total number of transaction effects produced",
415 registry,
416 )
417 .unwrap(),
418
419 shared_obj_tx: register_int_counter_with_registry!(
420 "num_shared_obj_tx",
421 "Number of transactions involving shared objects",
422 registry,
423 )
424 .unwrap(),
425
426 sponsored_tx: register_int_counter_with_registry!(
427 "num_sponsored_tx",
428 "Number of sponsored transactions",
429 registry,
430 )
431 .unwrap(),
432
433 tx_already_processed: register_int_counter_with_registry!(
434 "num_tx_already_processed",
435 "Number of transaction orders already processed previously",
436 registry,
437 )
438 .unwrap(),
439 num_input_objs: register_histogram_with_registry!(
440 "num_input_objects",
441 "Distribution of number of input TX objects per TX",
442 POSITIVE_INT_BUCKETS.to_vec(),
443 registry,
444 )
445 .unwrap(),
446 num_shared_objects: register_histogram_with_registry!(
447 "num_shared_objects",
448 "Number of shared input objects per TX",
449 POSITIVE_INT_BUCKETS.to_vec(),
450 registry,
451 )
452 .unwrap(),
453 batch_size: register_histogram_with_registry!(
454 "batch_size",
455 "Distribution of size of transaction batch",
456 POSITIVE_INT_BUCKETS.to_vec(),
457 registry,
458 )
459 .unwrap(),
460 authority_state_handle_transaction_latency: register_histogram_with_registry!(
461 "authority_state_handle_transaction_latency",
462 "Latency of handling transactions",
463 LATENCY_SEC_BUCKETS.to_vec(),
464 registry,
465 )
466 .unwrap(),
467 execute_certificate_latency_single_writer,
468 execute_certificate_latency_shared_object,
469 internal_execution_latency: register_histogram_with_registry!(
470 "authority_state_internal_execution_latency",
471 "Latency of actual certificate executions",
472 LATENCY_SEC_BUCKETS.to_vec(),
473 registry,
474 )
475 .unwrap(),
476 execution_load_input_objects_latency: register_histogram_with_registry!(
477 "authority_state_execution_load_input_objects_latency",
478 "Latency of loading input objects for execution",
479 LOW_LATENCY_SEC_BUCKETS.to_vec(),
480 registry,
481 )
482 .unwrap(),
483 prepare_certificate_latency: register_histogram_with_registry!(
484 "authority_state_prepare_certificate_latency",
485 "Latency of executing certificates, before committing the results",
486 LATENCY_SEC_BUCKETS.to_vec(),
487 registry,
488 )
489 .unwrap(),
490 commit_certificate_latency: register_histogram_with_registry!(
491 "authority_state_commit_certificate_latency",
492 "Latency of committing certificate execution results",
493 LATENCY_SEC_BUCKETS.to_vec(),
494 registry,
495 )
496 .unwrap(),
497 db_checkpoint_latency: register_histogram_with_registry!(
498 "db_checkpoint_latency",
499 "Latency of checkpointing dbs",
500 LATENCY_SEC_BUCKETS.to_vec(),
501 registry,
502 ).unwrap(),
503 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
504 "transaction_manager_num_enqueued_certificates",
505 "Current number of certificates enqueued to TransactionManager",
506 &["result"],
507 registry,
508 )
509 .unwrap(),
510 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
511 "transaction_manager_num_missing_objects",
512 "Current number of missing objects in TransactionManager",
513 registry,
514 )
515 .unwrap(),
516 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
517 "transaction_manager_num_pending_certificates",
518 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
519 registry,
520 )
521 .unwrap(),
522 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
523 "transaction_manager_num_executing_certificates",
524 "Number of executing certificates, including queued and actually running certificates",
525 registry,
526 )
527 .unwrap(),
528 transaction_manager_num_ready: register_int_gauge_with_registry!(
529 "transaction_manager_num_ready",
530 "Number of ready transactions in TransactionManager",
531 registry,
532 )
533 .unwrap(),
534 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
535 "transaction_manager_object_cache_size",
536 "Current size of object-availability cache in TransactionManager",
537 registry,
538 )
539 .unwrap(),
540 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
541 "transaction_manager_object_cache_hits",
542 "Number of object-availability cache hits in TransactionManager",
543 registry,
544 )
545 .unwrap(),
546 authority_overload_status: register_int_gauge_with_registry!(
547 "authority_overload_status",
548 "Whether authority is current experiencing overload and enters load shedding mode.",
549 registry)
550 .unwrap(),
551 authority_load_shedding_percentage: register_int_gauge_with_registry!(
552 "authority_load_shedding_percentage",
553 "The percentage of transactions is shed when the authority is in load shedding mode.",
554 registry)
555 .unwrap(),
556 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
557 "transaction_manager_object_cache_misses",
558 "Number of object-availability cache misses in TransactionManager",
559 registry,
560 )
561 .unwrap(),
562 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
563 "transaction_manager_object_cache_evictions",
564 "Number of object-availability cache evictions in TransactionManager",
565 registry,
566 )
567 .unwrap(),
568 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
569 "transaction_manager_package_cache_size",
570 "Current size of package-availability cache in TransactionManager",
571 registry,
572 )
573 .unwrap(),
574 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
575 "transaction_manager_package_cache_hits",
576 "Number of package-availability cache hits in TransactionManager",
577 registry,
578 )
579 .unwrap(),
580 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
581 "transaction_manager_package_cache_misses",
582 "Number of package-availability cache misses in TransactionManager",
583 registry,
584 )
585 .unwrap(),
586 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
587 "transaction_manager_package_cache_evictions",
588 "Number of package-availability cache evictions in TransactionManager",
589 registry,
590 )
591 .unwrap(),
592 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
593 "transaction_manager_transaction_queue_age_s",
594 "Time spent in waiting for transaction in the queue",
595 LATENCY_SEC_BUCKETS.to_vec(),
596 registry,
597 )
598 .unwrap(),
599 transaction_overload_sources: register_int_counter_vec_with_registry!(
600 "transaction_overload_sources",
601 "Number of times each source indicates transaction overload.",
602 &["source"],
603 registry)
604 .unwrap(),
605 execution_driver_executed_transactions: register_int_counter_with_registry!(
606 "execution_driver_executed_transactions",
607 "Cumulative number of transaction executed by execution driver",
608 registry,
609 )
610 .unwrap(),
611 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
612 "execution_driver_dispatch_queue",
613 "Number of transaction pending in execution driver dispatch queue",
614 registry,
615 )
616 .unwrap(),
617 execution_queueing_delay_s: register_histogram_with_registry!(
618 "execution_queueing_delay_s",
619 "Queueing delay between a transaction is ready for execution until it starts executing.",
620 LATENCY_SEC_BUCKETS.to_vec(),
621 registry
622 )
623 .unwrap(),
624 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
625 "prepare_cert_gas_latency_ratio",
626 "The ratio of computation gas divided by VM execution latency.",
627 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
628 registry
629 )
630 .unwrap(),
631 execution_gas_latency_ratio: register_histogram_with_registry!(
632 "execution_gas_latency_ratio",
633 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
634 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
635 registry
636 )
637 .unwrap(),
638 skipped_consensus_txns: register_int_counter_with_registry!(
639 "skipped_consensus_txns",
640 "Total number of consensus transactions skipped",
641 registry,
642 )
643 .unwrap(),
644 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
645 "skipped_consensus_txns_cache_hit",
646 "Total number of consensus transactions skipped because of local cache hit",
647 registry,
648 )
649 .unwrap(),
650 post_processing_total_events_emitted: register_int_counter_with_registry!(
651 "post_processing_total_events_emitted",
652 "Total number of events emitted in post processing",
653 registry,
654 )
655 .unwrap(),
656 post_processing_total_tx_indexed: register_int_counter_with_registry!(
657 "post_processing_total_tx_indexed",
658 "Total number of txes indexed in post processing",
659 registry,
660 )
661 .unwrap(),
662 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
663 "post_processing_total_tx_had_event_processed",
664 "Total number of txes finished event processing in post processing",
665 registry,
666 )
667 .unwrap(),
668 post_processing_total_failures: register_int_counter_with_registry!(
669 "post_processing_total_failures",
670 "Total number of failure in post processing",
671 registry,
672 )
673 .unwrap(),
674 consensus_handler_processed: register_int_counter_vec_with_registry!(
675 "consensus_handler_processed",
676 "Number of transactions processed by consensus handler",
677 &["class"],
678 registry
679 ).unwrap(),
680 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
681 "consensus_handler_transaction_sizes",
682 "Sizes of each type of transactions processed by consensus handler",
683 &["class"],
684 POSITIVE_INT_BUCKETS.to_vec(),
685 registry
686 ).unwrap(),
687 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
688 "consensus_handler_num_low_scoring_authorities",
689 "Number of low scoring authorities based on reputation scores from consensus",
690 registry
691 ).unwrap(),
692 consensus_handler_scores: register_int_gauge_vec_with_registry!(
693 "consensus_handler_scores",
694 "scores from consensus for each authority",
695 &["authority"],
696 registry,
697 ).unwrap(),
698 validator_scoreboard_scores: register_int_gauge_vec_with_registry!(
699 "validator_scoreboard_scores",
700 "Per-authority validator scores published by the local Scoreboard after each consensus commit. Range [0, MAX_SCORE].",
701 &["authority"],
702 registry,
703 ).unwrap(),
704 invalid_misbehavior_reports_by_authority: register_int_gauge_vec_with_registry!(
705 "invalid_misbehavior_reports_by_authority",
706 "Cumulative count of invalid misbehavior reports received from each reporting authority in the current epoch. Bumped when a `MisbehaviorReport` consensus transaction fails sender/authority match or payload validation. Snapshot republished after each consensus commit.",
707 &["authority"],
708 registry,
709 ).unwrap(),
710 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
711 "consensus_handler_deferred_transactions",
712 "Number of transactions deferred by consensus handler",
713 registry,
714 ).unwrap(),
715 consensus_handler_congested_transactions: register_int_counter_with_registry!(
716 "consensus_handler_congested_transactions",
717 "Number of transactions deferred by consensus handler due to congestion",
718 registry,
719 ).unwrap(),
720 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
721 "consensus_handler_cancelled_transactions",
722 "Number of transactions cancelled by consensus handler",
723 registry,
724 ).unwrap(),
725 consensus_handler_validation_dropped_transactions: register_int_counter_with_registry!(
726 "consensus_handler_validation_dropped_transactions",
727 "Number of UserTransactionV1 transactions dropped by post-consensus validation",
728 registry,
729 ).unwrap(),
730 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
731 "consensus_handler_max_congestion_control_object_costs",
732 "Max object costs for congestion control in the current consensus commit",
733 &["commit_type"],
734 registry,
735 ).unwrap(),
736 consensus_committed_subdags: register_int_counter_vec_with_registry!(
737 "consensus_committed_subdags",
738 "Number of committed subdags, sliced by leader",
739 &["authority"],
740 registry,
741 ).unwrap(),
742 consensus_committed_messages: register_int_gauge_vec_with_registry!(
743 "consensus_committed_messages",
744 "Total number of committed consensus messages, sliced by author",
745 &["authority"],
746 registry,
747 ).unwrap(),
748 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
749 "consensus_committed_user_transactions",
750 "Number of committed user transactions, sliced by submitter",
751 &["authority"],
752 registry,
753 ).unwrap(),
754 consensus_handler_leader_round: register_int_gauge_with_registry!(
755 "consensus_handler_leader_round",
756 "The leader round of the current consensus output being processed in the consensus handler",
757 registry,
758 ).unwrap(),
759 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
760 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
761 multisig_sig_count: register_int_counter_with_registry!(
762 "multisig_sig_count",
763 "Count of multisig signatures",
764 registry,
765 )
766 .unwrap(),
767 consensus_calculated_throughput: register_int_gauge_with_registry!(
768 "consensus_calculated_throughput",
769 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
770 registry,
771 ).unwrap(),
772 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
773 "consensus_calculated_throughput_profile",
774 "The current active calculated throughput profile",
775 registry
776 ).unwrap(),
777 execution_queueing_latency: LatencyObserver::new(),
778 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
779 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
780 }
781 }
782
783 pub fn reset_on_reconfigure(&self) {
787 self.consensus_committed_messages.reset();
788 self.consensus_handler_scores.reset();
789 self.validator_scoreboard_scores.reset();
790 self.invalid_misbehavior_reports_by_authority.reset();
791 self.consensus_committed_user_transactions.reset();
792 }
793}
794
/// A pinned, reference-counted trait object able to sign with
/// [`AuthoritySignature`]; the `Send + Sync` bounds allow it to be shared
/// across threads.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
802
/// State held by a single authority node.
///
/// NOTE(review): most field notes below are derived from the field names and
/// types only; confirm them against the code that uses each field.
pub struct AuthorityState {
    /// Name of this authority. The validator helper methods compare it against
    /// the epoch committee and the active validator set.
    pub name: AuthorityName,
    /// Signer that produces `AuthoritySignature`s for this authority.
    pub secret: StableSyncAuthoritySigner,

    /// Presumably loads the input objects of transactions — confirm.
    input_loader: TransactionInputLoader,
    /// Handles to the execution cache traits (see `ExecutionCacheTraitPointers`).
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// Store for the current epoch, held in an `ArcSwap`
    /// (presumably swapped on reconfiguration — confirm).
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// Async read-write lock wrapping an `EpochId`
    /// (presumably guards execution against epoch changes — confirm).
    execution_lock: RwLock<EpochId>,

    /// Optional JSON-RPC index store; `None` when not configured.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional gRPC index store; `None` when not configured.
    pub grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint store; `get_epoch_state_commitments` reads from it.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Exposed through `committee_store()` and `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    transaction_manager: Arc<TransactionManager>,

    /// One-shot sender kept behind a mutex. The attribute below tells the
    /// compiler the field is expected to be unused outside of test builds.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    pub metrics: Arc<AuthorityMetrics>,
    /// Never read by name (leading underscore); presumably kept so the pruner
    /// lives as long as this state — confirm.
    _pruner: AuthorityStorePruner,
    authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
    checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,

    db_checkpoint_config: DBCheckpointConfig,

    /// Node configuration; `overload_config()` returns its
    /// `authority_overload_config` field.
    pub config: NodeConfig,

    pub overload_info: AuthorityOverloadInfo,

    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,

    /// Optional traffic controller; `None` when not configured.
    pub traffic_controller: Option<Arc<TrafficController>>,
}
862
863impl AuthorityState {
872 pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
873 epoch_store.committee().authority_exists(&self.name)
874 }
875
876 pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
877 epoch_store
878 .active_validators()
879 .iter()
880 .any(|a| AuthorityName::from(a) == self.name)
881 }
882
883 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
884 !self.is_committee_validator(epoch_store)
885 }
886
887 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
888 &self.committee_store
889 }
890
891 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
892 self.committee_store.clone()
893 }
894
895 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
896 &self.config.authority_overload_config
897 }
898
899 pub fn get_epoch_state_commitments(
900 &self,
901 epoch: EpochId,
902 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
903 self.checkpoint_store.get_epoch_state_commitments(epoch)
904 }
905
906 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
910 pub(crate) async fn handle_transaction_validation_checks(
911 &self,
912 transaction: &VerifiedTransaction,
913 epoch_store: &Arc<AuthorityPerEpochStore>,
914 ) -> IotaResult<Vec<ObjectRef>> {
915 let protocol_config = epoch_store.protocol_config();
916 let reference_gas_price = epoch_store.reference_gas_price();
917
918 let epoch = epoch_store.epoch();
919
920 let tx_data = transaction.data().transaction_data();
921
922 iota_transaction_checks::deny::check_transaction_for_validation(
926 tx_data,
927 transaction.tx_signatures(),
928 &transaction.input_objects()?,
929 &tx_data.receiving_objects(),
930 &self.config.transaction_deny_config,
931 self.get_backing_package_store().as_ref(),
932 )?;
933
934 let (tx_input_objects, tx_receiving_objects, per_authenticator_inputs) =
939 self.read_objects_for_validation(transaction, epoch)?;
940
941 let move_authenticators = transaction.move_authenticators();
942
943 let (gas_status, tx_checked_input_objects, per_authenticator_checked_inputs) = self
949 .check_transaction_inputs_for_validation(
950 protocol_config,
951 reference_gas_price,
952 tx_data,
953 tx_input_objects,
954 &tx_receiving_objects,
955 &move_authenticators,
956 per_authenticator_inputs,
957 )?;
958
959 let per_authenticator_checked_input_objects = per_authenticator_checked_inputs
962 .iter()
963 .map(|i| &i.0)
964 .collect();
965
966 check_coin_deny_list_v1(
970 tx_data.sender(),
971 &tx_checked_input_objects,
972 &tx_receiving_objects,
973 &per_authenticator_checked_input_objects,
974 &self.get_object_store(),
975 )?;
976
977 let (kind, signer, gas_data) = tx_data.execution_parts();
978
979 let (sender_authenticator_function_ref, sponsor_authenticator_function_ref) =
980 extract_auth_fun_refs(signer, gas_data.owner, |address| {
981 move_authenticators
982 .iter()
983 .zip(per_authenticator_checked_inputs.iter())
984 .find(|(move_authenticator, _)| {
985 move_authenticator.address().ok().as_ref() == Some(&address)
986 })
987 .map(|(_, (_, auth_fun_ref))| auth_fun_ref.clone())
988 });
989
990 let pre_consensus_move_authenticators =
995 pre_consensus_move_authenticators(transaction, protocol_config);
996 let (move_authenticators, per_authenticator_checked_inputs): (Vec<_>, Vec<_>) =
997 move_authenticators
998 .into_iter()
999 .zip(per_authenticator_checked_inputs)
1000 .filter(|(a, _)| pre_consensus_move_authenticators.contains(a))
1001 .unzip();
1002 let per_authenticator_checked_input_objects: Vec<_> = per_authenticator_checked_inputs
1003 .iter()
1004 .map(|i| &i.0)
1005 .collect();
1006
1007 if !move_authenticators.is_empty() {
1010 let aggregated_authenticator_input_objects =
1011 iota_transaction_checks::aggregate_authenticator_input_objects(
1012 &per_authenticator_checked_input_objects,
1013 )?;
1014
1015 debug_assert_eq!(
1016 move_authenticators.len(),
1017 per_authenticator_checked_inputs.len(),
1018 "Move authenticators amount must match the number of checked authenticator inputs"
1019 );
1020
1021 let move_authenticators = move_authenticators
1022 .into_iter()
1023 .zip(per_authenticator_checked_inputs)
1024 .map(
1025 |(
1026 move_authenticator,
1027 (authenticator_checked_input_objects, authenticator_function_ref),
1028 )| {
1029 (
1030 move_authenticator.to_owned(),
1031 authenticator_function_ref,
1032 authenticator_checked_input_objects,
1033 )
1034 },
1035 )
1036 .collect();
1037
1038 let tx_data_bytes =
1043 bcs::to_bytes(&tx_data).expect("TransactionData serialization cannot fail");
1044
1045 let (sender_auth_digest, sponsor_auth_digest) =
1046 transaction.data().compute_auth_digests()?;
1047
1048 let auth_context_data = AuthContextData {
1049 transaction_data_bytes: tx_data_bytes,
1050 sender_auth_digest,
1051 sponsor_auth_digest,
1052 sender_authenticator_function_ref,
1053 sponsor_authenticator_function_ref,
1054 };
1055
1056 let validation_result = epoch_store.executor().authenticate_transaction(
1058 self.get_backing_store().as_ref(),
1059 protocol_config,
1060 self.metrics.limits_metrics.clone(),
1061 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1062 epoch_store
1063 .epoch_start_config()
1064 .epoch_data()
1065 .epoch_start_timestamp(),
1066 gas_data,
1067 gas_status,
1068 move_authenticators,
1069 aggregated_authenticator_input_objects,
1070 kind,
1071 signer,
1072 transaction.digest().to_owned(),
1073 auth_context_data,
1074 &mut None,
1075 );
1076
1077 if let Err(validation_error) = validation_result {
1078 return Err(IotaError::MoveAuthenticatorExecutionFailure {
1079 error: validation_error.to_string(),
1080 });
1081 }
1082 }
1083
1084 Ok(tx_checked_input_objects.inner().filter_owned_objects())
1085 }
1086
1087 async fn handle_transaction_impl(
1091 &self,
1092 transaction: VerifiedTransaction,
1093 epoch_store: &Arc<AuthorityPerEpochStore>,
1094 ) -> IotaResult<VerifiedSignedTransaction> {
1095 let _execution_lock = self.execution_lock_for_signing()?;
1097
1098 let owned_objects = self
1099 .handle_transaction_validation_checks(&transaction, epoch_store)
1100 .await?;
1101
1102 let epoch = epoch_store.epoch();
1103 let signed_transaction =
1104 VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);
1105
1106 self.get_cache_writer().try_acquire_transaction_locks(
1111 epoch_store,
1112 &owned_objects,
1113 signed_transaction.clone(),
1114 )?;
1115
1116 Ok(signed_transaction)
1117 }
1118
1119 #[instrument(name = "handle_transaction", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = transaction.data().transaction_data().gas_owner().to_string()
1121 ))]
1122 pub async fn handle_transaction(
1123 &self,
1124 epoch_store: &Arc<AuthorityPerEpochStore>,
1125 transaction: VerifiedTransaction,
1126 ) -> IotaResult<HandleTransactionResponse> {
1127 let tx_digest = *transaction.digest();
1128 debug!("handle_transaction");
1129
1130 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1132 return Ok(HandleTransactionResponse { status });
1133 }
1134
1135 let _metrics_guard = self
1136 .metrics
1137 .authority_state_handle_transaction_latency
1138 .start_timer();
1139 self.metrics.tx_orders.inc();
1140
1141 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1142 match signed {
1143 Ok(s) => {
1144 if self.is_committee_validator(epoch_store) {
1145 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1146 let tx = s.clone();
1147 let validator_tx_finalizer = validator_tx_finalizer.clone();
1148 let cache_reader = self.get_transaction_cache_reader().clone();
1149 let epoch_store = epoch_store.clone();
1150 spawn_monitored_task!(epoch_store.within_alive_epoch(
1151 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1152 ));
1153 }
1154 }
1155 Ok(HandleTransactionResponse {
1156 status: TransactionStatus::Signed(s.into_inner().into_sig()),
1157 })
1158 }
1159 Err(err) => Ok(HandleTransactionResponse {
1163 status: self
1164 .get_transaction_status(&tx_digest, epoch_store)?
1165 .ok_or(err)?
1166 .1,
1167 }),
1168 }
1169 }
1170
1171 pub fn check_system_overload_at_signing(&self) -> bool {
1172 self.config
1173 .authority_overload_config
1174 .check_system_overload_at_signing
1175 }
1176
1177 pub fn check_system_overload_at_execution(&self) -> bool {
1178 self.config
1179 .authority_overload_config
1180 .check_system_overload_at_execution
1181 }
1182
1183 pub(crate) fn check_system_overload(
1184 &self,
1185 consensus_adapter: &Arc<ConsensusAdapter>,
1186 tx_data: &SenderSignedData,
1187 do_authority_overload_check: bool,
1188 ) -> IotaResult {
1189 if do_authority_overload_check {
1190 self.check_authority_overload(tx_data).tap_err(|_| {
1191 self.update_overload_metrics("execution_queue");
1192 })?;
1193 }
1194 self.transaction_manager
1195 .check_execution_overload(self.overload_config(), tx_data)
1196 .tap_err(|_| {
1197 self.update_overload_metrics("execution_pending");
1198 })?;
1199 consensus_adapter.check_consensus_overload().tap_err(|_| {
1200 self.update_overload_metrics("consensus");
1201 })?;
1202
1203 let pending_tx_count = self
1204 .get_cache_commit()
1205 .approximate_pending_transaction_count();
1206 if pending_tx_count
1207 > self
1208 .config
1209 .execution_cache_config
1210 .writeback_cache
1211 .backpressure_threshold_for_rpc()
1212 {
1213 return Err(IotaError::ValidatorOverloadedRetryAfter {
1214 retry_after_secs: 10,
1215 });
1216 }
1217
1218 Ok(())
1219 }
1220
1221 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1222 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1223 return Ok(());
1224 }
1225
1226 let load_shedding_percentage = self
1227 .overload_info
1228 .load_shedding_percentage
1229 .load(Ordering::Relaxed);
1230 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1231 }
1232
1233 fn update_overload_metrics(&self, source: &str) {
1234 self.metrics
1235 .transaction_overload_sources
1236 .with_label_values(&[source])
1237 .inc();
1238 }
1239
1240 #[instrument(level = "trace", skip_all)]
1245 pub async fn wait_for_certificate_execution(
1246 &self,
1247 certificate: &VerifiedCertificate,
1248 epoch_store: &Arc<AuthorityPerEpochStore>,
1249 ) -> IotaResult<TransactionEffects> {
1250 let _metrics_guard = if certificate.contains_shared_object() {
1251 self.metrics
1252 .execute_certificate_latency_shared_object
1253 .start_timer()
1254 } else {
1255 self.metrics
1256 .execute_certificate_latency_single_writer
1257 .start_timer()
1258 };
1259 trace!("wait_for_certificate_execution");
1260
1261 self.metrics.total_cert_attempts.inc();
1262
1263 if !certificate.contains_shared_object() {
1264 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1269 }
1270
1271 epoch_store
1274 .within_alive_epoch(self.notify_read_effects(certificate))
1275 .await
1276 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1277 .and_then(|r| r)
1278 }
1279
1280 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
1295 pub fn try_execute_immediately(
1296 &self,
1297 transaction: &VerifiedExecutableTransaction,
1298 expected_effects_digest: Option<TransactionEffectsDigest>,
1299 epoch_store: &Arc<AuthorityPerEpochStore>,
1300 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1301 let _scope = monitored_scope("Execution::try_execute_immediately");
1302 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1303
1304 let tx_digest = transaction.digest();
1305
1306 let tx_guard = epoch_store.acquire_tx_guard(transaction)?;
1308
1309 if let Some(effects) = self
1312 .get_transaction_cache_reader()
1313 .try_get_executed_effects(tx_digest)?
1314 {
1315 if let Some(expected_effects_digest_inner) = expected_effects_digest {
1316 assert_eq!(
1317 effects.digest(),
1318 expected_effects_digest_inner,
1319 "Unexpected effects digest for transaction {tx_digest}"
1320 );
1321 }
1322 tx_guard.release();
1323 return Ok((effects, None));
1324 }
1325
1326 let (tx_input_objects, per_authenticator_inputs) =
1327 self.read_objects_for_execution(tx_guard.as_lock_guard(), transaction, epoch_store)?;
1328
1329 let expected_effects_digest =
1335 expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1336
1337 self.process_transaction(
1338 tx_guard,
1339 transaction,
1340 tx_input_objects,
1341 per_authenticator_inputs,
1342 expected_effects_digest,
1343 epoch_store,
1344 )
1345 .tap_err(|e| info!(?tx_digest, "process_transaction failed: {e}"))
1346 .tap_ok(
1347 |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_transaction succeeded"),
1348 )
1349 }
1350
1351 pub fn read_objects_for_execution(
1352 &self,
1353 tx_lock: &TxLockGuard,
1354 transaction: &VerifiedExecutableTransaction,
1355 epoch_store: &Arc<AuthorityPerEpochStore>,
1356 ) -> IotaResult<(InputObjects, Vec<(InputObjects, ObjectReadResult)>)> {
1357 let _scope = monitored_scope("Execution::load_input_objects");
1358 let _metrics_guard = self
1359 .metrics
1360 .execution_load_input_objects_latency
1361 .start_timer();
1362
1363 let input_objects = transaction.collect_all_input_object_kind_for_reading()?;
1364
1365 let input_objects = self.input_loader.read_objects_for_execution(
1366 epoch_store,
1367 &transaction.key(),
1368 tx_lock,
1369 &input_objects,
1370 epoch_store.epoch(),
1371 )?;
1372
1373 transaction.split_input_objects_into_groups_for_reading(input_objects)
1374 }
1375
1376 pub fn try_execute_for_test(
1380 &self,
1381 certificate: &VerifiedCertificate,
1382 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1383 let epoch_store = self.epoch_store_for_testing();
1384 let (effects, execution_error_opt) = self.try_execute_immediately(
1385 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1386 None,
1387 &epoch_store,
1388 )?;
1389 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1390 Ok((signed_effects, execution_error_opt))
1391 }
1392
1393 pub fn execute_for_test(
1395 &self,
1396 certificate: &VerifiedCertificate,
1397 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1398 self.try_execute_for_test(certificate)
1399 .expect("try_execute_for_test should not fail")
1400 }
1401
1402 pub async fn notify_read_effects(
1403 &self,
1404 certificate: &VerifiedCertificate,
1405 ) -> IotaResult<TransactionEffects> {
1406 self.get_transaction_cache_reader()
1407 .try_notify_read_executed_effects(&[*certificate.digest()])
1408 .await
1409 .map(|mut r| r.pop().expect("must return correct number of effects"))
1410 }
1411
1412 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1413 self.get_object_cache_reader()
1414 .try_check_owned_objects_are_live(owned_object_refs)
1415 }
1416
1417 pub(crate) fn debug_dump_transaction_state(
1422 &self,
1423 tx_digest: &TransactionDigest,
1424 effects: &TransactionEffects,
1425 expected_effects_digest: TransactionEffectsDigest,
1426 inner_temporary_store: &InnerTemporaryStore,
1427 transaction: &VerifiedExecutableTransaction,
1428 debug_dump_config: &StateDebugDumpConfig,
1429 ) -> IotaResult<PathBuf> {
1430 let dump_dir = debug_dump_config
1433 .dump_file_directory
1434 .as_ref()
1435 .cloned()
1436 .unwrap_or(std::env::temp_dir());
1437 let epoch_store = self.load_epoch_store_one_call_per_task();
1438
1439 NodeStateDump::new(
1440 tx_digest,
1441 effects,
1442 expected_effects_digest,
1443 self.get_object_store().as_ref(),
1444 &epoch_store,
1445 inner_temporary_store,
1446 transaction,
1447 )?
1448 .write_to_file(&dump_dir)
1449 .map_err(|e| IotaError::FileIO(e.to_string()))
1450 }
1451
1452 #[instrument(name = "process_certificate", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = ?transaction.data().transaction_data().gas_owner().to_string()))]
1453 pub(crate) fn process_transaction(
1454 &self,
1455 tx_guard: TxGuard,
1456 transaction: &VerifiedExecutableTransaction,
1457 tx_input_objects: InputObjects,
1458 per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1459 expected_effects_digest: Option<TransactionEffectsDigest>,
1460 epoch_store: &Arc<AuthorityPerEpochStore>,
1461 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1462 let process_transaction_start_time = tokio::time::Instant::now();
1463 let digest = *transaction.digest();
1464
1465 let _scope = monitored_scope("Execution::process_certificate");
1466
1467 fail_point_if!("correlated-crash-process-transaction", || {
1468 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1469 iota_simulator::task::kill_current_node(None);
1470 }
1471 });
1472
1473 let execution_guard = self.execution_lock_for_executable_transaction(transaction);
1474 let execution_guard = match execution_guard {
1480 Ok(execution_guard) => execution_guard,
1481 Err(err) => {
1482 tx_guard.release();
1483 return Err(err);
1484 }
1485 };
1486 if *execution_guard != epoch_store.epoch() {
1490 tx_guard.release();
1491 info!("The epoch of the execution_guard doesn't match the epoch store");
1492 return Err(IotaError::WrongEpoch {
1493 expected_epoch: epoch_store.epoch(),
1494 actual_epoch: *execution_guard,
1495 });
1496 }
1497
1498 let (inner_temporary_store, effects, execution_error_opt) = match self.execute_transaction(
1504 &execution_guard,
1505 transaction,
1506 tx_input_objects,
1507 per_authenticator_inputs,
1508 epoch_store,
1509 ) {
1510 Err(e) => {
1511 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1512 tx_guard.release();
1513 return Err(e);
1514 }
1515 Ok(res) => res,
1516 };
1517
1518 if let Some(expected_effects_digest) = expected_effects_digest {
1519 if effects.digest() != expected_effects_digest {
1520 match self.debug_dump_transaction_state(
1522 &digest,
1523 &effects,
1524 expected_effects_digest,
1525 &inner_temporary_store,
1526 transaction,
1527 &self.config.state_debug_dump_config,
1528 ) {
1529 Ok(out_path) => {
1530 info!(
1531 "Dumped node state for transaction {} to {}",
1532 digest,
1533 out_path.as_path().display().to_string()
1534 );
1535 }
1536 Err(e) => {
1537 error!("Error dumping state for transaction {}: {e}", digest);
1538 }
1539 }
1540 error!(
1541 tx_digest = ?digest,
1542 ?expected_effects_digest,
1543 actual_effects = ?effects,
1544 "fork detected!"
1545 );
1546 panic!(
1547 "Transaction {} is expected to have effects digest {}, but got {}!",
1548 digest,
1549 expected_effects_digest,
1550 effects.digest(),
1551 );
1552 }
1553 }
1554
1555 fail_point!("crash");
1556
1557 self.commit_transaction(
1558 transaction,
1559 inner_temporary_store,
1560 &effects,
1561 tx_guard,
1562 execution_guard,
1563 epoch_store,
1564 )?;
1565
1566 let elapsed = process_transaction_start_time.elapsed().as_micros() as f64;
1567 if elapsed > 0.0 {
1568 self.metrics
1569 .execution_gas_latency_ratio
1570 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1571 };
1572 Ok((effects, execution_error_opt))
1573 }
1574
1575 pub async fn reconfigure_traffic_control(
1576 &self,
1577 params: TrafficControlReconfigParams,
1578 ) -> Result<TrafficControlReconfigParams, IotaError> {
1579 if let Some(traffic_controller) = self.traffic_controller.as_ref() {
1580 traffic_controller.admin_reconfigure(params).await
1581 } else {
1582 Err(IotaError::InvalidAdminRequest(
1583 "Traffic controller is not configured on this node".to_string(),
1584 ))
1585 }
1586 }
1587
    /// Commits the result of an executed transaction.
    ///
    /// Steps, in order:
    /// 1. run post-processing (failures are counted and logged, not
    ///    propagated);
    /// 2. record the transaction key and digest in the epoch store;
    /// 3. build the transaction outputs and write them through the cache
    ///    writer;
    /// 4. for end-of-epoch transactions, force a reload of the built-in
    ///    framework packages in the object cache;
    /// 5. mark the transaction guard as committed;
    /// 6. notify the transaction manager of the commit;
    /// 7. update execution metrics.
    ///
    /// `_execution_guard` is taken by value and not read; it is dropped when
    /// this function returns.
    ///
    /// # Errors
    /// Propagates failures from recording the transaction key/digest and from
    /// writing the transaction outputs.
    #[instrument(level = "trace", skip_all)]
    fn commit_transaction(
        &self,
        transaction: &VerifiedExecutableTransaction,
        inner_temporary_store: InnerTemporaryStore,
        effects: &TransactionEffects,
        tx_guard: TxGuard,
        _execution_guard: ExecutionLockReadGuard<'_>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult {
        let _scope: Option<iota_metrics::MonitoredScopeGuard> =
            monitored_scope("Execution::commit_certificate");
        let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

        // Capture everything needed from `inner_temporary_store` up front: it
        // is moved into `build_transaction_outputs` further down.
        let tx_key = transaction.key();
        let tx_digest = transaction.digest();
        let input_object_count = inner_temporary_store.input_objects.len();
        let shared_object_count = effects.input_shared_objects().len();

        let output_keys = inner_temporary_store.get_output_keys(effects);

        // Post-processing is best-effort: the result is discarded after the
        // failure counter is bumped and the error is logged.
        let _ = self
            .post_process_one_tx(transaction, effects, &inner_temporary_store, epoch_store)
            .tap_err(|e| {
                self.metrics.post_processing_total_failures.inc();
                error!(?tx_digest, "tx post processing failed: {e}");
            });

        epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;

        fail_point!("crash");

        let transaction_outputs = TransactionOutputs::build_transaction_outputs(
            transaction.clone().into_unsigned(),
            effects.clone(),
            inner_temporary_store,
        );
        self.get_cache_writer()
            .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;

        if transaction.transaction_data().is_end_of_epoch_tx() {
            // NOTE(review): presumably the end-of-epoch transaction can
            // change the system packages, hence the forced reload — confirm.
            self.get_object_cache_reader()
                .force_reload_system_packages(&BuiltInFramework::all_package_ids());
        }

        tx_guard.commit_tx();

        self.transaction_manager
            .notify_commit(tx_digest, output_keys, epoch_store);

        self.update_metrics(transaction, input_object_count, shared_object_count);

        Ok(())
    }
1653
1654 fn update_metrics(
1655 &self,
1656 transaction: &VerifiedExecutableTransaction,
1657 input_object_count: usize,
1658 shared_object_count: usize,
1659 ) {
1660 if transaction.has_upgraded_multisig() {
1662 self.metrics.multisig_sig_count.inc();
1663 }
1664
1665 self.metrics.total_effects.inc();
1666 self.metrics.total_certs.inc();
1667
1668 if shared_object_count > 0 {
1669 self.metrics.shared_obj_tx.inc();
1670 }
1671
1672 if transaction.is_sponsored_tx() {
1673 self.metrics.sponsored_tx.inc();
1674 }
1675
1676 self.metrics
1677 .num_input_objs
1678 .observe(input_object_count as f64);
1679 self.metrics
1680 .num_shared_objects
1681 .observe(shared_object_count as f64);
1682 self.metrics.batch_size.observe(
1683 transaction
1684 .data()
1685 .intent_message()
1686 .value
1687 .kind()
1688 .num_commands() as f64,
1689 );
1690 }
1691
    /// Executes `transaction` against the supplied input objects and returns
    /// the temporary store, the effects and the execution error (if any).
    ///
    /// Nothing is committed here; the results are handed back to the caller.
    ///
    /// Two paths exist:
    /// - no Move authenticators: the certificate inputs are checked, the
    ///   owned objects are verified to be live, and the transaction is
    ///   executed via `execute_transaction_to_effects`;
    /// - with Move authenticators: each authenticator's account object is
    ///   checked first, inputs are checked with the protocol's
    ///   `max_auth_gas` budget, and the transaction goes through
    ///   `authenticate_then_execute_transaction_to_effects`.
    ///
    /// `_execution_guard` is not read by this function.
    ///
    /// # Errors
    /// Propagates failures from the validity check, input checks, account
    /// checks, owned-lock checks and auth digest computation. An error raised
    /// by the executed transaction itself is not returned as `Err`; it is
    /// reported in the third element of the returned tuple.
    #[instrument(level = "trace", skip_all)]
    fn execute_transaction(
        &self,
        _execution_guard: &ExecutionLockReadGuard<'_>,
        transaction: &VerifiedExecutableTransaction,
        tx_input_objects: InputObjects,
        per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<(
        InnerTemporaryStore,
        TransactionEffects,
        Option<ExecutionError>,
    )> {
        let _scope = monitored_scope("Execution::execute_certificate");
        let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
        let prepare_transaction_start_time = tokio::time::Instant::now();

        let protocol_config = epoch_store.protocol_config();

        let reference_gas_price = epoch_store.reference_gas_price();

        let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
        let epoch_start_timestamp = epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp();

        let backing_store = self.get_backing_store().as_ref();

        let tx_digest = *transaction.digest();

        let tx_data = transaction.data().transaction_data();
        tx_data.validity_check(protocol_config)?;

        let (kind, signer, gas_data) = tx_data.execution_parts();

        let move_authenticators = transaction.move_authenticators();

        // `effects` is only mutated inside the fail point further down, hence
        // the conditional `expect(unused_mut)`.
        #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
        let (inner_temp_store, _, mut effects, execution_error_opt) = if move_authenticators
            .is_empty()
        {
            // Path 1: plain transaction without Move authenticators.
            let (tx_gas_status, tx_checked_input_objects) =
                iota_transaction_checks::check_certificate_input(
                    transaction,
                    tx_input_objects,
                    protocol_config,
                    reference_gas_price,
                )?;

            let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
            self.check_owned_locks(&owned_object_refs)?;
            epoch_store.executor().execute_transaction_to_effects(
                backing_store,
                protocol_config,
                self.metrics.limits_metrics.clone(),
                self.config
                    .expensive_safety_check_config
                    .enable_deep_per_tx_iota_conservation_check(),
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_id,
                epoch_start_timestamp,
                tx_checked_input_objects,
                gas_data,
                tx_gas_status,
                kind,
                signer,
                tx_digest,
                &mut None,
            )
        } else {
            // Path 2: the transaction carries Move authenticators.
            debug_assert_eq!(
                move_authenticators.len(),
                per_authenticator_inputs.len(),
                "Move authenticators amount must match the number of authenticator inputs"
            );

            // Check every authenticator's account object and pair its input
            // objects with the function reference to use during execution.
            let per_authenticator_inputs = move_authenticators
                .iter()
                .zip(per_authenticator_inputs)
                .map(
                    |(move_authenticator, (authenticator_input_objects, account_object))| {
                        let (
                            auth_account_object_id,
                            auth_account_object_seq_number,
                            auth_account_object_digest,
                        ) = move_authenticator.object_to_authenticate_components()?;

                        let signer = move_authenticator.address()?;

                        let authenticator_function_ref_for_execution = self.check_move_account(
                            auth_account_object_id,
                            auth_account_object_seq_number,
                            auth_account_object_digest,
                            account_object,
                            &signer,
                        )?;

                        Ok((
                            authenticator_input_objects,
                            authenticator_function_ref_for_execution,
                        ))
                    },
                )
                .collect::<IotaResult<Vec<_>>>()?;

            // Cloned because `per_authenticator_inputs` is consumed again
            // below when the executor triples are assembled.
            let per_authenticator_input_objects = per_authenticator_inputs
                .iter()
                .map(|(authenticator_input_objects, _)| authenticator_input_objects.clone())
                .collect::<Vec<_>>();

            let tx_data_bytes =
                bcs::to_bytes(tx_data).expect("TransactionData serialization cannot fail");

            let (sender_auth_digest, sponsor_auth_digest) =
                transaction.data().compute_auth_digests()?;

            let authenticator_gas_budget = protocol_config.max_auth_gas();
            let (
                gas_status,
                per_authenticator_checked_input_objects,
                authenticator_and_tx_checked_input_objects,
            ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
                transaction,
                tx_input_objects,
                per_authenticator_input_objects,
                authenticator_gas_budget,
                protocol_config,
                reference_gas_price,
            )?;

            debug_assert_eq!(
                move_authenticators.len(),
                per_authenticator_checked_input_objects.len(),
                "Move authenticators amount must match the number of checked authenticator inputs"
            );

            // Assemble the (authenticator, function ref, checked inputs)
            // triples passed to the executor.
            let move_authenticators = move_authenticators
                .into_iter()
                .zip(per_authenticator_inputs)
                .zip(per_authenticator_checked_input_objects)
                .map(
                    |(
                        (move_authenticator, (_, authenticator_function_ref_for_execution)),
                        authenticator_checked_input_objects,
                    )| {
                        (
                            move_authenticator.to_owned(),
                            authenticator_function_ref_for_execution,
                            authenticator_checked_input_objects,
                        )
                    },
                )
                .collect::<Vec<_>>();

            // The owned-lock check covers the combined authenticator and
            // transaction inputs.
            let owned_object_refs = authenticator_and_tx_checked_input_objects
                .inner()
                .filter_owned_objects();
            self.check_owned_locks(&owned_object_refs)?;

            let (sender_authenticator_function_ref, sponsor_authenticator_function_ref) =
                extract_auth_fun_refs(signer, gas_data.owner, |address| {
                    move_authenticators
                        .iter()
                        .find(|t| t.0.address().ok().as_ref() == Some(&address))
                        .map(|t| t.1.authenticator_function_ref.clone())
                });

            let auth_context_data = AuthContextData {
                transaction_data_bytes: tx_data_bytes,
                sender_auth_digest,
                sponsor_auth_digest,
                sender_authenticator_function_ref,
                sponsor_authenticator_function_ref,
            };

            epoch_store
                .executor()
                .authenticate_then_execute_transaction_to_effects(
                    backing_store,
                    protocol_config,
                    self.metrics.limits_metrics.clone(),
                    self.config
                        .expensive_safety_check_config
                        .enable_deep_per_tx_iota_conservation_check(),
                    self.config.certificate_deny_config.certificate_deny_set(),
                    &epoch_id,
                    epoch_start_timestamp,
                    gas_data,
                    gas_status,
                    move_authenticators,
                    authenticator_and_tx_checked_input_objects,
                    kind,
                    signer,
                    tx_digest,
                    auth_context_data,
                    &mut None,
                )
        };

        // Simulator-only hook that may alter `effects`.
        fail_point_if!("cp_execution_nondeterminism", || {
            #[cfg(msim)]
            self.create_fail_state(transaction, epoch_store, &mut effects);
        });

        // Record computation cost per microsecond of wall time. A zero
        // elapsed time is skipped to avoid dividing by zero.
        let elapsed = prepare_transaction_start_time.elapsed().as_micros() as f64;
        if elapsed > 0.0 {
            self.metrics
                .prepare_cert_gas_latency_ratio
                .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
        }

        // The executor reports the outcome as a `Result`; only its error side
        // is passed on to the caller.
        Ok((inner_temp_store, effects, execution_error_opt.err()))
    }
1937
1938 pub fn prepare_transaction_for_benchmark(
1939 &self,
1940 transaction: &VerifiedExecutableTransaction,
1941 input_objects: InputObjects,
1942 epoch_store: &Arc<AuthorityPerEpochStore>,
1943 ) -> IotaResult<(
1944 InnerTemporaryStore,
1945 TransactionEffects,
1946 Option<ExecutionError>,
1947 )> {
1948 let lock = RwLock::new(epoch_store.epoch());
1949 let execution_guard = lock.try_read().unwrap();
1950
1951 self.execute_transaction(
1952 &execution_guard,
1953 transaction,
1954 input_objects,
1955 vec![],
1956 epoch_store,
1957 )
1958 }
1959
1960 #[instrument("dry_exec_tx", level = "trace", skip_all)]
1963 #[allow(clippy::type_complexity)]
1964 pub fn dry_exec_transaction(
1965 &self,
1966 transaction: TransactionData,
1967 transaction_digest: TransactionDigest,
1968 ) -> IotaResult<(
1969 DryRunTransactionBlockResponse,
1970 BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
1971 TransactionEffects,
1972 Option<ObjectId>,
1973 )> {
1974 let epoch_store = self.load_epoch_store_one_call_per_task();
1975 if !self.is_fullnode(&epoch_store) {
1976 return Err(IotaError::UnsupportedFeature {
1977 error: "dry-exec is only supported on fullnodes".to_string(),
1978 });
1979 }
1980
1981 if transaction.kind().is_system() {
1982 return Err(IotaError::UnsupportedFeature {
1983 error: "dry-exec does not support system transactions".to_string(),
1984 });
1985 }
1986
1987 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1988 }
1989
1990 #[allow(clippy::type_complexity)]
1991 pub fn dry_exec_transaction_for_benchmark(
1992 &self,
1993 transaction: TransactionData,
1994 transaction_digest: TransactionDigest,
1995 ) -> IotaResult<(
1996 DryRunTransactionBlockResponse,
1997 BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
1998 TransactionEffects,
1999 Option<ObjectId>,
2000 )> {
2001 let epoch_store = self.load_epoch_store_one_call_per_task();
2002 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
2003 }
2004
/// Dry-runs `transaction` against `epoch_store` and hands the outcome back
/// to the caller.
///
/// The function performs, in order: the gas-less validity check, the
/// transaction deny check, input loading, input/gas checking, execution via
/// `execute_transaction_to_effects`, and conversion of the results into a
/// [`DryRunTransactionBlockResponse`].
///
/// When the transaction carries no gas objects, a mock gas coin with a
/// random id and a balance of `SIMULATION_GAS_COIN_VALUE`, owned by the
/// transaction's gas owner, is created and used as the gas payment.
///
/// # Returns
/// A tuple of:
/// * the dry-run response (`object_changes` and `balance_changes` are left
///   empty by this function),
/// * the created/unwrapped/mutated objects keyed by object id, each with its
///   object reference, the written object and the kind of write,
/// * the raw [`TransactionEffects`],
/// * the id of the mock gas coin, or `None` if the transaction supplied its
///   own gas.
///
/// # Errors
/// Propagates any error from the validity, deny and input checks, from
/// loading the inputs, and from converting the transaction, effects or
/// events into their response representations.
///
/// # Panics
/// Panics if the executor cannot be created, or if the effects reference a
/// created/unwrapped/mutated object that is absent from
/// `inner_temp_store.written`.
#[instrument(level = "trace", skip_all)]
#[allow(clippy::type_complexity)]
fn dry_exec_transaction_impl(
    &self,
    epoch_store: &AuthorityPerEpochStore,
    transaction: TransactionData,
    transaction_digest: TransactionDigest,
) -> IotaResult<(
    DryRunTransactionBlockResponse,
    BTreeMap<ObjectId, (ObjectRef, Object, WriteKind)>,
    TransactionEffects,
    Option<ObjectId>,
)> {
    transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

    let input_object_kinds = transaction.input_objects()?;
    let receiving_object_refs = transaction.receiving_objects();

    iota_transaction_checks::deny::check_transaction_for_validation(
        &transaction,
        &[],
        &input_object_kinds,
        &receiving_object_refs,
        &self.config.transaction_deny_config,
        self.get_backing_package_store().as_ref(),
    )?;

    let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
        None,
        &input_object_kinds,
        &receiving_object_refs,
        epoch_store.epoch(),
    )?;

    // Rebind mutably: the gas payment is rewritten below when a mock coin is needed.
    let mut transaction = transaction;
    let reference_gas_price = epoch_store.reference_gas_price();
    let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
        // No gas objects supplied: fabricate a gas coin owned by the gas owner.
        let sender = transaction.gas_owner();
        let gas_object_id = ObjectId::random();
        let gas_object = Object::new_move(
            MoveObject::new_gas_coin(
                OBJECT_START_VERSION,
                gas_object_id,
                SIMULATION_GAS_COIN_VALUE,
            ),
            Owner::Address(sender),
            TransactionDigest::GENESIS_MARKER,
        );
        let gas_object_ref = gas_object.object_ref();
        transaction.gas_data_mut().objects = vec![gas_object_ref];
        (
            iota_transaction_checks::check_transaction_input_with_given_gas(
                epoch_store.protocol_config(),
                reference_gas_price,
                &transaction,
                input_objects,
                receiving_objects,
                gas_object,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?,
            Some(gas_object_id),
        )
    } else {
        // The transaction brings its own gas; a zero authenticator gas budget is passed.
        let authenticator_gas_budget = 0;

        (
            iota_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                reference_gas_price,
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
                authenticator_gas_budget,
            )?,
            None,
        )
    };

    let protocol_config = epoch_store.protocol_config();
    let (kind, signer, gas_data) = transaction.execution_parts();

    let silent = true;
    let executor = iota_execution::executor(protocol_config, silent, None)
        .expect("Creating an executor should not fail here");

    let expensive_checks = false;
    let (inner_temp_store, _, effects, execution_error) = executor
        .execute_transaction_to_effects(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            expensive_checks,
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_data,
            gas_status,
            kind,
            signer,
            transaction_digest,
            &mut None,
        );
    // From here on the digest recorded in the effects is used for the response.
    let tx_digest = *effects.transaction_digest();

    let module_cache =
        TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());

    let mut layout_resolver =
        epoch_store
            .executor()
            .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                &inner_temp_store,
                self.get_backing_package_store(),
            )));
    // Object changes are not computed here; the response carries an empty list.
    let object_changes = Vec::new();

    // Balance changes are not computed here; the response carries an empty list.
    let balance_changes = Vec::new();

    // Pair every created/unwrapped/mutated object ref with its written object and write kind.
    let written_with_kind = effects
        .created()
        .into_iter()
        .map(|(oref, _)| (oref, WriteKind::Create))
        .chain(
            effects
                .unwrapped()
                .into_iter()
                .map(|(oref, _)| (oref, WriteKind::Unwrap)),
        )
        .chain(
            effects
                .mutated()
                .into_iter()
                .map(|(oref, _)| (oref, WriteKind::Mutate)),
        )
        .map(|(oref, kind)| {
            let obj = inner_temp_store.written.get(&oref.object_id).unwrap();
            (oref.object_id, (oref, obj.clone(), kind))
        })
        .collect();

    // Only populated when execution failed and the error exposes a source.
    let execution_error_source = execution_error
        .as_ref()
        .err()
        .and_then(|e| e.source().as_ref().map(|e| e.to_string()));

    Ok((
        DryRunTransactionBlockResponse {
            suggested_gas_price: self
                .congestion_tracker
                .get_prediction_suggested_gas_price(&transaction),
            input: IotaTransactionBlockData::try_from_with_module_cache(
                transaction,
                &module_cache,
                tx_digest,
            )
            .map_err(|e| IotaError::TransactionSerialization {
                error: format!(
                    "Failed to convert transaction to IotaTransactionBlockData: {e}",
                ),
            })?, effects: effects.clone().try_into()?,
            events: IotaTransactionBlockEvents::try_from(
                inner_temp_store.events.clone(),
                tx_digest,
                None,
                layout_resolver.as_mut(),
            )?,
            object_changes,
            balance_changes,
            execution_error_source,
        },
        written_with_kind,
        effects,
        mock_gas,
    ))
}
2198
/// Simulates `transaction` by running it through the executor's
/// `dev_inspect_transaction` entry point and packaging the outcome as a
/// [`SimulateTransactionResult`].
///
/// When the transaction carries no gas objects, a mock gas coin with id
/// `ObjectId::MAX` and balance `SIMULATION_GAS_COIN_VALUE`, owned by the
/// gas-data owner, is added to the inputs and its id is reported in
/// `mock_gas_id`.
///
/// `checks` selects the input-checking path: with checks enabled the regular
/// `check_transaction_input` is used; otherwise `check_dev_inspect_input` is
/// used and the gas status is built directly from the transaction's budget
/// and price.
///
/// # Errors
/// Returns `IotaError::UnsupportedFeature` for system transactions and when
/// this node is not a fullnode. Also propagates errors from the validity,
/// deny and input checks, from input loading and from gas-status creation.
///
/// # Panics
/// Panics if the executor cannot be created.
pub fn simulate_transaction(
    &self,
    mut transaction: TransactionData,
    checks: VmChecks,
) -> IotaResult<SimulateTransactionResult> {
    if transaction.kind().is_system() {
        return Err(IotaError::UnsupportedFeature {
            error: "simulate does not support system transactions".to_string(),
        });
    }

    let epoch_store = self.load_epoch_store_one_call_per_task();
    if !self.is_fullnode(&epoch_store) {
        return Err(IotaError::UnsupportedFeature {
            error: "simulate is only supported on fullnodes".to_string(),
        });
    }

    transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

    let input_object_kinds = transaction.input_objects()?;
    let receiving_object_refs = transaction.receiving_objects();

    iota_transaction_checks::deny::check_transaction_for_validation(
        &transaction,
        &[],
        &input_object_kinds,
        &receiving_object_refs,
        &self.config.transaction_deny_config,
        self.get_backing_package_store().as_ref(),
    )?;

    let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
        None,
        &input_object_kinds,
        &receiving_object_refs,
        epoch_store.epoch(),
    )?;

    // Without a gas payment, inject a mock gas coin and remember its id for the result.
    let mock_gas_id = if transaction.gas().is_empty() {
        let mock_gas_object = Object::new_move(
            MoveObject::new_gas_coin(
                OBJECT_START_VERSION,
                ObjectId::MAX,
                SIMULATION_GAS_COIN_VALUE,
            ),
            Owner::Address(transaction.gas_data().owner),
            TransactionDigest::GENESIS_MARKER,
        );
        let mock_gas_object_ref = mock_gas_object.object_ref();
        transaction.gas_data_mut().objects = vec![mock_gas_object_ref];
        input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
        Some(mock_gas_object.id())
    } else {
        None
    };

    let protocol_config = epoch_store.protocol_config();

    // A zero authenticator gas budget is passed to the regular input check.
    let authenticator_gas_budget = 0;

    let (gas_status, checked_input_objects) = if checks.enabled() {
        iota_transaction_checks::check_transaction_input(
            protocol_config,
            epoch_store.reference_gas_price(),
            &transaction,
            input_objects,
            &receiving_objects,
            &self.metrics.bytecode_verifier_metrics,
            &self.config.verifier_signing_config,
            authenticator_gas_budget,
        )?
    } else {
        // Checks disabled: use the dev-inspect input check and build the gas status by hand.
        let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
            protocol_config,
            transaction.kind(),
            input_objects,
            receiving_objects,
        )?;
        let gas_status = IotaGasStatus::new(
            transaction.gas_budget(),
            transaction.gas_price(),
            epoch_store.reference_gas_price(),
            protocol_config,
        )?;

        (gas_status, checked_input_objects)
    };

    let executor = iota_execution::executor(
        protocol_config,
        true, None,
    )
    .expect("Creating an executor should not fail here");

    let (kind, signer, gas_data) = transaction.execution_parts();
    let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
        self.get_backing_store().as_ref(),
        protocol_config,
        self.metrics.limits_metrics.clone(),
        false, self.config.certificate_deny_config.certificate_deny_set(),
        &epoch_store.epoch_start_config().epoch_data().epoch_id(),
        epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp(),
        checked_input_objects,
        gas_data,
        gas_status,
        kind,
        signer,
        transaction.digest(),
        checks.disabled(),
    );

    Ok(SimulateTransactionResult {
        input_objects: inner_temp_store.input_objects,
        output_objects: inner_temp_store.written,
        // Events are only reported when the effects carry an events digest.
        events: effects.events_digest().map(|_| inner_temp_store.events),
        effects,
        execution_result,
        suggested_gas_price: self
            .congestion_tracker
            .get_prediction_suggested_gas_price(&transaction),
        mock_gas_id,
    })
}
2344
/// Builds a transaction from `transaction_kind` and the optional gas
/// parameters, runs it through the executor's `dev_inspect_transaction`
/// entry point and returns the outcome as [`DevInspectResults`].
///
/// # Defaults
/// * `gas_price` defaults to the epoch's reference gas price.
/// * `gas_budget` defaults to the protocol's `max_tx_gas`.
/// * `gas_sponsor` defaults to `sender`.
/// * `gas_objects` defaults to an empty list, in which case a dummy gas coin
///   with balance `SIMULATION_GAS_COIN_VALUE` owned by the gas owner is used.
/// * `show_raw_txn_data_and_effects` defaults to `false`; when `true` the
///   BCS bytes of the transaction and of the effects are included.
/// * `skip_checks` defaults to `true`; it selects between the dev-inspect
///   input check and the regular transaction input checks.
///
/// # Errors
/// Returns `IotaError::UnsupportedFeature` when this node is not a fullnode
/// or when `transaction_kind` is a system transaction, and
/// `IotaError::TransactionSerialization` when BCS serialization of the raw
/// transaction or effects fails. Also propagates errors from the validity,
/// deny and input checks, from input loading and from gas-status creation.
///
/// # Panics
/// Panics if the executor cannot be created.
#[instrument("dev_inspect_tx", level = "trace", skip_all)]
pub async fn dev_inspect_transaction_block(
    &self,
    sender: Address,
    transaction_kind: TransactionKind,
    gas_price: Option<u64>,
    gas_budget: Option<u64>,
    gas_sponsor: Option<Address>,
    gas_objects: Option<Vec<ObjectRef>>,
    show_raw_txn_data_and_effects: Option<bool>,
    skip_checks: Option<bool>,
) -> IotaResult<DevInspectResults> {
    let epoch_store = self.load_epoch_store_one_call_per_task();

    if !self.is_fullnode(&epoch_store) {
        return Err(IotaError::UnsupportedFeature {
            error: "dev-inspect is only supported on fullnodes".to_string(),
        });
    }

    if transaction_kind.is_system() {
        return Err(IotaError::UnsupportedFeature {
            error: "system transactions are not supported".to_string(),
        });
    }

    // Resolve the optional arguments to their defaults.
    let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
    let skip_checks = skip_checks.unwrap_or(true);
    let reference_gas_price = epoch_store.reference_gas_price();
    let protocol_config = epoch_store.protocol_config();
    let max_tx_gas = protocol_config.max_tx_gas();

    let price = gas_price.unwrap_or(reference_gas_price);
    let budget = gas_budget.unwrap_or(max_tx_gas);
    let owner = gas_sponsor.unwrap_or(sender);
    let payment = gas_objects.unwrap_or_default();
    let mut transaction = TransactionData::V1(TransactionDataV1 {
        kind: transaction_kind.clone(),
        sender,
        gas_payment: GasData {
            objects: payment,
            owner,
            price,
            budget,
        },
        expiration: TransactionExpiration::None,
    });

    // Serialized before any dummy gas coin is injected into the gas payment.
    let raw_txn_data = if show_raw_txn_data_and_effects {
        bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
            error: "Failed to serialize transaction during dev inspect".to_string(),
        })?
    } else {
        vec![]
    };

    transaction.validity_check_no_gas_check(protocol_config)?;

    let input_object_kinds = transaction.input_objects()?;
    let receiving_object_refs = transaction.receiving_objects();

    iota_transaction_checks::deny::check_transaction_for_validation(
        &transaction,
        &[],
        &input_object_kinds,
        &receiving_object_refs,
        &self.config.transaction_deny_config,
        self.get_backing_package_store().as_ref(),
    )?;

    let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
        None,
        &input_object_kinds,
        &receiving_object_refs,
        epoch_store.epoch(),
    )?;

    let (gas_status, checked_input_objects) = if skip_checks {
        // Checks skipped: add a dummy gas coin to the inputs if none was supplied,
        // then run the dev-inspect input check.
        if transaction.gas().is_empty() {
            let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
                SIMULATION_GAS_COIN_VALUE,
                transaction.gas_owner(),
            );
            let gas_object_ref = dummy_gas_object.object_ref();
            transaction.gas_data_mut().objects = vec![gas_object_ref];
            input_objects.push(ObjectReadResult::new(
                InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
                dummy_gas_object.into(),
            ));
        }
        let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
            protocol_config,
            &transaction_kind,
            input_objects,
            receiving_objects,
        )?;
        // The gas status on this path is created with `max_tx_gas`, not the requested budget.
        let gas_status = IotaGasStatus::new(
            max_tx_gas,
            transaction.gas_price(),
            reference_gas_price,
            protocol_config,
        )?;

        (gas_status, checked_input_objects)
    } else {
        // Checks requested: use the regular input checks, handing over a dummy
        // gas coin explicitly when the transaction has none.
        if transaction.gas().is_empty() {
            let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
                SIMULATION_GAS_COIN_VALUE,
                transaction.gas_owner(),
            );
            let gas_object_ref = dummy_gas_object.object_ref();
            transaction.gas_data_mut().objects = vec![gas_object_ref];
            iota_transaction_checks::check_transaction_input_with_given_gas(
                epoch_store.protocol_config(),
                reference_gas_price,
                &transaction,
                input_objects,
                receiving_objects,
                dummy_gas_object,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?
        } else {
            let authenticator_gas_budget = 0;

            iota_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                reference_gas_price,
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
                authenticator_gas_budget,
            )?
        }
    };

    let executor = iota_execution::executor(protocol_config, true, None)
        .expect("Creating an executor should not fail here");
    // Clone the gas data first: `transaction` is moved into the intent message below.
    let gas_data = transaction.gas_data().clone();
    let intent_msg = IntentMessage::new(
        Intent {
            version: IntentVersion::V0,
            scope: IntentScope::TransactionData,
            app_id: IntentAppId::Iota,
        },
        transaction,
    );
    let transaction_digest = TransactionDigest::new(intent_msg.value.digest().into_inner());
    let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
        self.get_backing_store().as_ref(),
        protocol_config,
        self.metrics.limits_metrics.clone(),
        false,
        self.config.certificate_deny_config.certificate_deny_set(),
        &epoch_store.epoch_start_config().epoch_data().epoch_id(),
        epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp(),
        checked_input_objects,
        gas_data,
        gas_status,
        transaction_kind,
        sender,
        transaction_digest,
        skip_checks,
    );

    let raw_effects = if show_raw_txn_data_and_effects {
        bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
            error: "Failed to serialize transaction effects during dev inspect".to_string(),
        })?
    } else {
        vec![]
    };

    let mut layout_resolver =
        epoch_store
            .executor()
            .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                &inner_temp_store,
                self.get_backing_package_store(),
            )));

    DevInspectResults::new(
        effects,
        inner_temp_store.events.clone(),
        execution_result,
        raw_txn_data,
        raw_effects,
        layout_resolver.as_mut(),
    )
}
2559
2560 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2562 let epoch_store = self.epoch_store_for_testing();
2563 Ok(epoch_store.reference_gas_price())
2564 }
2565
2566 #[instrument(level = "trace", skip_all)]
2567 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2568 self.get_transaction_cache_reader()
2569 .try_is_tx_already_executed(digest)
2570 }
2571
2572 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2574 self.try_is_tx_already_executed(digest)
2575 .expect("storage access failed")
2576 }
2577
2578 #[instrument(level = "debug", skip_all, err)]
2580 fn index_tx(
2581 &self,
2582 indexes: &IndexStore,
2583 digest: &TransactionDigest,
2584 transaction: &VerifiedExecutableTransaction,
2586 effects: &TransactionEffects,
2587 events: &TransactionEvents,
2588 timestamp_ms: u64,
2589 tx_coins: Option<TxCoins>,
2590 written: &WrittenObjects,
2591 inner_temporary_store: &InnerTemporaryStore,
2592 epoch_store: &Arc<AuthorityPerEpochStore>,
2593 ) -> IotaResult<u64> {
2594 let changes = self
2595 .process_object_index(effects, written, inner_temporary_store, epoch_store)
2596 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2597
2598 indexes.index_tx(
2599 transaction.data().intent_message().value.sender(),
2600 transaction
2601 .data()
2602 .intent_message()
2603 .value
2604 .input_objects()?
2605 .iter()
2606 .map(|o| o.object_id()),
2607 effects
2608 .all_changed_objects()
2609 .into_iter()
2610 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2611 transaction
2612 .data()
2613 .intent_message()
2614 .value
2615 .move_calls()
2616 .into_iter()
2617 .map(|(package, module, function)| {
2618 (*package, module.to_owned(), function.to_owned())
2619 }),
2620 events,
2621 changes,
2622 digest,
2623 timestamp_ms,
2624 tx_coins,
2625 )
2626 }
2627
/// Simulator-only helper that perturbs `effects` for non-system
/// transactions on a subset of authorities.
///
/// A thread-local pair tracks the accumulated stake and the set of
/// authorities chosen to fail. While the accumulated stake is below the
/// committee's validity threshold, this authority (if it has stake) is added
/// to the set. Any authority in the set bumps the computation cost in the
/// effects' gas summary by one.
#[cfg(msim)]
fn create_fail_state(
    &self,
    transaction: &VerifiedExecutableTransaction,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    effects: &mut TransactionEffects,
) {
    use std::cell::RefCell;

    use iota_types::effects::TransactionEffectsAPIForTesting;
    thread_local! {
        static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
    }

    // System transactions are never perturbed.
    if transaction.data().intent_message().value.is_system_tx() {
        return;
    }

    let committee = epoch_store.committee();
    let cur_stake = (**committee).weight(&self.name);
    if cur_stake > 0 {
        FAIL_STATE.with_borrow_mut(|(accumulated_stake, failing_authorities)| {
            if *accumulated_stake < committee.validity_threshold() {
                *accumulated_stake += cur_stake;
                failing_authorities.insert(self.name);
            }

            if failing_authorities.contains(&self.name) {
                info!("cp_exec failing tx");
                effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
            }
        });
    }
}
2660
/// Derives the owner-index and dynamic-field-index changes implied by
/// `effects`.
///
/// * For every deleted or wrapped object, the owner at the version the
///   object was modified at is looked up and the corresponding index entry
///   is scheduled for removal.
/// * For every changed object whose write kind is `Mutate` and whose owner
///   differs from the previous version's owner, the old entry is scheduled
///   for removal.
/// * For every changed object now owned by an address or by another object,
///   a new index entry is produced from the object found in `written`.
///
/// Owners other than `Owner::Address` and `Owner::Object` produce no index
/// entries.
///
/// # Errors
/// Propagates errors from reading the previous object version from the
/// object store.
///
/// # Panics
/// Panics if a deleted, wrapped or mutated object has no modified-at
/// version, if its previous owner or previous version cannot be found, if a
/// changed object is missing from `written`, or if the version in `written`
/// does not match the version in the effects.
fn process_object_index(
    &self,
    effects: &TransactionEffects,
    written: &WrittenObjects,
    inner_temporary_store: &InnerTemporaryStore,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> IotaResult<ObjectIndexChanges> {
    let mut layout_resolver =
        epoch_store
            .executor()
            .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                inner_temporary_store,
                self.get_backing_package_store(),
            )));

    // Lookup table used to find the pre-transaction version of each modified object.
    let modified_at_version = effects
        .modified_at_versions()
        .into_iter()
        .collect::<HashMap<_, _>>();

    let tx_digest = effects.transaction_digest();
    let mut deleted_owners = vec![];
    let mut deleted_dynamic_fields = vec![];
    for object_ref in effects.deleted().into_iter().chain(effects.wrapped()) {
        let old_version = modified_at_version.get(&object_ref.object_id).unwrap();
        match self.get_owner_at_version(&object_ref.object_id, *old_version).unwrap_or_else(
            |e| panic!("tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {} at version {old_version:?}. Err: {e:?}", object_ref.object_id)
        ) {
            Owner::Address(addr) => deleted_owners.push((addr, object_ref.object_id)),
            Owner::Object(object_id) => {
                deleted_dynamic_fields.push((object_id, object_ref.object_id))
            }
            _ => {}
        }
    }

    let mut new_owners = vec![];
    let mut new_dynamic_fields = vec![];

    for (oref, owner, kind) in effects.all_changed_objects() {
        let id = &oref.object_id;
        if let WriteKind::Mutate = kind {
            let Some(old_version) = modified_at_version.get(id) else {
                panic!(
                    "tx_digest={tx_digest}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
                );
            };
            let Some(old_object) = self
                .get_object_store()
                .try_get_object_by_key(id, *old_version)?
            else {
                panic!(
                    "tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {id} at version {old_version:?}"
                );
            };
            // Ownership changed: drop the entry that belonged to the previous owner.
            if old_object.owner != owner {
                match old_object.owner {
                    Owner::Address(addr) => {
                        deleted_owners.push((addr, *id));
                    }
                    Owner::Object(object_id) => deleted_dynamic_fields.push((object_id, *id)),
                    _ => {}
                }
            }
        }

        match owner {
            Owner::Address(addr) => {
                let new_object = written.get(id).unwrap_or_else(
                    || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
                );
                assert_eq!(
                    new_object.version(),
                    oref.version,
                    "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
                    tx_digest,
                    id,
                    new_object.version(),
                    oref.version
                );

                // Objects without a struct type are recorded as packages.
                let type_ = new_object
                    .type_()
                    .map(|type_| ObjectType::Struct(type_.clone()))
                    .unwrap_or(ObjectType::Package);

                new_owners.push((
                    (addr, *id),
                    ObjectInfo {
                        object_id: *id,
                        version: oref.version,
                        digest: oref.digest,
                        type_,
                        owner,
                        previous_transaction: *effects.transaction_digest(),
                    },
                ));
            }
            Owner::Object(owner) => {
                let new_object = written.get(id).unwrap_or_else(
                    || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
                );
                assert_eq!(
                    new_object.version(),
                    oref.version,
                    "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
                    tx_digest,
                    id,
                    new_object.version(),
                    oref.version
                );

                // A failure to build the dynamic field info is logged and the object is skipped.
                let Some(df_info) = self
                    .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
                    .unwrap_or_else(|e| {
                        error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
                        None
                    }
                    )
                else {
                    continue;
                };
                new_dynamic_fields.push(((owner, *id), df_info))
            }
            _ => {}
        }
    }

    Ok(ObjectIndexChanges {
        deleted_owners,
        deleted_dynamic_fields,
        new_owners,
        new_dynamic_fields,
    })
}
2805
/// Attempts to build a [`DynamicFieldInfo`] describing object `o`.
///
/// Returns `Ok(None)` when `o.data.as_struct_opt()` yields nothing, when the
/// object's struct tag is not a dynamic field, or when the annotated layout
/// for that struct tag cannot be loaded (the failure is logged).
///
/// For a dynamic *object* field, the referenced object is taken from
/// `written` if present there; otherwise it is read from the object store
/// using the referenced id and `o`'s version.
///
/// # Errors
/// Returns `IotaError::ObjectDeserialization` if the field, its name or its
/// value metadata cannot be deserialized, and an object-not-found error if
/// the object referenced by a dynamic object field is neither in `written`
/// nor in the object store. Storage errors are propagated.
///
/// # Panics
/// Panics if the object referenced by a dynamic object field has no object
/// type (`object_type().unwrap()`).
fn try_create_dynamic_field_info(
    &self,
    o: &Object,
    written: &WrittenObjects,
    resolver: &mut dyn LayoutResolver,
) -> IotaResult<Option<DynamicFieldInfo>> {
    let Some(move_object) = o.data.as_struct_opt().cloned() else {
        return Ok(None);
    };

    if !move_object.struct_tag().is_dynamic_field() {
        return Ok(None);
    }

    // A missing layout is treated as "not indexable" rather than as an error.
    let layout = match resolver.get_annotated_layout(move_object.struct_tag()) {
        Ok(annotated_layout) => annotated_layout.into_layout(),
        Err(e) => {
            error!(
                "unable to load layout for type `{:?}`: {e}",
                move_object.struct_tag()
            );
            return Ok(None);
        }
    };

    let field =
        DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
            IotaError::ObjectDeserialization {
                error: e.to_string(),
            }
        })?;

    let type_ = field.kind;
    let name_type: TypeTag = type_tag_core_to_sdk(&field.name_layout.into());
    // Keep the raw BCS bytes of the name alongside its decoded form.
    let bcs_name = field.name_bytes.to_owned();

    let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
        .map_err(|e| {
            warn!("{e}");
            IotaError::ObjectDeserialization {
                error: e.to_string(),
            }
        })?;

    let name = DynamicFieldName {
        type_: name_type,
        value: IotaMoveValue::from(name_value).to_json_value(),
    };

    let value_metadata = field.value_metadata().map_err(|e| {
        warn!("{e}");
        IotaError::ObjectDeserialization {
            error: e.to_string(),
        }
    })?;

    Ok(Some(match value_metadata {
        // Plain dynamic field: the info points at the field object `o` itself.
        DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
            name,
            bcs_name,
            type_,
            object_type: object_type.to_canonical_string(true),
            object_id: o.id(),
            version: o.version(),
            digest: o.digest(),
        },

        // Dynamic object field: the info points at the referenced object.
        DFV::ValueMetadata::DynamicObjectField(object_id) => {
            let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
                (
                    object.version(),
                    object.digest(),
                    object.data.object_type().unwrap().clone(),
                )
            } else {
                // Not written by this transaction: fall back to the object store.
                let object = self
                    .get_object_store()
                    .try_get_object_by_key(&object_id, o.version())?
                    .ok_or_else(|| UserInputError::ObjectNotFound {
                        object_id,
                        version: Some(o.version()),
                    })?;
                let version = object.version();
                let digest = object.digest();
                let object_type = object.data.object_type().unwrap().clone();
                (version, digest, object_type)
            };

            DynamicFieldInfo {
                name,
                bcs_name,
                type_,
                object_type: object_type.to_string(),
                object_id,
                version,
                digest,
            }
        }
    }))
}
2913
2914 #[instrument(level = "trace", skip_all, err)]
2915 fn post_process_one_tx(
2916 &self,
2917 transaction: &VerifiedExecutableTransaction,
2918 effects: &TransactionEffects,
2919 inner_temporary_store: &InnerTemporaryStore,
2920 epoch_store: &Arc<AuthorityPerEpochStore>,
2921 ) -> IotaResult {
2922 if self.indexes.is_none() {
2923 return Ok(());
2924 }
2925
2926 let _scope = monitored_scope("Execution::post_process_one_tx");
2927
2928 let tx_digest = transaction.digest();
2929 let timestamp_ms = Self::unixtime_now_ms();
2930 let events = &inner_temporary_store.events;
2931 let written = &inner_temporary_store.written;
2932 let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
2933 effects,
2934 inner_temporary_store,
2935 epoch_store,
2936 );
2937
2938 if let Some(indexes) = &self.indexes {
2940 let _ = self
2941 .index_tx(
2942 indexes.as_ref(),
2943 tx_digest,
2944 transaction,
2945 effects,
2946 events,
2947 timestamp_ms,
2948 tx_coins,
2949 written,
2950 inner_temporary_store,
2951 epoch_store,
2952 )
2953 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2954 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2955 .expect("Indexing tx should not fail");
2956
2957 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2958 let events = self.make_transaction_block_events(
2959 events.clone(),
2960 *tx_digest,
2961 timestamp_ms,
2962 epoch_store,
2963 inner_temporary_store,
2964 )?;
2965 self.subscription_handler
2967 .process_tx(transaction.data().transaction_data(), &effects, &events)
2968 .tap_ok(|_| {
2969 self.metrics
2970 .post_processing_total_tx_had_event_processed
2971 .inc()
2972 })
2973 .tap_err(|e| {
2974 warn!(
2975 ?tx_digest,
2976 "Post processing - Couldn't process events for tx: {}", e
2977 )
2978 })?;
2979
2980 self.metrics
2981 .post_processing_total_events_emitted
2982 .inc_by(events.data.len() as u64);
2983 };
2984 Ok(())
2985 }
2986
2987 fn make_transaction_block_events(
2988 &self,
2989 transaction_events: TransactionEvents,
2990 digest: TransactionDigest,
2991 timestamp_ms: u64,
2992 epoch_store: &Arc<AuthorityPerEpochStore>,
2993 inner_temporary_store: &InnerTemporaryStore,
2994 ) -> IotaResult<IotaTransactionBlockEvents> {
2995 let mut layout_resolver =
2996 epoch_store
2997 .executor()
2998 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2999 inner_temporary_store,
3000 self.get_backing_package_store(),
3001 )));
3002 IotaTransactionBlockEvents::try_from(
3003 transaction_events,
3004 digest,
3005 Some(timestamp_ms),
3006 layout_resolver.as_mut(),
3007 )
3008 }
3009
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
/// Panics with "Time went backwards" if the system clock is before the Unix
/// epoch, and with "Travelling in time machine" if the millisecond count
/// does not fit in a `u64`.
pub fn unixtime_now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let millis: u128 = since_epoch.as_millis();
    u64::try_from(millis).expect("Travelling in time machine")
}
3017
3018 #[instrument(level = "trace", skip_all)]
3019 pub async fn handle_transaction_info_request(
3020 &self,
3021 request: TransactionInfoRequest,
3022 ) -> IotaResult<TransactionInfoResponse> {
3023 let epoch_store = self.load_epoch_store_one_call_per_task();
3024 let (transaction, status) = self
3025 .get_transaction_status(&request.transaction_digest, &epoch_store)?
3026 .ok_or(IotaError::TransactionNotFound {
3027 digest: request.transaction_digest,
3028 })?;
3029 Ok(TransactionInfoResponse {
3030 transaction,
3031 status,
3032 })
3033 }
3034
3035 #[instrument(level = "trace", skip_all)]
3036 pub async fn handle_object_info_request(
3037 &self,
3038 request: ObjectInfoRequest,
3039 ) -> IotaResult<ObjectInfoResponse> {
3040 let epoch_store = self.load_epoch_store_one_call_per_task();
3041
3042 let requested_object_seq = match request.request_kind {
3043 ObjectInfoRequestKind::LatestObjectInfo => {
3044 self.try_get_object_or_tombstone(request.object_id)
3045 .await?
3046 .ok_or_else(|| {
3047 IotaError::from(UserInputError::ObjectNotFound {
3048 object_id: request.object_id,
3049 version: None,
3050 })
3051 })?
3052 .version
3053 }
3054 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
3055 };
3056
3057 let object = self
3058 .get_object_store()
3059 .try_get_object_by_key(&request.object_id, requested_object_seq)?
3060 .ok_or_else(|| {
3061 IotaError::from(UserInputError::ObjectNotFound {
3062 object_id: request.object_id,
3063 version: Some(requested_object_seq),
3064 })
3065 })?;
3066
3067 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
3068 (request.generate_layout, object.data.as_struct_opt())
3069 {
3070 Some(into_struct_layout(
3071 epoch_store
3072 .executor()
3073 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3074 .get_annotated_layout(move_obj.struct_tag())?,
3075 )?)
3076 } else {
3077 None
3078 };
3079
3080 let lock = if !object.is_address_owned() {
3081 None
3083 } else {
3084 self.get_transaction_lock(&object.object_ref(), &epoch_store)
3085 .await?
3086 .map(|s| s.into_inner())
3087 };
3088
3089 Ok(ObjectInfoResponse {
3090 object,
3091 layout,
3092 lock_for_debugging: lock,
3093 })
3094 }
3095
3096 #[instrument(level = "trace", skip_all)]
3097 pub fn handle_checkpoint_request(
3098 &self,
3099 request: &CheckpointRequest,
3100 ) -> IotaResult<CheckpointResponse> {
3101 let summary = if request.certified {
3102 let summary = match request.sequence_number {
3103 Some(seq) => self
3104 .checkpoint_store
3105 .get_checkpoint_by_sequence_number(seq)?,
3106 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3107 }
3108 .map(|v| v.into_inner());
3109 summary.map(CheckpointSummaryResponse::Certified)
3110 } else {
3111 let summary = match request.sequence_number {
3112 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3113 None => self
3114 .checkpoint_store
3115 .get_latest_locally_computed_checkpoint()?,
3116 };
3117 summary.map(CheckpointSummaryResponse::Pending)
3118 };
3119 let contents = match &summary {
3120 Some(s) => self
3121 .checkpoint_store
3122 .get_checkpoint_contents(&s.content_digest())?,
3123 None => None,
3124 };
3125 Ok(CheckpointResponse {
3126 checkpoint: summary,
3127 contents,
3128 })
3129 }
3130
3131 fn check_protocol_version(
3132 supported_protocol_versions: SupportedProtocolVersions,
3133 current_version: ProtocolVersion,
3134 ) {
3135 info!("current protocol version is now {:?}", current_version);
3136 info!("supported versions are: {:?}", supported_protocol_versions);
3137 if !supported_protocol_versions.is_version_supported(current_version) {
3138 let msg = format!(
3139 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
3140 );
3141
3142 error!("{}", msg);
3143 eprintln!("{msg}");
3144
3145 #[cfg(not(msim))]
3146 std::process::exit(1);
3147
3148 #[cfg(msim)]
3149 iota_simulator::task::shutdown_current_node();
3150 }
3151 }
3152
3153 #[expect(clippy::disallowed_methods)] #[expect(clippy::too_many_arguments)]
3155 pub async fn new(
3156 name: AuthorityName,
3157 secret: StableSyncAuthoritySigner,
3158 supported_protocol_versions: SupportedProtocolVersions,
3159 store: Arc<AuthorityStore>,
3160 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3161 epoch_store: Arc<AuthorityPerEpochStore>,
3162 committee_store: Arc<CommitteeStore>,
3163 indexes: Option<Arc<IndexStore>>,
3164 grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
3165 checkpoint_store: Arc<CheckpointStore>,
3166 prometheus_registry: &Registry,
3167 genesis_objects: &[Object],
3168 db_checkpoint_config: &DBCheckpointConfig,
3169 config: NodeConfig,
3170 archive_readers: ArchiveReaderBalancer,
3171 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
3172 chain_identifier: ChainIdentifier,
3173 pruner_db: Option<Arc<AuthorityPrunerTables>>,
3174 checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
3175 policy_config: Option<PolicyConfig>,
3176 firewall_config: Option<RemoteFirewallConfig>,
3177 ) -> Arc<Self> {
3178 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3179
3180 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3181 let (tx_ready_transactions, rx_ready_transactions) = unbounded_channel();
3182 let transaction_manager = Arc::new(TransactionManager::new(
3183 execution_cache_trait_pointers.object_cache_reader.clone(),
3184 execution_cache_trait_pointers
3185 .transaction_cache_reader
3186 .clone(),
3187 &epoch_store,
3188 tx_ready_transactions,
3189 metrics.clone(),
3190 ));
3191 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3192
3193 let authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3194 epoch_store.get_parent_path(),
3195 config
3196 .authority_store_pruning_config
3197 .num_latest_epoch_dbs_to_retain,
3198 )
3199 .await;
3200 let _pruner = AuthorityStorePruner::new(
3201 store.perpetual_tables.clone(),
3202 checkpoint_store.clone(),
3203 grpc_indexes_store.clone(),
3204 indexes.clone(),
3205 config.authority_store_pruning_config.clone(),
3206 epoch_store.committee().authority_exists(&name),
3207 epoch_store.epoch_start_state().epoch_duration_ms(),
3208 prometheus_registry,
3209 archive_readers,
3210 pruner_db,
3211 checkpoint_progress_tracker.clone(),
3212 );
3213 let input_loader =
3214 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3215 let epoch = epoch_store.epoch();
3216 let rgp = epoch_store.reference_gas_price();
3217 let traffic_controller_metrics =
3218 Arc::new(TrafficControllerMetrics::new(prometheus_registry));
3219 let traffic_controller = if let Some(policy_config) = policy_config {
3220 Some(Arc::new(
3221 TrafficController::init(
3222 policy_config,
3223 traffic_controller_metrics,
3224 firewall_config.clone(),
3225 )
3226 .await,
3227 ))
3228 } else {
3229 None
3230 };
3231 let state = Arc::new(AuthorityState {
3232 name,
3233 secret,
3234 execution_lock: RwLock::new(epoch),
3235 epoch_store: ArcSwap::new(epoch_store.clone()),
3236 input_loader,
3237 execution_cache_trait_pointers,
3238 indexes,
3239 grpc_indexes_store,
3240 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3241 checkpoint_store,
3242 committee_store,
3243 transaction_manager,
3244 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3245 metrics,
3246 _pruner,
3247 authority_per_epoch_pruner,
3248 checkpoint_progress_tracker,
3249 db_checkpoint_config: db_checkpoint_config.clone(),
3250 config,
3251 overload_info: AuthorityOverloadInfo::default(),
3252 validator_tx_finalizer,
3253 chain_identifier,
3254 congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
3255 traffic_controller,
3256 });
3257
3258 let authority_state = Arc::downgrade(&state);
3260 spawn_monitored_task!(execution_process(
3261 authority_state,
3262 rx_ready_transactions,
3263 rx_execution_shutdown,
3264 ));
3265 spawn_monitored_task!(authority_store_migrations::migrate_events(store));
3266
3267 state
3269 .create_owner_index_if_empty(genesis_objects, &epoch_store)
3270 .expect("Error indexing genesis objects.");
3271
3272 state
3273 }
3274
3275 pub fn epoch_db_pruner(&self) -> &AuthorityPerEpochStorePruner {
3276 &self.authority_per_epoch_pruner
3277 }
3278
3279 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3281 &self.execution_cache_trait_pointers.object_cache_reader
3282 }
3283
3284 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3285 &self.execution_cache_trait_pointers.transaction_cache_reader
3286 }
3287
3288 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3289 &self.execution_cache_trait_pointers.cache_writer
3290 }
3291
3292 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3293 &self.execution_cache_trait_pointers.backing_store
3294 }
3295
3296 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3297 &self.execution_cache_trait_pointers.backing_package_store
3298 }
3299
3300 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3301 &self.execution_cache_trait_pointers.object_store
3302 }
3303
3304 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3305 &self.execution_cache_trait_pointers.reconfig_api
3306 }
3307
3308 pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3309 &self.execution_cache_trait_pointers.global_state_hash_store
3310 }
3311
3312 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3313 &self.execution_cache_trait_pointers.checkpoint_cache
3314 }
3315
3316 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3317 &self.execution_cache_trait_pointers.state_sync_store
3318 }
3319
3320 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3321 &self.execution_cache_trait_pointers.cache_commit
3322 }
3323
3324 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3325 self.execution_cache_trait_pointers
3326 .testing_api
3327 .database_for_testing()
3328 }
3329
3330 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3331 &self,
3332 config: NodeConfig,
3333 metrics: Arc<AuthorityStorePruningMetrics>,
3334 ) -> anyhow::Result<()> {
3335 let archive_readers =
3336 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3337 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3338 &self.database_for_testing().perpetual_tables,
3339 &self.checkpoint_store,
3340 self.grpc_indexes_store.as_deref(),
3341 None,
3342 config.authority_store_pruning_config,
3343 metrics,
3344 archive_readers,
3345 EPOCH_DURATION_MS_FOR_TESTING,
3346 self.checkpoint_progress_tracker.as_ref(),
3347 )
3348 .await
3349 }
3350
3351 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3352 &self.transaction_manager
3353 }
3354
3355 pub fn enqueue_transactions_for_execution(
3358 &self,
3359 transactions: Vec<VerifiedExecutableTransaction>,
3360 epoch_store: &Arc<AuthorityPerEpochStore>,
3361 ) {
3362 self.transaction_manager.enqueue(transactions, epoch_store)
3363 }
3364
3365 pub fn enqueue_certificates_for_execution(
3367 &self,
3368 certs: Vec<VerifiedCertificate>,
3369 epoch_store: &Arc<AuthorityPerEpochStore>,
3370 ) {
3371 self.transaction_manager
3372 .enqueue_certificates(certs, epoch_store)
3373 }
3374
3375 pub fn enqueue_with_expected_effects_digest(
3376 &self,
3377 transactions: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3378 epoch_store: &AuthorityPerEpochStore,
3379 ) {
3380 self.transaction_manager
3381 .enqueue_with_expected_effects_digest(transactions, epoch_store)
3382 }
3383
3384 fn create_owner_index_if_empty(
3385 &self,
3386 genesis_objects: &[Object],
3387 epoch_store: &Arc<AuthorityPerEpochStore>,
3388 ) -> IotaResult {
3389 let Some(index_store) = &self.indexes else {
3390 return Ok(());
3391 };
3392 if !index_store.is_empty() {
3393 return Ok(());
3394 }
3395
3396 let mut new_owners = vec![];
3397 let mut new_dynamic_fields = vec![];
3398 let mut layout_resolver = epoch_store
3399 .executor()
3400 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3401 for o in genesis_objects.iter() {
3402 match o.owner {
3403 Owner::Address(addr) => {
3404 new_owners.push(((addr, o.id()), ObjectInfo::new(&o.object_ref(), o)))
3405 }
3406 Owner::Object(object_id) => {
3407 let id = o.id();
3408 let info = match self.try_create_dynamic_field_info(
3409 o,
3410 &BTreeMap::new(),
3411 layout_resolver.as_mut(),
3412 ) {
3413 Ok(Some(info)) => info,
3414 Ok(None) => continue,
3415 Err(IotaError::UserInput {
3416 error:
3417 UserInputError::ObjectNotFound {
3418 object_id: not_found_id,
3419 version,
3420 },
3421 }) => {
3422 warn!(
3423 ?not_found_id,
3424 ?version,
3425 object_owner=?object_id,
3426 field=?id,
3427 "Skipping dynamic field: referenced genesis object not found"
3428 );
3429 continue;
3430 }
3431 Err(e) => return Err(e),
3432 };
3433 new_dynamic_fields.push(((object_id, id), info));
3434 }
3435 _ => {}
3436 }
3437 }
3438
3439 index_store.insert_genesis_objects(ObjectIndexChanges {
3440 deleted_owners: vec![],
3441 deleted_dynamic_fields: vec![],
3442 new_owners,
3443 new_dynamic_fields,
3444 })
3445 }
3446
3447 pub fn execution_lock_for_executable_transaction(
3451 &self,
3452 transaction: &VerifiedExecutableTransaction,
3453 ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3454 let lock = self
3455 .execution_lock
3456 .try_read()
3457 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3458 if *lock == transaction.auth_sig().epoch() {
3459 Ok(lock)
3460 } else {
3461 Err(IotaError::WrongEpoch {
3462 expected_epoch: *lock,
3463 actual_epoch: transaction.auth_sig().epoch(),
3464 })
3465 }
3466 }
3467
3468 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3474 self.execution_lock
3475 .try_read()
3476 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3477 }
3478
3479 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3480 self.execution_lock.write().await
3481 }
3482
3483 #[instrument(level = "error", skip_all)]
3484 pub async fn reconfigure(
3485 &self,
3486 cur_epoch_store: &AuthorityPerEpochStore,
3487 supported_protocol_versions: SupportedProtocolVersions,
3488 new_committee: Committee,
3489 epoch_start_configuration: EpochStartConfiguration,
3490 state_hasher: Arc<GlobalStateHasher>,
3491 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3492 epoch_supply_change: i64,
3493 epoch_last_checkpoint: CheckpointSequenceNumber,
3494 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3495 Self::check_protocol_version(
3496 supported_protocol_versions,
3497 epoch_start_configuration
3498 .epoch_start_state()
3499 .protocol_version(),
3500 );
3501 self.metrics.reset_on_reconfigure();
3502 self.committee_store.insert_new_committee(&new_committee)?;
3503
3504 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3506
3507 cur_epoch_store.epoch_terminated().await;
3509
3510 let highest_locally_built_checkpoint_seq = self
3511 .checkpoint_store
3512 .get_latest_locally_computed_checkpoint()?
3513 .map(|c| *c.sequence_number())
3514 .unwrap_or(0);
3515
3516 assert!(
3517 epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3518 "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3519 );
3520 if highest_locally_built_checkpoint_seq == epoch_last_checkpoint
3521 || self.is_fullnode(cur_epoch_store)
3522 {
3523 let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3527 if num_shared_version_assignments > 10 {
3534 debug_fatal!(
3536 "all shared_version_assignments should have been removed \
3537 (num_shared_version_assignments: {num_shared_version_assignments})"
3538 );
3539 }
3540 }
3541
3542 self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3548 .await?;
3549 self.get_reconfig_api()
3550 .clear_state_end_of_epoch(&execution_lock);
3551 self.check_system_consistency(
3552 cur_epoch_store,
3553 state_hasher,
3554 expensive_safety_check_config,
3555 epoch_supply_change,
3556 )?;
3557 self.get_reconfig_api()
3558 .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3559 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3560 if self
3561 .db_checkpoint_config
3562 .perform_db_checkpoints_at_epoch_end
3563 {
3564 let checkpoint_indexes = self
3565 .db_checkpoint_config
3566 .perform_index_db_checkpoints_at_epoch_end
3567 .unwrap_or(false);
3568 let current_epoch = cur_epoch_store.epoch();
3569 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3570 self.checkpoint_all_dbs(
3571 &epoch_checkpoint_path,
3572 cur_epoch_store,
3573 checkpoint_indexes,
3574 )?;
3575 }
3576 }
3577
3578 let new_epoch = new_committee.epoch;
3579 let new_epoch_store = self
3580 .reopen_epoch_db(
3581 cur_epoch_store,
3582 new_committee,
3583 epoch_start_configuration,
3584 expensive_safety_check_config,
3585 epoch_last_checkpoint,
3586 )
3587 .await?;
3588 assert_eq!(new_epoch_store.epoch(), new_epoch);
3589 self.transaction_manager.reconfigure(new_epoch);
3590 *execution_lock = new_epoch;
3591 Ok(new_epoch_store)
3595 }
3596
3597 pub async fn reconfigure_for_testing(&self) {
3602 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3603 let epoch_store = self.epoch_store_for_testing().clone();
3604 let protocol_config = epoch_store.protocol_config().clone();
3605 let _guard =
3613 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3614 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3615 self.get_backing_package_store().clone(),
3616 &self.config.expensive_safety_check_config,
3617 self.checkpoint_store
3618 .get_epoch_last_checkpoint(epoch_store.epoch())
3619 .unwrap()
3620 .map(|c| *c.sequence_number())
3621 .unwrap_or_default(),
3622 );
3623 let new_epoch = new_epoch_store.epoch();
3624 self.transaction_manager.reconfigure(new_epoch);
3625 self.epoch_store.store(new_epoch_store);
3626 epoch_store.epoch_terminated().await;
3627 *execution_lock = new_epoch;
3628 }
3629
3630 #[instrument(level = "error", skip_all)]
3631 fn check_system_consistency(
3632 &self,
3633 cur_epoch_store: &AuthorityPerEpochStore,
3634 state_hasher: Arc<GlobalStateHasher>,
3635 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3636 epoch_supply_change: i64,
3637 ) -> IotaResult<()> {
3638 info!(
3639 "Performing iota conservation consistency check for epoch {}",
3640 cur_epoch_store.epoch()
3641 );
3642
3643 if cfg!(debug_assertions) {
3644 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3645 }
3646
3647 self.get_reconfig_api()
3648 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3649
3650 if expensive_safety_check_config.enable_state_consistency_check() {
3652 info!(
3653 "Performing state consistency check for epoch {}",
3654 cur_epoch_store.epoch()
3655 );
3656 self.expensive_check_is_consistent_state(
3657 state_hasher,
3658 cur_epoch_store,
3659 cfg!(debug_assertions), );
3661 }
3662
3663 if expensive_safety_check_config.enable_secondary_index_checks() {
3664 if let Some(indexes) = self.indexes.clone() {
3665 verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
3666 .expect("secondary indexes are inconsistent");
3667 }
3668 }
3669
3670 Ok(())
3671 }
3672
3673 fn expensive_check_is_consistent_state(
3674 &self,
3675 state_hasher: Arc<GlobalStateHasher>,
3676 cur_epoch_store: &AuthorityPerEpochStore,
3677 panic: bool,
3678 ) {
3679 let live_object_set_hash = state_hasher.digest_live_object_set();
3680
3681 let root_state_hash: ECMHLiveObjectSetDigest = self
3682 .get_global_state_hash_store()
3683 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
3684 .expect("Retrieving root state hash cannot fail")
3685 .expect("Root state hash for epoch must exist")
3686 .1
3687 .digest()
3688 .into();
3689
3690 let is_inconsistent = root_state_hash != live_object_set_hash;
3691 if is_inconsistent {
3692 if panic {
3693 panic!(
3694 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3695 );
3696 } else {
3697 error!(
3698 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3699 root_state_hash, live_object_set_hash
3700 );
3701 }
3702 } else {
3703 info!("State consistency check passed");
3704 }
3705
3706 if !panic {
3707 state_hasher.set_inconsistent_state(is_inconsistent);
3708 }
3709 }
3710
3711 pub fn current_epoch_for_testing(&self) -> EpochId {
3712 self.epoch_store_for_testing().epoch()
3713 }
3714
3715 #[instrument(level = "error", skip_all)]
3716 pub fn checkpoint_all_dbs(
3717 &self,
3718 checkpoint_path: &Path,
3719 cur_epoch_store: &AuthorityPerEpochStore,
3720 checkpoint_indexes: bool,
3721 ) -> IotaResult {
3722 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3723 let current_epoch = cur_epoch_store.epoch();
3724
3725 if checkpoint_path.exists() {
3726 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3727 return Ok(());
3728 }
3729
3730 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3731 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3732
3733 if checkpoint_path_tmp.exists() {
3734 fs::remove_dir_all(&checkpoint_path_tmp)
3735 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3736 }
3737
3738 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3739 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3740
3741 self.checkpoint_store
3744 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3745
3746 self.get_reconfig_api()
3747 .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3748
3749 self.committee_store
3750 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3751
3752 if checkpoint_indexes {
3753 if let Some(indexes) = self.indexes.as_ref() {
3754 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3755 }
3756 if let Some(grpc_indexes_store) = self.grpc_indexes_store.as_ref() {
3757 grpc_indexes_store.checkpoint_db(&checkpoint_path_tmp.join(GRPC_INDEXES_DIR))?;
3758 }
3759 }
3760
3761 fs::rename(checkpoint_path_tmp, checkpoint_path)
3762 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3763 Ok(())
3764 }
3765
3766 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3773 self.epoch_store.load()
3774 }
3775
3776 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3778 self.load_epoch_store_one_call_per_task()
3779 }
3780
3781 pub fn clone_committee_for_testing(&self) -> Committee {
3782 Committee::clone(self.epoch_store_for_testing().committee())
3783 }
3784
3785 #[instrument(level = "trace", skip_all)]
3786 pub async fn try_get_object(&self, object_id: &ObjectId) -> IotaResult<Option<Object>> {
3787 self.get_object_store()
3788 .try_get_object(object_id)
3789 .map_err(Into::into)
3790 }
3791
3792 pub async fn get_object(&self, object_id: &ObjectId) -> Option<Object> {
3794 self.try_get_object(object_id)
3795 .await
3796 .expect("storage access failed")
3797 }
3798
3799 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3800 Ok(self
3801 .try_get_object(&ObjectId::SYSTEM)
3802 .await?
3803 .expect("system package should always exist")
3804 .object_ref())
3805 }
3806
3807 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3809 self.get_object_cache_reader()
3810 .try_get_iota_system_state_object_unsafe()
3811 }
3812
3813 #[instrument(level = "trace", skip_all)]
3814 pub fn get_checkpoint_by_sequence_number(
3815 &self,
3816 sequence_number: CheckpointSequenceNumber,
3817 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3818 Ok(self
3819 .checkpoint_store
3820 .get_checkpoint_by_sequence_number(sequence_number)?)
3821 }
3822
3823 pub async fn wait_for_checkpoint_inclusion(
3830 &self,
3831 digests: &[TransactionDigest],
3832 timeout: Duration,
3833 ) -> IotaResult<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>> {
3834 let epoch_store = self.load_epoch_store_one_call_per_task();
3835
3836 let mut checkpoint_timestamp_cache = HashMap::<CheckpointSequenceNumber, u64>::new();
3839
3840 let results = epoch_store
3841 .wait_for_transactions_in_checkpoint_with_timeout(digests, timeout, |seq| {
3842 *checkpoint_timestamp_cache.entry(seq).or_insert_with(|| {
3843 self.get_checkpoint_by_sequence_number(seq)
3844 .ok()
3845 .flatten()
3846 .map(|c| c.timestamp_ms)
3847 .unwrap_or(0)
3848 })
3849 })
3850 .await?;
3851
3852 Ok(digests
3853 .iter()
3854 .copied()
3855 .zip(results)
3856 .filter_map(|(digest, opt)| opt.map(|seq_and_ts| (digest, seq_and_ts)))
3857 .collect())
3858 }
3859
3860 #[instrument(level = "trace", skip_all)]
3861 pub fn get_transaction_checkpoint_for_tests(
3862 &self,
3863 digest: &TransactionDigest,
3864 epoch_store: &AuthorityPerEpochStore,
3865 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3866 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3867 let Some(checkpoint) = checkpoint else {
3868 return Ok(None);
3869 };
3870 let checkpoint = self
3871 .checkpoint_store
3872 .get_checkpoint_by_sequence_number(checkpoint)?;
3873 Ok(checkpoint)
3874 }
3875
3876 #[instrument(level = "trace", skip_all)]
3877 pub fn get_object_read(&self, object_id: &ObjectId) -> IotaResult<ObjectRead> {
3878 Ok(
3879 match self
3880 .get_object_cache_reader()
3881 .try_get_latest_object_or_tombstone(*object_id)?
3882 {
3883 Some((_, ObjectOrTombstone::Object(object))) => {
3884 let layout = self.get_object_layout(&object)?;
3885 ObjectRead::Exists(object.object_ref(), object, layout)
3886 }
3887 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3888 None => ObjectRead::NotExists(*object_id),
3889 },
3890 )
3891 }
3892
3893 pub fn get_chain_identifier(&self) -> ChainIdentifier {
3895 self.chain_identifier
3896 }
3897
3898 #[instrument(level = "trace", skip_all)]
3899 pub fn get_move_object<T>(&self, object_id: &ObjectId) -> IotaResult<T>
3900 where
3901 T: DeserializeOwned,
3902 {
3903 let o = self.get_object_read(object_id)?.into_object()?;
3904 if let Some(move_object) = o.data.as_struct_opt() {
3905 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3906 IotaError::ObjectDeserialization {
3907 error: format!("{e}"),
3908 }
3909 })?)
3910 } else {
3911 Err(IotaError::ObjectDeserialization {
3912 error: format!("Provided object : [{object_id}] is not a Move object."),
3913 })
3914 }
3915 }
3916
3917 #[instrument(level = "trace", skip_all)]
3923 pub fn get_past_object_read(
3924 &self,
3925 object_id: &ObjectId,
3926 version: SequenceNumber,
3927 ) -> IotaResult<PastObjectRead> {
3928 let Some(obj_ref) = self
3930 .get_object_cache_reader()
3931 .try_get_latest_object_ref_or_tombstone(*object_id)?
3932 else {
3933 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3934 };
3935
3936 if version > obj_ref.version {
3937 return Ok(PastObjectRead::VersionTooHigh {
3938 object_id: *object_id,
3939 asked_version: version,
3940 latest_version: obj_ref.version,
3941 });
3942 }
3943
3944 if version < obj_ref.version {
3945 return Ok(match self.read_object_at_version(object_id, version)? {
3947 Some((object, layout)) => {
3948 let obj_ref = object.object_ref();
3949 PastObjectRead::VersionFound(obj_ref, object, layout)
3950 }
3951
3952 None => PastObjectRead::VersionNotFound(*object_id, version),
3953 });
3954 }
3955
3956 if !obj_ref.digest.is_object_alive() {
3957 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3958 }
3959
3960 match self.read_object_at_version(object_id, obj_ref.version)? {
3961 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3962 None => {
3963 error!(
3964 "Object with in parent_entry is missing from object store, datastore is \
3965 inconsistent",
3966 );
3967 Err(UserInputError::ObjectNotFound {
3968 object_id: *object_id,
3969 version: Some(obj_ref.version),
3970 }
3971 .into())
3972 }
3973 }
3974 }
3975
3976 #[instrument(level = "trace", skip_all)]
3977 fn read_object_at_version(
3978 &self,
3979 object_id: &ObjectId,
3980 version: SequenceNumber,
3981 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3982 let Some(object) = self
3983 .get_object_cache_reader()
3984 .try_get_object_by_key(object_id, version)?
3985 else {
3986 return Ok(None);
3987 };
3988
3989 let layout = self.get_object_layout(&object)?;
3990 Ok(Some((object, layout)))
3991 }
3992
3993 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3994 let layout = object
3995 .data
3996 .as_struct_opt()
3997 .map(|object| {
3998 into_struct_layout(
3999 self.load_epoch_store_one_call_per_task()
4000 .executor()
4001 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
4003 .get_annotated_layout(object.struct_tag())?,
4004 )
4005 })
4006 .transpose()?;
4007 Ok(layout)
4008 }
4009
4010 fn get_owner_at_version(
4011 &self,
4012 object_id: &ObjectId,
4013 version: SequenceNumber,
4014 ) -> IotaResult<Owner> {
4015 self.get_object_store()
4016 .try_get_object_by_key(object_id, version)?
4017 .ok_or_else(|| {
4018 IotaError::from(UserInputError::ObjectNotFound {
4019 object_id: *object_id,
4020 version: Some(version),
4021 })
4022 })
4023 .map(|o| o.owner)
4024 }
4025
4026 #[instrument(level = "trace", skip_all)]
4027 pub fn get_owner_objects(
4028 &self,
4029 owner: Address,
4030 cursor: Option<ObjectId>,
4032 limit: usize,
4033 filter: Option<IotaObjectDataFilter>,
4034 ) -> IotaResult<Vec<ObjectInfo>> {
4035 if let Some(indexes) = &self.indexes {
4036 indexes.get_owner_objects(owner, cursor, limit, filter)
4037 } else {
4038 Err(IotaError::IndexStoreNotAvailable)
4039 }
4040 }
4041
4042 #[instrument(level = "trace", skip_all)]
4043 pub fn get_owned_coins_iterator_with_cursor(
4044 &self,
4045 owner: Address,
4046 cursor: (String, ObjectId),
4048 limit: usize,
4049 one_coin_type_only: bool,
4050 ) -> IotaResult<impl Iterator<Item = (String, ObjectId, CoinInfo)> + '_> {
4051 if let Some(indexes) = &self.indexes {
4052 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
4053 } else {
4054 Err(IotaError::IndexStoreNotAvailable)
4055 }
4056 }
4057
4058 #[instrument(level = "trace", skip_all)]
4059 pub fn get_owner_objects_iterator(
4060 &self,
4061 owner: Address,
4062 cursor: Option<ObjectId>,
4064 filter: Option<IotaObjectDataFilter>,
4065 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
4066 let cursor_u = cursor.unwrap_or(ObjectId::ZERO);
4067 if let Some(indexes) = &self.indexes {
4068 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
4069 } else {
4070 Err(IotaError::IndexStoreNotAvailable)
4071 }
4072 }
4073
4074 #[instrument(level = "trace", skip_all)]
4075 pub async fn get_move_objects<T>(&self, owner: Address, tag: StructTag) -> IotaResult<Vec<T>>
4076 where
4077 T: DeserializeOwned,
4078 {
4079 let object_ids = self
4080 .get_owner_objects_iterator(owner, None, None)?
4081 .filter(|o| match &o.type_ {
4082 ObjectType::Struct(s) => *s == tag,
4083 ObjectType::Package => false,
4084 })
4085 .map(|info| ObjectKey(info.object_id, info.version))
4086 .collect::<Vec<_>>();
4087 let mut move_objects = vec![];
4088
4089 let objects = self
4090 .get_object_store()
4091 .try_multi_get_objects_by_key(&object_ids)?;
4092
4093 for (o, id) in objects.into_iter().zip(object_ids) {
4094 let object = o.ok_or_else(|| {
4095 IotaError::from(UserInputError::ObjectNotFound {
4096 object_id: id.0,
4097 version: Some(id.1),
4098 })
4099 })?;
4100 let move_object = object.data.as_struct_opt().ok_or_else(|| {
4101 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
4102 })?;
4103 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
4104 IotaError::ObjectDeserialization {
4105 error: format!("{e}"),
4106 }
4107 })?);
4108 }
4109 Ok(move_objects)
4110 }
4111
4112 #[instrument(level = "trace", skip_all)]
4113 pub fn get_dynamic_fields(
4114 &self,
4115 owner: ObjectId,
4116 cursor: Option<ObjectId>,
4118 limit: usize,
4119 ) -> IotaResult<Vec<(ObjectId, DynamicFieldInfo)>> {
4120 Ok(self
4121 .get_dynamic_fields_iterator(owner, cursor)?
4122 .take(limit)
4123 .collect::<Result<Vec<_>, _>>()?)
4124 }
4125
4126 fn get_dynamic_fields_iterator(
4127 &self,
4128 owner: ObjectId,
4129 cursor: Option<ObjectId>,
4131 ) -> IotaResult<impl Iterator<Item = Result<(ObjectId, DynamicFieldInfo), TypedStoreError>> + '_>
4132 {
4133 if let Some(indexes) = &self.indexes {
4134 indexes.get_dynamic_fields_iterator(owner, cursor)
4135 } else {
4136 Err(IotaError::IndexStoreNotAvailable)
4137 }
4138 }
4139
4140 #[instrument(level = "trace", skip_all)]
4141 pub fn get_dynamic_field_object_id(
4142 &self,
4143 owner: ObjectId,
4144 name_type: TypeTag,
4145 name_bcs_bytes: &[u8],
4146 ) -> IotaResult<Option<ObjectId>> {
4147 if let Some(indexes) = &self.indexes {
4148 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4149 } else {
4150 Err(IotaError::IndexStoreNotAvailable)
4151 }
4152 }
4153
4154 #[instrument(level = "trace", skip_all)]
4155 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
4156 Ok(self.get_indexes()?.next_sequence_number())
4157 }
4158
4159 #[instrument(level = "trace", skip_all)]
4160 pub async fn get_executed_transaction_and_effects(
4161 &self,
4162 digest: TransactionDigest,
4163 kv_store: Arc<TransactionKeyValueStore>,
4164 ) -> IotaResult<(Transaction, TransactionEffects)> {
4165 let transaction = kv_store.get_tx(digest).await?;
4166 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4167 Ok((transaction, effects))
4168 }
4169
4170 #[instrument(level = "trace", skip_all)]
4171 pub fn multi_get_checkpoint_by_sequence_number(
4172 &self,
4173 sequence_numbers: &[CheckpointSequenceNumber],
4174 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
4175 Ok(self
4176 .checkpoint_store
4177 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4178 }
4179
4180 #[instrument(level = "trace", skip_all)]
4181 pub fn get_transaction_events(
4182 &self,
4183 digest: &TransactionDigest,
4184 ) -> IotaResult<TransactionEvents> {
4185 self.get_transaction_cache_reader()
4186 .try_get_events(digest)?
4187 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
4188 }
4189
4190 pub fn get_transaction_input_objects(
4191 &self,
4192 effects: &TransactionEffects,
4193 ) -> anyhow::Result<Vec<Object>> {
4194 iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4195 .map_err(Into::into)
4196 }
4197
4198 pub fn get_transaction_output_objects(
4199 &self,
4200 effects: &TransactionEffects,
4201 ) -> anyhow::Result<Vec<Object>> {
4202 iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4203 .map_err(Into::into)
4204 }
4205
4206 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
4207 match &self.indexes {
4208 Some(i) => Ok(i.clone()),
4209 None => Err(IotaError::UnsupportedFeature {
4210 error: "extended object indexing is not enabled on this server".into(),
4211 }),
4212 }
4213 }
4214
4215 pub async fn get_transactions_for_tests(
4216 self: &Arc<Self>,
4217 filter: Option<TransactionFilter>,
4218 cursor: Option<TransactionDigest>,
4219 limit: Option<usize>,
4220 reverse: bool,
4221 ) -> IotaResult<Vec<TransactionDigest>> {
4222 let metrics = KeyValueStoreMetrics::new_for_tests();
4223 let kv_store = Arc::new(TransactionKeyValueStore::new(
4224 "rocksdb",
4225 metrics,
4226 self.clone(),
4227 ));
4228 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4229 .await
4230 }
4231
4232 #[instrument(level = "trace", skip_all)]
4233 pub async fn get_transactions(
4234 &self,
4235 kv_store: &Arc<TransactionKeyValueStore>,
4236 filter: Option<TransactionFilter>,
4237 cursor: Option<TransactionDigest>,
4239 limit: Option<usize>,
4240 reverse: bool,
4241 ) -> IotaResult<Vec<TransactionDigest>> {
4242 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4243 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4244 let iter = checkpoint_contents.iter().map(|c| c.transaction);
4245 if reverse {
4246 let iter = iter
4247 .rev()
4248 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4249 .skip(usize::from(cursor.is_some()));
4250 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4251 } else {
4252 let iter = iter
4253 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4254 .skip(usize::from(cursor.is_some()));
4255 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4256 }
4257 }
4258 self.get_indexes()?
4259 .get_transactions(filter, cursor, limit, reverse)
4260 }
4261
4262 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4263 &self.checkpoint_store
4264 }
4265
4266 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4267 self.get_checkpoint_store()
4268 .get_highest_executed_checkpoint_seq_number()?
4269 .ok_or(IotaError::UserInput {
4270 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4271 })
4272 }
4273
/// Test-only (simulator builds): returns the highest pruned checkpoint
/// recorded in the perpetual tables, or `0` when none is recorded.
///
/// # Errors
/// Converts and returns any error from the perpetual tables.
#[cfg(msim)]
pub fn get_highest_pruned_checkpoint_for_testing(
    &self,
) -> IotaResult<CheckpointSequenceNumber> {
    let database = self.database_for_testing();
    match database.perpetual_tables.get_highest_pruned_checkpoint() {
        Ok(highest_pruned) => Ok(highest_pruned.unwrap_or(0)),
        Err(e) => Err(e.into()),
    }
}
4284
4285 #[instrument(level = "trace", skip_all)]
4286 pub fn get_checkpoint_summary_by_sequence_number(
4287 &self,
4288 sequence_number: CheckpointSequenceNumber,
4289 ) -> IotaResult<CheckpointSummary> {
4290 let verified_checkpoint = self
4291 .get_checkpoint_store()
4292 .get_checkpoint_by_sequence_number(sequence_number)?;
4293 match verified_checkpoint {
4294 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4295 None => Err(IotaError::UserInput {
4296 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4297 }),
4298 }
4299 }
4300
4301 #[instrument(level = "trace", skip_all)]
4302 pub fn get_checkpoint_summary_by_digest(
4303 &self,
4304 digest: CheckpointDigest,
4305 ) -> IotaResult<CheckpointSummary> {
4306 let verified_checkpoint = self
4307 .get_checkpoint_store()
4308 .get_checkpoint_by_digest(&digest)?;
4309 match verified_checkpoint {
4310 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4311 None => Err(IotaError::UserInput {
4312 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4313 }),
4314 }
4315 }
4316
4317 #[instrument(level = "trace", skip_all)]
4318 pub fn find_publish_txn_digest(&self, package_id: ObjectId) -> IotaResult<TransactionDigest> {
4319 if package_id.is_system_package() {
4320 return self.find_genesis_txn_digest();
4321 }
4322 Ok(self
4323 .get_object_read(&package_id)?
4324 .into_object()?
4325 .previous_transaction)
4326 }
4327
4328 #[instrument(level = "trace", skip_all)]
4329 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4330 let summary = self
4331 .get_verified_checkpoint_by_sequence_number(0)?
4332 .into_message();
4333 let content = self.get_checkpoint_contents(summary.content_digest)?;
4334 let genesis_transaction = content.enumerate_transactions(&summary).next();
4335 Ok(genesis_transaction
4336 .ok_or(IotaError::UserInput {
4337 error: UserInputError::GenesisTransactionNotFound,
4338 })?
4339 .1
4340 .transaction)
4341 }
4342
4343 #[instrument(level = "trace", skip_all)]
4344 pub fn get_verified_checkpoint_by_sequence_number(
4345 &self,
4346 sequence_number: CheckpointSequenceNumber,
4347 ) -> IotaResult<VerifiedCheckpoint> {
4348 let verified_checkpoint = self
4349 .get_checkpoint_store()
4350 .get_checkpoint_by_sequence_number(sequence_number)?;
4351 match verified_checkpoint {
4352 Some(verified_checkpoint) => Ok(verified_checkpoint),
4353 None => Err(IotaError::UserInput {
4354 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4355 }),
4356 }
4357 }
4358
4359 #[instrument(level = "trace", skip_all)]
4360 pub fn get_verified_checkpoint_summary_by_digest(
4361 &self,
4362 digest: CheckpointDigest,
4363 ) -> IotaResult<VerifiedCheckpoint> {
4364 let verified_checkpoint = self
4365 .get_checkpoint_store()
4366 .get_checkpoint_by_digest(&digest)?;
4367 match verified_checkpoint {
4368 Some(verified_checkpoint) => Ok(verified_checkpoint),
4369 None => Err(IotaError::UserInput {
4370 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4371 }),
4372 }
4373 }
4374
4375 #[instrument(level = "trace", skip_all)]
4376 pub fn get_checkpoint_contents(
4377 &self,
4378 digest: CheckpointContentsDigest,
4379 ) -> IotaResult<CheckpointContents> {
4380 self.get_checkpoint_store()
4381 .get_checkpoint_contents(&digest)?
4382 .ok_or(IotaError::UserInput {
4383 error: UserInputError::CheckpointContentsNotFound(digest),
4384 })
4385 }
4386
4387 #[instrument(level = "trace", skip_all)]
4388 pub fn get_checkpoint_contents_by_sequence_number(
4389 &self,
4390 sequence_number: CheckpointSequenceNumber,
4391 ) -> IotaResult<CheckpointContents> {
4392 let verified_checkpoint = self
4393 .get_checkpoint_store()
4394 .get_checkpoint_by_sequence_number(sequence_number)?;
4395 match verified_checkpoint {
4396 Some(verified_checkpoint) => {
4397 let content_digest = verified_checkpoint.into_inner().content_digest;
4398 self.get_checkpoint_contents(content_digest)
4399 }
4400 None => Err(IotaError::UserInput {
4401 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4402 }),
4403 }
4404 }
4405
4406 #[instrument(level = "trace", skip_all)]
4407 pub async fn query_events(
4408 &self,
4409 kv_store: &Arc<TransactionKeyValueStore>,
4410 query: EventFilter,
4411 cursor: Option<EventID>,
4413 limit: usize,
4414 descending: bool,
4415 ) -> IotaResult<Vec<IotaEvent>> {
4416 let index_store = self.get_indexes()?;
4417
4418 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4420 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4421 IotaError::TransactionNotFound {
4422 digest: cursor.tx_digest,
4423 },
4424 )?;
4425 (tx_seq, cursor.event_seq as usize)
4426 } else if descending {
4427 (u64::MAX, usize::MAX)
4428 } else {
4429 (0, 0)
4430 };
4431
4432 let limit = limit + 1;
4433 let mut event_keys = match query {
4434 EventFilter::All(filters) => {
4435 if filters.is_empty() {
4436 index_store.all_events(tx_num, event_num, limit, descending)?
4437 } else {
4438 return Err(IotaError::UserInput {
4439 error: UserInputError::Unsupported(
4440 "This query type does not currently support filter combinations"
4441 .to_string(),
4442 ),
4443 });
4444 }
4445 }
4446 EventFilter::Transaction(digest) => {
4447 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4448 }
4449 EventFilter::MoveModule { package, module } => {
4450 let module_id = ModuleId::new(
4451 AccountAddress::new(package.into_bytes()),
4452 move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4453 );
4454 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4455 }
4456 EventFilter::MoveEventType(struct_name) => index_store
4457 .events_by_move_event_struct_name(
4458 &struct_name,
4459 tx_num,
4460 event_num,
4461 limit,
4462 descending,
4463 )?,
4464 EventFilter::Sender(sender) => {
4465 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4466 }
4467 EventFilter::TimeRange {
4468 start_time,
4469 end_time,
4470 } => index_store
4471 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4472 EventFilter::MoveEventModule { package, module } => index_store
4473 .events_by_move_event_module(
4474 &ModuleId::new(
4475 AccountAddress::new(package.into_bytes()),
4476 move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4477 ),
4478 tx_num,
4479 event_num,
4480 limit,
4481 descending,
4482 )?,
4483 EventFilter::Package(_)
4485 | EventFilter::MoveEventField { .. }
4486 | EventFilter::Any(_)
4487 | EventFilter::And(_, _)
4488 | EventFilter::Or(_, _) => {
4489 return Err(IotaError::UserInput {
4490 error: UserInputError::Unsupported(
4491 "This query type is not supported by the full node.".to_string(),
4492 ),
4493 });
4494 }
4495 };
4496
4497 if cursor.is_some() {
4500 if !event_keys.is_empty() {
4501 event_keys.remove(0);
4502 }
4503 } else {
4504 event_keys.truncate(limit - 1);
4505 }
4506
4507 let transaction_digests = event_keys
4509 .iter()
4510 .map(|(_, digest, _, _)| *digest)
4511 .collect::<HashSet<_>>()
4512 .into_iter()
4513 .collect::<Vec<_>>();
4514
4515 let events = kv_store
4516 .multi_get_events_by_tx_digests(&transaction_digests)
4517 .await?;
4518
4519 let events_map: HashMap<_, _> =
4520 transaction_digests.iter().zip(events.into_iter()).collect();
4521
4522 let stored_events = event_keys
4523 .into_iter()
4524 .map(|k| {
4525 (
4526 k,
4527 events_map
4528 .get(&k.1)
4529 .expect("fetched digest is missing")
4530 .clone()
4531 .and_then(|e| e.get(k.2).cloned()),
4532 )
4533 })
4534 .map(
4535 |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4536 event
4537 .map(|e| (e, tx_digest, event_seq, timestamp))
4538 .ok_or(IotaError::TransactionEventsNotFound { digest: tx_digest })
4539 },
4540 )
4541 .collect::<Result<Vec<_>, _>>()?;
4542
4543 let epoch_store = self.load_epoch_store_one_call_per_task();
4544 let backing_store = self.get_backing_package_store().as_ref();
4545 let mut layout_resolver = epoch_store
4546 .executor()
4547 .type_layout_resolver(Box::new(backing_store));
4548 let mut events = vec![];
4549 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4550 events.push(IotaEvent::try_from(
4551 e.clone(),
4552 tx_digest,
4553 event_seq as u64,
4554 Some(timestamp),
4555 layout_resolver.get_annotated_layout(&e.type_)?,
4556 )?)
4557 }
4558 Ok(events)
4559 }
4560
4561 pub async fn insert_genesis_object(&self, object: Object) {
4562 self.get_reconfig_api()
4563 .try_insert_genesis_object(object)
4564 .expect("Cannot insert genesis object")
4565 }
4566
4567 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4568 futures::future::join_all(
4569 objects
4570 .iter()
4571 .map(|o| self.insert_genesis_object(o.clone())),
4572 )
4573 .await;
4574 }
4575
4576 #[instrument(level = "trace", skip_all)]
4578 pub fn get_transaction_status(
4579 &self,
4580 transaction_digest: &TransactionDigest,
4581 epoch_store: &Arc<AuthorityPerEpochStore>,
4582 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4583 if let Some(effects) =
4585 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4586 {
4587 if let Some(transaction) = self
4588 .get_transaction_cache_reader()
4589 .try_get_transaction_block(transaction_digest)?
4590 {
4591 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4592 let events = if effects.events_digest().is_some() {
4593 self.get_transaction_events(effects.transaction_digest())?
4594 } else {
4595 TransactionEvents::default()
4596 };
4597 return Ok(Some((
4598 (*transaction).clone().into_message(),
4599 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4600 )));
4601 } else {
4602 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4607 }
4608 }
4609 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4610 self.metrics.tx_already_processed.inc();
4611 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4612 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4613 } else {
4614 Ok(None)
4615 }
4616 }
4617
4618 #[instrument(level = "trace", skip_all)]
4622 pub fn get_signed_effects_and_maybe_resign(
4623 &self,
4624 transaction_digest: &TransactionDigest,
4625 epoch_store: &Arc<AuthorityPerEpochStore>,
4626 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4627 let effects = self
4628 .get_transaction_cache_reader()
4629 .try_get_executed_effects(transaction_digest)?;
4630 match effects {
4631 Some(effects) => {
4632 if effects.epoch() != epoch_store.epoch() {
4655 debug!(
4656 tx_digest=?transaction_digest,
4657 effects_epoch=?effects.epoch(),
4658 epoch=?epoch_store.epoch(),
4659 "Re-signing the effects with the current epoch"
4660 );
4661 }
4662 Ok(Some(self.sign_effects(effects, epoch_store)?))
4663 }
4664 None => Ok(None),
4665 }
4666 }
4667
4668 #[instrument(level = "trace", skip_all)]
4669 pub(crate) fn sign_effects(
4670 &self,
4671 effects: TransactionEffects,
4672 epoch_store: &Arc<AuthorityPerEpochStore>,
4673 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4674 let tx_digest = *effects.transaction_digest();
4675 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4676 Some(sig) => {
4677 debug_assert!(sig.epoch == epoch_store.epoch());
4678 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4679 }
4680 _ => {
4681 let sig = AuthoritySignInfo::new(
4682 epoch_store.epoch(),
4683 &effects,
4684 Intent::iota_app(IntentScope::TransactionEffects),
4685 self.name,
4686 &*self.secret,
4687 );
4688
4689 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4690
4691 epoch_store.insert_effects_digest_and_signature(
4692 &tx_digest,
4693 effects.digest(),
4694 &sig,
4695 )?;
4696
4697 effects
4698 }
4699 };
4700
4701 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4702 signed_effects,
4703 ))
4704 }
4705
4706 #[instrument(level = "trace", skip_all)]
4708 fn fullnode_only_get_tx_coins_for_indexing(
4709 &self,
4710 effects: &TransactionEffects,
4711 inner_temporary_store: &InnerTemporaryStore,
4712 epoch_store: &Arc<AuthorityPerEpochStore>,
4713 ) -> Option<TxCoins> {
4714 if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4715 return None;
4716 }
4717 let written_coin_objects = inner_temporary_store
4718 .written
4719 .iter()
4720 .filter_map(|(k, v)| {
4721 if v.is_coin() {
4722 Some((*k, v.clone()))
4723 } else {
4724 None
4725 }
4726 })
4727 .collect();
4728 let mut input_coin_objects = inner_temporary_store
4729 .input_objects
4730 .iter()
4731 .filter_map(|(k, v)| {
4732 if v.is_coin() {
4733 Some((*k, v.clone()))
4734 } else {
4735 None
4736 }
4737 })
4738 .collect::<ObjectMap>();
4739
4740 for (object_id, version) in effects.modified_at_versions() {
4745 if inner_temporary_store
4746 .loaded_runtime_objects
4747 .contains_key(&object_id)
4748 {
4749 if let Some(object) = self
4750 .get_object_store()
4751 .get_object_by_key(&object_id, version)
4752 {
4753 if object.is_coin() {
4754 input_coin_objects.insert(object_id, object);
4755 }
4756 }
4757 }
4758 }
4759
4760 Some((input_coin_objects, written_coin_objects))
4761 }
4762
4763 #[instrument(level = "trace", skip_all)]
4775 pub async fn get_transaction_lock(
4776 &self,
4777 object_ref: &ObjectRef,
4778 epoch_store: &AuthorityPerEpochStore,
4779 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4780 let lock_info = self
4781 .get_object_cache_reader()
4782 .try_get_lock(*object_ref, epoch_store)?;
4783 let lock_info = match lock_info {
4784 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4785 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4786 provided_obj_ref: *object_ref,
4787 current_version: locked_ref.version,
4788 }
4789 .into());
4790 }
4791 ObjectLockStatus::Initialized => {
4792 return Ok(None);
4793 }
4794 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4795 };
4796
4797 epoch_store.get_signed_transaction(&lock_info)
4798 }
4799
4800 pub async fn try_get_objects(&self, objects: &[ObjectId]) -> IotaResult<Vec<Option<Object>>> {
4801 self.get_object_cache_reader().try_get_objects(objects)
4802 }
4803
4804 pub async fn get_objects(&self, objects: &[ObjectId]) -> Vec<Option<Object>> {
4806 self.try_get_objects(objects)
4807 .await
4808 .expect("storage access failed")
4809 }
4810
4811 pub async fn try_get_object_or_tombstone(
4812 &self,
4813 object_id: ObjectId,
4814 ) -> IotaResult<Option<ObjectRef>> {
4815 self.get_object_cache_reader()
4816 .try_get_latest_object_ref_or_tombstone(object_id)
4817 }
4818
4819 pub async fn get_object_or_tombstone(&self, object_id: ObjectId) -> Option<ObjectRef> {
4821 self.try_get_object_or_tombstone(object_id)
4822 .await
4823 .expect("storage access failed")
4824 }
4825
4826 pub fn set_override_protocol_upgrade_buffer_stake(
4836 &self,
4837 expected_epoch: EpochId,
4838 buffer_stake_bps: u64,
4839 ) -> IotaResult {
4840 let epoch_store = self.load_epoch_store_one_call_per_task();
4841 let actual_epoch = epoch_store.epoch();
4842 if actual_epoch != expected_epoch {
4843 return Err(IotaError::WrongEpoch {
4844 expected_epoch,
4845 actual_epoch,
4846 });
4847 }
4848
4849 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4850 }
4851
4852 pub fn clear_override_protocol_upgrade_buffer_stake(
4853 &self,
4854 expected_epoch: EpochId,
4855 ) -> IotaResult {
4856 let epoch_store = self.load_epoch_store_one_call_per_task();
4857 let actual_epoch = epoch_store.epoch();
4858 if actual_epoch != expected_epoch {
4859 return Err(IotaError::WrongEpoch {
4860 expected_epoch,
4861 actual_epoch,
4862 });
4863 }
4864
4865 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4866 }
4867
4868 pub async fn get_available_system_packages(
4872 &self,
4873 binary_config: &BinaryConfig,
4874 ) -> Vec<ObjectRef> {
4875 let mut results = vec![];
4876
4877 let system_packages = BuiltInFramework::iter_system_packages();
4878
4879 #[cfg(msim)]
4881 let extra_packages = framework_injection::get_extra_packages(self.name);
4882 #[cfg(msim)]
4883 let system_packages = {
4884 let mut packages: Vec<_> = system_packages.collect();
4885 packages.extend(extra_packages.iter());
4886 packages
4887 };
4888
4889 for system_package in system_packages {
4890 let modules = system_package.modules().to_vec();
4891 #[cfg(msim)]
4893 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4894 .unwrap_or(modules);
4895
4896 let Some(obj_ref) = iota_framework::compare_system_package(
4897 &self.get_object_store(),
4898 &system_package.id,
4899 &modules,
4900 system_package.dependencies.to_vec(),
4901 binary_config,
4902 )
4903 .await
4904 else {
4905 return vec![];
4906 };
4907 results.push(obj_ref);
4908 }
4909
4910 results
4911 }
4912
4913 async fn get_system_package_bytes(
4930 &self,
4931 system_packages: Vec<ObjectRef>,
4932 binary_config: &BinaryConfig,
4933 ) -> Option<Vec<SystemPackage>> {
4934 let ids: Vec<_> = system_packages
4935 .iter()
4936 .map(|object_ref| object_ref.object_id)
4937 .collect();
4938 let objects = self.get_objects(&ids).await;
4939
4940 let mut res = Vec::with_capacity(system_packages.len());
4941 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4942 let prev_transaction = match object {
4943 Some(cur_object) if cur_object.object_ref() == system_package_ref => {
4944 info!(
4946 "Framework {} does not need updating",
4947 system_package_ref.object_id
4948 );
4949 continue;
4950 }
4951
4952 Some(cur_object) => cur_object.previous_transaction,
4953 None => TransactionDigest::GENESIS_MARKER,
4954 };
4955
4956 #[cfg(msim)]
4957 let FrameworkSystemPackage {
4958 id: _,
4959 bytes,
4960 dependencies,
4961 } = framework_injection::get_override_system_package(
4962 &system_package_ref.object_id,
4963 self.name,
4964 )
4965 .unwrap_or_else(|| {
4966 BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone()
4967 });
4968
4969 #[cfg(not(msim))]
4970 let FrameworkSystemPackage {
4971 id: _,
4972 bytes,
4973 dependencies,
4974 } = BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone();
4975
4976 let modules: Vec<_> = bytes
4977 .iter()
4978 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4979 .collect();
4980
4981 let new_object = Object::new_system_package(
4982 &modules,
4983 system_package_ref.version,
4984 dependencies.clone(),
4985 prev_transaction,
4986 );
4987
4988 let new_ref = new_object.object_ref();
4989 if new_ref != system_package_ref {
4990 error!(
4991 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
4992 );
4993 return None;
4994 }
4995
4996 res.push(SystemPackage {
4997 version: system_package_ref.version,
4998 modules: bytes,
4999 dependencies,
5000 });
5001 }
5002
5003 Some(res)
5004 }
5005
5006 fn is_protocol_version_supported_v1(
5010 proposed_protocol_version: ProtocolVersion,
5011 committee: &Committee,
5012 capabilities: Vec<AuthorityCapabilitiesV1>,
5013 mut buffer_stake_bps: u64,
5014 ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
5015 if buffer_stake_bps > 10000 {
5016 warn!("clamping buffer_stake_bps to 10000");
5017 buffer_stake_bps = 10000;
5018 }
5019
5020 let mut desired_upgrades: Vec<_> = capabilities
5023 .into_iter()
5024 .filter_map(|mut cap| {
5025 if cap.available_system_packages.is_empty() {
5027 return None;
5028 }
5029
5030 cap.available_system_packages.sort();
5031
5032 info!(
5033 "validator {:?} supports {:?} with system packages: {:?}",
5034 cap.authority.concise(),
5035 cap.supported_protocol_versions,
5036 cap.available_system_packages,
5037 );
5038
5039 cap.supported_protocol_versions
5043 .get_version_digest(proposed_protocol_version)
5044 .map(|digest| (digest, cap.available_system_packages, cap.authority))
5045 })
5046 .collect();
5047
5048 desired_upgrades.sort();
5051 desired_upgrades
5052 .into_iter()
5053 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
5054 .into_iter()
5055 .find_map(|((digest, packages), group)| {
5056 assert!(!packages.is_empty());
5058
5059 let mut stake_aggregator: StakeAggregator<(), true> =
5060 StakeAggregator::new(Arc::new(committee.clone()));
5061
5062 for (_, _, authority) in group {
5063 stake_aggregator.insert_generic(authority, ());
5064 }
5065
5066 let total_votes = stake_aggregator.total_votes();
5067 let quorum_threshold = committee.quorum_threshold();
5068 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5069
5070 info!(
5071 protocol_config_digest = ?digest,
5072 ?total_votes,
5073 ?quorum_threshold,
5074 ?buffer_stake_bps,
5075 ?effective_threshold,
5076 ?proposed_protocol_version,
5077 ?packages,
5078 "support for upgrade"
5079 );
5080
5081 let has_support = total_votes >= effective_threshold;
5082 has_support.then_some((proposed_protocol_version, digest, packages))
5083 })
5084 }
5085
5086 fn choose_protocol_version_and_system_packages_v1(
5090 current_protocol_version: ProtocolVersion,
5091 current_protocol_digest: Digest,
5092 committee: &Committee,
5093 capabilities: Vec<AuthorityCapabilitiesV1>,
5094 buffer_stake_bps: u64,
5095 ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
5096 let mut next_protocol_version = current_protocol_version;
5097 let mut system_packages = vec![];
5098 let mut protocol_version_digest = current_protocol_digest;
5099
5100 while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
5104 next_protocol_version + 1,
5105 committee,
5106 capabilities.clone(),
5107 buffer_stake_bps,
5108 ) {
5109 next_protocol_version = version;
5110 protocol_version_digest = digest;
5111 system_packages = packages;
5112 }
5113
5114 (
5115 next_protocol_version,
5116 protocol_version_digest,
5117 system_packages,
5118 )
5119 }
5120
5121 fn get_validators_supporting_protocol_version(
5126 target_protocol_version: ProtocolVersion,
5127 target_digest: Digest,
5128 active_validators: &[AuthorityPublicKey],
5129 capabilities: &[AuthorityCapabilitiesV1],
5130 ) -> Vec<u64> {
5131 let mut eligible_validators = Vec::new();
5132
5133 for capability in capabilities {
5134 if let Some(digest) = capability
5136 .supported_protocol_versions
5137 .get_version_digest(target_protocol_version)
5138 {
5139 if digest == target_digest {
5140 if let Some(index) = active_validators
5142 .iter()
5143 .position(|name| AuthorityName::from(name) == capability.authority)
5144 {
5145 eligible_validators.push(index as u64);
5146 }
5147 }
5148 }
5149 }
5150
5151 eligible_validators.sort();
5153 eligible_validators
5154 }
5155
5156 fn calculate_eligible_validators_weight(
5161 eligible_validator_indices: &[u64],
5162 active_validators: &[AuthorityPublicKey],
5163 committee: &Committee,
5164 ) -> u64 {
5165 let mut total_weight = 0u64;
5166
5167 for &index in eligible_validator_indices {
5168 let authority_pubkey = &active_validators[index as usize];
5169 if let Some((_, weight)) = committee
5171 .members()
5172 .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
5173 {
5174 total_weight += weight;
5175 }
5176 }
5177
5178 total_weight
5179 }
5180
/// Builds the end-of-epoch "change epoch" transaction for epoch
/// `epoch_store.epoch() + 1`, executes it locally and stores the transaction
/// and its effects in the state-sync store.
///
/// * `gas_cost_summary` - epoch gas totals passed into the change-epoch
///   transaction.
/// * `checkpoint` - checkpoint sequence number the executable transaction is
///   created from.
/// * `epoch_start_timestamp_ms` - timestamp passed into the change-epoch
///   transaction.
/// * `scores` - validator scores; only used by the v4 change-epoch variant.
///
/// Returns the system state written by the transaction, the
/// `SystemEpochInfoEvent` it emitted (if any) and the transaction effects.
///
/// # Errors
/// * the chosen system packages cannot be produced locally;
/// * the transaction has already been executed;
/// * any lock, object-read, execution or storage step fails.
///
/// # Panics
/// * reading the authority capabilities fails;
/// * the transaction did not write the system state object;
/// * no `SystemEpochInfoEvent` was emitted and the system state is not in safe
///   mode;
/// * the effects status is not success.
#[instrument(level = "error", skip_all)]
pub async fn create_and_execute_advance_epoch_tx(
    &self,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    gas_cost_summary: &GasCostSummary,
    checkpoint: CheckpointSequenceNumber,
    epoch_start_timestamp_ms: CheckpointTimestamp,
    scores: Vec<u64>,
) -> anyhow::Result<(
    IotaSystemState,
    Option<SystemEpochInfoEvent>,
    TransactionEffects,
)> {
    let mut txns = Vec::new();

    let next_epoch = epoch_store.epoch() + 1;

    // Pick the next protocol version, its digest and the system packages from
    // the capabilities recorded in the epoch store.
    let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
    let authority_capabilities = epoch_store
        .get_capabilities_v1()
        .expect("read capabilities from db cannot fail");
    let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
        Self::choose_protocol_version_and_system_packages_v1(
            epoch_store.protocol_version(),
            SupportedProtocolVersionsWithHashes::protocol_config_digest(
                epoch_store.protocol_config(),
            ),
            epoch_store.committee(),
            authority_capabilities.clone(),
            buffer_stake_bps,
        );

    // The binary config is derived from the current epoch's protocol config.
    let config = epoch_store.protocol_config();
    let binary_config = to_binary_config(config);
    let Some(next_epoch_system_package_bytes) = self
        .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
        .await
    else {
        error!(
            "upgraded system packages {:?} are not locally available, cannot create \
            ChangeEpochTx. validator binary must be upgraded to the correct version!",
            next_epoch_system_packages
        );
        bail!("missing system packages: cannot form ChangeEpochTx");
    };

    // Protocol feature flags decide which change-epoch variant is built.
    if config.select_committee_from_eligible_validators() {
        let active_validators = epoch_store.epoch_start_state().get_active_validators();

        // By default every active validator (by index) is eligible.
        let mut eligible_active_validators = (0..active_validators.len() as u64).collect();

        if config.select_committee_supporting_next_epoch_version() {
            // Restrict eligibility to validators that advertised the chosen
            // protocol version and digest.
            eligible_active_validators = Self::get_validators_supporting_protocol_version(
                next_epoch_protocol_version,
                next_epoch_protocol_digest,
                &active_validators,
                &authority_capabilities,
            );

            let eligible_validators_weight = Self::calculate_eligible_validators_weight(
                &eligible_active_validators,
                &active_validators,
                epoch_store.committee(),
            );

            let committee = epoch_store.committee();
            let effective_threshold = committee.effective_threshold(buffer_stake_bps);

            // If the restricted set does not carry enough stake, log an error
            // and fall back to all active validators.
            if eligible_validators_weight < effective_threshold {
                error!(
                    "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
                    This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
                );
                eligible_active_validators = (0..active_validators.len() as u64).collect();
            }
        }

        if config.pass_validator_scores_to_advance_epoch() {
            // v4 additionally carries the scores and the score-adjustment flag.
            txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
                next_epoch,
                next_epoch_protocol_version.as_u64(),
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
                eligible_active_validators,
                scores,
                config.adjust_rewards_by_score(),
            ));
        } else {
            txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
                next_epoch,
                next_epoch_protocol_version.as_u64(),
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
                eligible_active_validators,
            ));
        }
    } else if config.protocol_defined_base_fee()
        && config.max_committee_members_count_as_option().is_some()
    {
        txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
            next_epoch,
            next_epoch_protocol_version.as_u64(),
            gas_cost_summary.storage_cost,
            gas_cost_summary.computation_cost,
            gas_cost_summary.computation_cost_burned,
            gas_cost_summary.storage_rebate,
            gas_cost_summary.non_refundable_storage_fee,
            epoch_start_timestamp_ms,
            next_epoch_system_package_bytes,
        ));
    } else {
        // The original variant does not carry `computation_cost_burned`.
        txns.push(EndOfEpochTransactionKind::new_change_epoch(
            next_epoch,
            next_epoch_protocol_version.as_u64(),
            gas_cost_summary.storage_cost,
            gas_cost_summary.computation_cost,
            gas_cost_summary.storage_rebate,
            gas_cost_summary.non_refundable_storage_fee,
            epoch_start_timestamp_ms,
            next_epoch_system_package_bytes,
        ));
    }

    let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);

    let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
        tx.clone(),
        epoch_store.epoch(),
        checkpoint,
    );

    let tx_digest = executable_tx.digest();

    info!(
        ?next_epoch,
        ?next_epoch_protocol_version,
        ?next_epoch_system_packages,
        computation_cost=?gas_cost_summary.computation_cost,
        computation_cost_burned=?gas_cost_summary.computation_cost_burned,
        storage_cost=?gas_cost_summary.storage_cost,
        storage_rebate=?gas_cost_summary.storage_rebate,
        non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
        ?tx_digest,
        "Creating advance epoch transaction"
    );

    fail_point_async!("change_epoch_tx_delay");
    let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

    // With the transaction lock held, bail out if the transaction has already
    // been executed.
    if self
        .get_transaction_cache_reader()
        .try_is_tx_already_executed(tx_digest)?
    {
        warn!("change epoch tx has already been executed via state sync");
        bail!("change epoch tx has already been executed via state sync",);
    }

    let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;

    // Shared object versions are assigned before the input objects are read.
    epoch_store.assign_shared_object_versions_idempotent(
        self.get_object_cache_reader().as_ref(),
        std::slice::from_ref(&executable_tx),
    )?;

    let (input_objects, _) =
        self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;

    let (temporary_store, effects, _execution_error_opt) = self.execute_transaction(
        &execution_guard,
        &executable_tx,
        input_objects,
        vec![],
        epoch_store,
    )?;
    let system_obj = get_iota_system_state(&temporary_store.written)
        .expect("change epoch tx must write to system object");
    let system_epoch_info_event = temporary_store
        .events
        .0
        .into_iter()
        .find(|event| event.is_system_epoch_info_event())
        .map(SystemEpochInfoEvent::from);
    // The event may only be missing when the system state is in safe mode.
    assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());

    // Store the transaction and effects in the state-sync store; the error is
    // converted into `anyhow::Error` to match this function's return type.
    self.get_state_sync_store()
        .try_insert_transaction_and_effects(&tx, &effects)
        .map_err(|err| {
            let err: anyhow::Error = err.into();
            err
        })?;

    info!(
        "Effects summary of the change epoch transaction: {:?}",
        effects.summary_for_debug()
    );
    epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
    assert!(effects.status().is_success());
    Ok((system_obj, system_epoch_info_event, effects))
}
5441
5442 #[instrument(level = "error", skip_all)]
5446 async fn revert_uncommitted_epoch_transactions(
5447 &self,
5448 epoch_store: &AuthorityPerEpochStore,
5449 ) -> IotaResult {
5450 {
5451 let state = epoch_store.get_reconfig_state_write_lock_guard();
5452 if state.should_accept_user_certs() {
5453 epoch_store.close_user_certs(state);
5462 }
5463 }
5465
5466 if !epoch_store.protocol_config().enable_pcool_flow() {
5469 let pending_certificates = epoch_store.pending_consensus_certificates();
5470 info!(
5471 "Reverting {} locally executed transactions that was not included in the epoch: \
5472 {:?}",
5473 pending_certificates.len(),
5474 pending_certificates,
5475 );
5476 for digest in pending_certificates {
5477 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5478 info!(
5479 "Not reverting pending consensus transaction {:?} - it was included in \
5480 checkpoint",
5481 digest
5482 );
5483 continue;
5484 }
5485 info!("Reverting {:?} at the end of epoch", digest);
5486 epoch_store.revert_executed_transaction(&digest)?;
5487 self.get_reconfig_api().try_revert_state_update(&digest)?;
5488 }
5489 info!("All uncommitted local transactions reverted");
5490 } else {
5491 info!("P-COOL mode: skipping revert of uncommitted epoch transactions");
5492 }
5493
5494 Ok(())
5495 }
5496
5497 #[instrument(level = "error", skip_all)]
5498 async fn reopen_epoch_db(
5499 &self,
5500 cur_epoch_store: &AuthorityPerEpochStore,
5501 new_committee: Committee,
5502 epoch_start_configuration: EpochStartConfiguration,
5503 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5504 epoch_last_checkpoint: CheckpointSequenceNumber,
5505 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5506 let new_epoch = new_committee.epoch;
5507 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5508 assert_eq!(
5509 epoch_start_configuration.epoch_start_state().epoch(),
5510 new_committee.epoch
5511 );
5512 fail_point!("before-open-new-epoch-store");
5513 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5514 self.name,
5515 new_committee,
5516 epoch_start_configuration,
5517 self.get_backing_package_store().clone(),
5518 expensive_safety_check_config,
5519 epoch_last_checkpoint,
5520 )?;
5521 self.epoch_store.store(new_epoch_store.clone());
5522 Ok(new_epoch_store)
5523 }
5524
5525 fn check_move_account(
5528 &self,
5529 auth_account_object_id: ObjectId,
5530 auth_account_object_seq_number: Option<SequenceNumber>,
5531 auth_account_object_digest: Option<ObjectDigest>,
5532 account_object: ObjectReadResult,
5533 signer: &Address,
5534 ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5535 let account_object = match account_object.object {
5536 ObjectReadResultKind::Object(object) => Ok(object),
5537 ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5538 Err(UserInputError::AccountObjectDeleted {
5539 account_id: account_object.id(),
5540 account_version: version,
5541 transaction_digest: digest,
5542 })
5543 }
5544 ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5547 Err(UserInputError::AccountObjectInCanceledTransaction {
5548 account_id: account_object.id(),
5549 account_version: version,
5550 })
5551 }
5552 }?;
5553
5554 let account_object_addr = Address::from(auth_account_object_id);
5555
5556 fp_ensure!(
5557 signer == &account_object_addr,
5558 UserInputError::IncorrectUserSignature {
5559 error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5560 }
5561 .into()
5562 );
5563
5564 fp_ensure!(
5565 account_object.is_shared() || account_object.is_immutable(),
5566 UserInputError::AccountObjectNotSupported {
5567 object_id: auth_account_object_id
5568 }
5569 .into()
5570 );
5571
5572 let auth_account_object_seq_number =
5573 if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5574 let account_object_version = account_object.version();
5575
5576 fp_ensure!(
5577 account_object_version == auth_account_object_seq_number,
5578 UserInputError::AccountObjectVersionMismatch {
5579 object_id: auth_account_object_id,
5580 expected_version: auth_account_object_seq_number,
5581 actual_version: account_object_version,
5582 }
5583 .into()
5584 );
5585
5586 auth_account_object_seq_number
5587 } else {
5588 account_object.version()
5589 };
5590
5591 if let Some(auth_account_object_digest) = auth_account_object_digest {
5592 let expected_digest = account_object.digest();
5593 fp_ensure!(
5594 expected_digest == auth_account_object_digest,
5595 UserInputError::InvalidAccountObjectDigest {
5596 object_id: auth_account_object_id,
5597 expected_digest,
5598 actual_digest: auth_account_object_digest,
5599 }
5600 .into()
5601 );
5602 }
5603
5604 let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5605 auth_account_object_id,
5606 &AuthenticatorFunctionRefV1Key::tag().into(),
5607 &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5608 )
5609 .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5610 account_object_id: auth_account_object_id,
5611 })?;
5612
5613 let authenticator_function_ref_field = self
5614 .get_object_cache_reader()
5615 .try_find_object_lt_or_eq_version(
5616 authenticator_function_ref_field_id,
5617 auth_account_object_seq_number,
5618 )?;
5619
5620 if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5621 let field_move_object = authenticator_function_ref_field_obj
5622 .data
5623 .as_struct_opt()
5624 .expect("dynamic field should never be a package object");
5625
5626 let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5627 field_move_object.to_rust().map_err(|_| {
5628 UserInputError::InvalidAuthenticatorFunctionRefField {
5629 account_object_id: auth_account_object_id,
5630 }
5631 })?;
5632
5633 Ok(AuthenticatorFunctionRefForExecution::new_v1(
5634 field.value,
5635 authenticator_function_ref_field_obj.object_ref(),
5636 authenticator_function_ref_field_obj.owner,
5637 authenticator_function_ref_field_obj.storage_rebate,
5638 authenticator_function_ref_field_obj.previous_transaction,
5639 ))
5640 } else {
5641 Err(UserInputError::MoveAuthenticatorNotFound {
5642 authenticator_function_ref_id: authenticator_function_ref_field_id,
5643 account_object_id: auth_account_object_id,
5644 account_object_version: auth_account_object_seq_number,
5645 }
5646 .into())
5647 }
5648 }
5649
5650 #[allow(clippy::type_complexity)]
5651 fn read_objects_for_validation(
5652 &self,
5653 transaction: &VerifiedTransaction,
5654 epoch: u64,
5655 ) -> IotaResult<(
5656 InputObjects,
5657 ReceivingObjects,
5658 Vec<(InputObjects, ObjectReadResult)>,
5659 )> {
5660 let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5661 Some(transaction.digest()),
5662 &transaction.collect_all_input_object_kind_for_reading()?,
5663 &transaction.data().transaction_data().receiving_objects(),
5664 epoch,
5665 )?;
5666
5667 transaction
5668 .split_input_objects_into_groups_for_reading(input_objects)
5669 .map(|(tx_input_objects, per_authenticator_inputs)| {
5670 (
5671 tx_input_objects,
5672 tx_receiving_objects,
5673 per_authenticator_inputs,
5674 )
5675 })
5676 }
5677
5678 #[allow(clippy::type_complexity)]
5679 fn check_transaction_inputs_for_validation(
5680 &self,
5681 protocol_config: &ProtocolConfig,
5682 reference_gas_price: u64,
5683 tx_data: &TransactionData,
5684 tx_input_objects: InputObjects,
5685 tx_receiving_objects: &ReceivingObjects,
5686 move_authenticators: &Vec<&MoveAuthenticator>,
5687 per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
5688 ) -> IotaResult<(
5689 IotaGasStatus,
5690 CheckedInputObjects,
5691 Vec<(CheckedInputObjects, AuthenticatorFunctionRef)>,
5692 )> {
5693 let authenticator_gas_budget = if move_authenticators.is_empty() {
5694 0
5695 } else {
5696 protocol_config.max_auth_gas()
5699 };
5700
5701 debug_assert_eq!(
5702 move_authenticators.len(),
5703 per_authenticator_inputs.len(),
5704 "Move authenticators amount must match the number of authenticator inputs"
5705 );
5706
5707 let per_authenticator_checked_inputs = move_authenticators
5708 .iter()
5709 .zip(per_authenticator_inputs)
5710 .map(
5711 |(move_authenticator, (authenticator_input_objects, account_object))| {
5712 let (
5714 auth_account_object_id,
5715 auth_account_object_seq_number,
5716 auth_account_object_digest,
5717 ) = move_authenticator.object_to_authenticate_components()?;
5718
5719 let signer = move_authenticator.address()?;
5720
5721 let AuthenticatorFunctionRefForExecution {
5723 authenticator_function_ref,
5724 ..
5725 } = self.check_move_account(
5726 auth_account_object_id,
5727 auth_account_object_seq_number,
5728 auth_account_object_digest,
5729 account_object,
5730 &signer,
5731 )?;
5732
5733 let authenticator_checked_input_objects =
5735 iota_transaction_checks::check_move_authenticator_input_for_validation(
5736 authenticator_input_objects,
5737 )?;
5738
5739 Ok((
5740 authenticator_checked_input_objects,
5741 authenticator_function_ref,
5742 ))
5743 },
5744 )
5745 .collect::<IotaResult<Vec<_>>>()?;
5746
5747 let (gas_status, tx_checked_input_objects) =
5749 iota_transaction_checks::check_transaction_input(
5750 protocol_config,
5751 reference_gas_price,
5752 tx_data,
5753 tx_input_objects,
5754 tx_receiving_objects,
5755 &self.metrics.bytecode_verifier_metrics,
5756 &self.config.verifier_signing_config,
5757 authenticator_gas_budget,
5758 )?;
5759
5760 Ok((
5761 gas_status,
5762 tx_checked_input_objects,
5763 per_authenticator_checked_inputs,
5764 ))
5765 }
5766
/// Test-only: iterates over the cached live object set exposed by the global
/// state hash store.
#[cfg(test)]
pub(crate) fn iter_live_object_set_for_testing(
    &self,
) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
    let state_hash_store = self.get_global_state_hash_store();
    state_hash_store.iter_cached_live_object_set_for_testing()
}
5774
/// Test-only: takes the stored execution-shutdown sender and sends `()` on it.
///
/// # Panics
///
/// Panics if the sender has already been taken or if sending fails.
#[cfg(test)]
pub(crate) fn shutdown_execution_for_test(&self) {
    // The lock guard stays alive until the end of the function, i.e. across the send.
    let mut shutdown_slot = self.tx_execution_shutdown.lock();
    let shutdown_tx = shutdown_slot.take().unwrap();
    shutdown_tx.send(()).unwrap();
}
5784
5785 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5788 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5789 self.get_object_cache_reader()
5790 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5791 self.get_reconfig_api()
5792 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5793 }
5794}
5795
5796pub struct RandomnessRoundReceiver {
5797 authority_state: Arc<AuthorityState>,
5798 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5799}
5800
5801impl RandomnessRoundReceiver {
5802 pub fn spawn(
5803 authority_state: Arc<AuthorityState>,
5804 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5805 ) -> JoinHandle<()> {
5806 let rrr = RandomnessRoundReceiver {
5807 authority_state,
5808 randomness_rx,
5809 };
5810 spawn_monitored_task!(rrr.run())
5811 }
5812
5813 async fn run(mut self) {
5814 info!("RandomnessRoundReceiver event loop started");
5815
5816 loop {
5817 tokio::select! {
5818 maybe_recv = self.randomness_rx.recv() => {
5819 if let Some((epoch, round, bytes)) = maybe_recv {
5820 self.handle_new_randomness(epoch, round, bytes);
5821 } else {
5822 break;
5823 }
5824 },
5825 }
5826 }
5827
5828 info!("RandomnessRoundReceiver event loop ended");
5829 }
5830
5831 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5832 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5833 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5834 if epoch_store.epoch() != epoch {
5835 warn!(
5836 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5837 epoch_store.epoch()
5838 );
5839 return;
5840 }
5841 let transaction = VerifiedTransaction::new_randomness_state_update(
5842 epoch,
5843 round,
5844 bytes,
5845 epoch_store
5846 .epoch_start_config()
5847 .randomness_obj_initial_shared_version(),
5848 );
5849 debug!(
5850 "created randomness state update transaction with digest: {:?}",
5851 transaction.digest()
5852 );
5853 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5854 let digest = *transaction.digest();
5855
5856 self.authority_state
5861 .get_cache_commit()
5862 .persist_transaction(&transaction);
5863
5864 self.authority_state
5866 .transaction_manager()
5867 .enqueue(vec![transaction], &epoch_store);
5868
5869 let authority_state = self.authority_state.clone();
5870 spawn_monitored_task!(async move {
5871 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5880 let result = tokio::time::timeout(
5881 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5882 authority_state
5883 .get_transaction_cache_reader()
5884 .try_notify_read_executed_effects(&[digest]),
5885 )
5886 .await;
5887 let result = match result {
5888 Ok(result) => result,
5889 Err(_) => {
5890 if cfg!(debug_assertions) {
5891 panic!(
5893 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5894 );
5895 }
5896 warn!(
5897 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5898 );
5899 authority_state
5901 .get_transaction_cache_reader()
5902 .try_notify_read_executed_effects(&[digest])
5903 .await
5904 }
5905 };
5906
5907 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5908 let effects = effects.pop().expect("should return effects");
5909 if *effects.status() != ExecutionStatus::Success {
5910 fatal!(
5911 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5912 );
5913 }
5914 debug!(
5915 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5916 );
5917 });
5918 }
5919}
5920
5921#[async_trait]
5922impl TransactionKeyValueStoreTrait for AuthorityState {
5923 async fn multi_get(
5924 &self,
5925 transaction_keys: &[TransactionDigest],
5926 effects_keys: &[TransactionDigest],
5927 ) -> IotaResult<KVStoreTransactionData> {
5928 let txns = if !transaction_keys.is_empty() {
5929 self.get_transaction_cache_reader()
5930 .try_multi_get_transaction_blocks(transaction_keys)?
5931 .into_iter()
5932 .map(|t| t.map(|t| (*t).clone().into_inner()))
5933 .collect()
5934 } else {
5935 vec![]
5936 };
5937
5938 let fx = if !effects_keys.is_empty() {
5939 self.get_transaction_cache_reader()
5940 .try_multi_get_executed_effects(effects_keys)?
5941 } else {
5942 vec![]
5943 };
5944
5945 Ok((txns, fx))
5946 }
5947
5948 async fn multi_get_checkpoints(
5949 &self,
5950 checkpoint_summaries: &[CheckpointSequenceNumber],
5951 checkpoint_contents: &[CheckpointSequenceNumber],
5952 checkpoint_summaries_by_digest: &[CheckpointDigest],
5953 ) -> IotaResult<(
5954 Vec<Option<CertifiedCheckpointSummary>>,
5955 Vec<Option<CheckpointContents>>,
5956 Vec<Option<CertifiedCheckpointSummary>>,
5957 )> {
5958 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5960 let store = self.get_checkpoint_store();
5961 for seq in checkpoint_summaries {
5962 let checkpoint = store
5963 .get_checkpoint_by_sequence_number(*seq)?
5964 .map(|c| c.into_inner());
5965
5966 summaries.push(checkpoint);
5967 }
5968
5969 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5970 for seq in checkpoint_contents {
5971 let checkpoint = store
5972 .get_checkpoint_by_sequence_number(*seq)?
5973 .and_then(|summary| {
5974 store
5975 .get_checkpoint_contents(&summary.content_digest)
5976 .expect("db read cannot fail")
5977 });
5978 contents.push(checkpoint);
5979 }
5980
5981 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5982 for digest in checkpoint_summaries_by_digest {
5983 let checkpoint = store
5984 .get_checkpoint_by_digest(digest)?
5985 .map(|c| c.into_inner());
5986 summaries_by_digest.push(checkpoint);
5987 }
5988
5989 Ok((summaries, contents, summaries_by_digest))
5990 }
5991
5992 async fn get_transaction_perpetual_checkpoint(
5993 &self,
5994 digest: TransactionDigest,
5995 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5996 self.get_checkpoint_cache()
5997 .try_get_transaction_perpetual_checkpoint(&digest)
5998 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5999 }
6000
6001 async fn get_object(
6002 &self,
6003 object_id: ObjectId,
6004 version: VersionNumber,
6005 ) -> IotaResult<Option<Object>> {
6006 self.get_object_cache_reader()
6007 .try_get_object_by_key(&object_id, version)
6008 }
6009
6010 #[instrument(skip_all)]
6011 async fn multi_get_objects(
6012 &self,
6013 object_keys: &[ObjectKey],
6014 ) -> IotaResult<Vec<Option<Object>>> {
6015 Ok(self
6016 .get_object_cache_reader()
6017 .multi_get_objects_by_key(object_keys))
6018 }
6019
6020 async fn multi_get_transactions_perpetual_checkpoints(
6021 &self,
6022 digests: &[TransactionDigest],
6023 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
6024 let res = self
6025 .get_checkpoint_cache()
6026 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
6027
6028 Ok(res
6029 .into_iter()
6030 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
6031 .collect())
6032 }
6033
6034 #[instrument(skip(self, digests), fields(digests = digests.iter().map(|d| d.to_string()).collect::<Vec<String>>().join(", ")))]
6035 async fn multi_get_events_by_tx_digests(
6036 &self,
6037 digests: &[TransactionDigest],
6038 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
6039 if digests.is_empty() {
6040 return Ok(vec![]);
6041 }
6042
6043 Ok(self
6044 .get_transaction_cache_reader()
6045 .multi_get_events(digests))
6046 }
6047}
6048
/// Per-thread registry of package overrides, compiled only under `cfg(msim)`.
///
/// An override is either a fixed set of modules used for every validator or a
/// callback that decides per validator which modules (if any) to use.
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_sdk_types::ObjectId;
    use iota_types::base_types::AuthorityName;
    use move_binary_format::CompiledModule;

    type FrameworkOverrideConfig = BTreeMap<ObjectId, PackageOverrideConfig>;

    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    /// Callback returning the modules to use for a given validator, or `None`
    /// for no override.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    enum PackageOverrideConfig {
        /// Same modules for every validator.
        Global(Framework),
        /// Modules chosen by a callback keyed on the validator name.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module at its own version into a separate byte vector.
    ///
    /// Panics if serialization of any module fails.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut buf = Vec::new();
            module
                .serialize_with_version(module.version, &mut buf)
                .unwrap();
            serialized.push(buf);
        }
        serialized
    }

    /// Registers `modules` as the override of `package_id` for all validators,
    /// replacing any earlier entry for that id.
    pub fn set_override(package_id: ObjectId, modules: Vec<CompiledModule>) {
        let _replaced = OVERRIDE.with(|registry| {
            registry
                .borrow_mut()
                .insert(package_id, PackageOverrideConfig::Global(modules))
        });
    }

    /// Registers a per-validator callback as the override of `package_id`,
    /// replacing any earlier entry for that id.
    pub fn set_override_cb(package_id: ObjectId, func: PackageUpgradeCallback) {
        let _replaced = OVERRIDE.with(|registry| {
            registry
                .borrow_mut()
                .insert(package_id, PackageOverrideConfig::PerValidator(func))
        });
    }

    /// Returns the serialized override modules of `package_id` for validator
    /// `name`, or `None` if no override applies.
    pub fn get_override_bytes(package_id: &ObjectId, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|registry| {
            let overrides = registry.borrow();
            match overrides.get(package_id)? {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(callback) => {
                    callback(name).map(|framework| compiled_modules_to_bytes(&framework))
                }
            }
        })
    }

    /// Returns the override modules of `package_id` for validator `name`, or
    /// `None` if no override applies.
    pub fn get_override_modules(
        package_id: &ObjectId,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|registry| {
            let overrides = registry.borrow();
            match overrides.get(package_id)? {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(callback) => callback(name),
            }
        })
    }

    /// Builds a `SystemPackage` from the override of `package_id`, if one
    /// applies to validator `name`.
    ///
    /// A system package keeps the dependencies of the built-in package with
    /// the same id; any other package depends on all built-in packages.
    pub fn get_override_system_package(
        package_id: &ObjectId,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if package_id.is_system_package() {
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        } else {
            BuiltInFramework::all_package_ids()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns one `SystemPackage` for every registered override whose id is
    /// not a built-in package id; each depends on all built-in packages.
    ///
    /// Panics if such an override yields no modules for validator `name`.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectId> =
            BuiltInFramework::all_package_ids().into_iter().collect();

        // Collect the ids first so the registry borrow ends before the
        // per-package lookups below.
        let extra: Vec<ObjectId> = OVERRIDE.with(|registry| {
            registry
                .borrow()
                .keys()
                .filter(|id| !built_in.contains(*id))
                .copied()
                .collect()
        });

        let mut packages = Vec::with_capacity(extra.len());
        for id in extra {
            packages.push(SystemPackage {
                id,
                bytes: get_override_bytes(&id, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        packages
    }
}
6168
6169#[derive(Debug, Serialize, Deserialize, Clone)]
6170pub struct ObjDumpFormat {
6171 pub id: ObjectId,
6172 pub version: VersionNumber,
6173 pub digest: ObjectDigest,
6174 pub object: Object,
6175}
6176
6177impl ObjDumpFormat {
6178 fn new(object: Object) -> Self {
6179 let oref = object.object_ref();
6180 Self {
6181 id: oref.object_id,
6182 version: oref.version,
6183 digest: oref.digest,
6184 object,
6185 }
6186 }
6187}
6188
6189#[derive(Debug, Serialize, Deserialize, Clone)]
6190pub struct NodeStateDump {
6191 pub tx_digest: TransactionDigest,
6192 pub sender_signed_data: SenderSignedData,
6193 pub executed_epoch: u64,
6194 pub reference_gas_price: u64,
6195 pub protocol_version: u64,
6196 pub epoch_start_timestamp_ms: u64,
6197 pub computed_effects: TransactionEffects,
6198 pub expected_effects_digest: TransactionEffectsDigest,
6199 pub relevant_system_packages: Vec<ObjDumpFormat>,
6200 pub shared_objects: Vec<ObjDumpFormat>,
6201 pub loaded_child_objects: Vec<ObjDumpFormat>,
6202 pub modified_at_versions: Vec<ObjDumpFormat>,
6203 pub runtime_reads: Vec<ObjDumpFormat>,
6204 pub input_objects: Vec<ObjDumpFormat>,
6205}
6206
6207impl NodeStateDump {
6208 pub fn new(
6209 tx_digest: &TransactionDigest,
6210 effects: &TransactionEffects,
6211 expected_effects_digest: TransactionEffectsDigest,
6212 object_store: &dyn ObjectStore,
6213 epoch_store: &Arc<AuthorityPerEpochStore>,
6214 inner_temporary_store: &InnerTemporaryStore,
6215 transaction: &VerifiedExecutableTransaction,
6216 ) -> IotaResult<Self> {
6217 let executed_epoch = epoch_store.epoch();
6219 let reference_gas_price = epoch_store.reference_gas_price();
6220 let epoch_start_config = epoch_store.epoch_start_config();
6221 let protocol_version = epoch_store.protocol_version().as_u64();
6222 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6223
6224 let mut relevant_system_packages = Vec::new();
6226 for sys_package_id in BuiltInFramework::all_package_ids() {
6227 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
6228 relevant_system_packages.push(ObjDumpFormat::new(w))
6229 }
6230 }
6231
6232 let mut shared_objects = Vec::new();
6234 for kind in effects.input_shared_objects() {
6235 match kind {
6236 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
6237 if let Some(w) =
6238 object_store.try_get_object_by_key(&obj_ref.object_id, obj_ref.version)?
6239 {
6240 shared_objects.push(ObjDumpFormat::new(w))
6241 }
6242 }
6243 InputSharedObject::ReadDeleted(..)
6244 | InputSharedObject::MutateDeleted(..)
6245 | InputSharedObject::Cancelled(..) => (), }
6248 }
6249
6250 let mut loaded_child_objects = Vec::new();
6253 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6254 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
6255 loaded_child_objects.push(ObjDumpFormat::new(w))
6256 }
6257 }
6258
6259 let mut modified_at_versions = Vec::new();
6261 for (id, ver) in effects.modified_at_versions() {
6262 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
6263 modified_at_versions.push(ObjDumpFormat::new(w))
6264 }
6265 }
6266
6267 let mut runtime_reads = Vec::new();
6271 for obj in inner_temporary_store
6272 .runtime_packages_loaded_from_db
6273 .values()
6274 {
6275 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6276 }
6277
6278 Ok(Self {
6281 tx_digest: *tx_digest,
6282 executed_epoch,
6283 reference_gas_price,
6284 epoch_start_timestamp_ms,
6285 protocol_version,
6286 relevant_system_packages,
6287 shared_objects,
6288 loaded_child_objects,
6289 modified_at_versions,
6290 runtime_reads,
6291 sender_signed_data: transaction.clone().into_message(),
6292 input_objects: inner_temporary_store
6293 .input_objects
6294 .values()
6295 .map(|o| ObjDumpFormat::new(o.clone()))
6296 .collect(),
6297 computed_effects: effects.clone(),
6298 expected_effects_digest,
6299 })
6300 }
6301
6302 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6303 let mut objects = Vec::new();
6304 objects.extend(self.relevant_system_packages.clone());
6305 objects.extend(self.shared_objects.clone());
6306 objects.extend(self.loaded_child_objects.clone());
6307 objects.extend(self.modified_at_versions.clone());
6308 objects.extend(self.runtime_reads.clone());
6309 objects.extend(self.input_objects.clone());
6310 objects
6311 }
6312
6313 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6314 let file_name = format!(
6315 "{}_{}_NODE_DUMP.json",
6316 self.tx_digest,
6317 AuthorityState::unixtime_now_ms()
6318 );
6319 let mut path = path.to_path_buf();
6320 path.push(&file_name);
6321 let mut file = File::create(path.clone())?;
6322 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6323 Ok(path)
6324 }
6325
6326 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6327 let file = File::open(path)?;
6328 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6329 }
6330}
6331
6332fn pre_consensus_move_authenticators<'a>(
6344 tx: &'a VerifiedTransaction,
6345 protocol_config: &ProtocolConfig,
6346) -> Vec<&'a MoveAuthenticator> {
6347 if protocol_config.pre_consensus_sponsor_only_move_authentication() {
6348 if tx.transaction_data().is_sponsored_tx() {
6349 if let Some(sponsor_move_authenticator) = tx.sponsor_move_authenticator() {
6350 vec![sponsor_move_authenticator]
6351 } else {
6352 vec![]
6353 }
6354 } else {
6355 tx.move_authenticators()
6356 }
6357 } else {
6358 tx.move_authenticators()
6359 }
6360}