1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49 key_value_store::{
50 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51 },
52 key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57 IOTA_SYSTEM_ADDRESS, TypeTag,
58 account_abstraction::{
59 account::AuthenticatorFunctionRefV1Key,
60 authenticator_function::{
61 AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
62 AuthenticatorFunctionRefV1,
63 },
64 },
65 authenticator_state::get_authenticator_state,
66 base_types::*,
67 committee::{Committee, EpochId, ProtocolVersion},
68 crypto::{
69 AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
70 default_hash,
71 },
72 deny_list_v1::check_coin_deny_list_v1_during_signing,
73 digests::{ChainIdentifier, Digest},
74 dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
75 effects::{
76 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
77 TransactionEvents, VerifiedSignedTransactionEffects,
78 },
79 error::{ExecutionError, IotaError, IotaResult, UserInputError},
80 event::{Event, EventID, SystemEpochInfoEvent},
81 executable_transaction::VerifiedExecutableTransaction,
82 execution_config_utils::to_binary_config,
83 execution_status::ExecutionStatus,
84 fp_ensure,
85 gas::{GasCostSummary, IotaGasStatus},
86 gas_coin::NANOS_PER_IOTA,
87 inner_temporary_store::{
88 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
89 WrittenObjects,
90 },
91 iota_system_state::{
92 IotaSystemState, IotaSystemStateTrait,
93 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
94 },
95 is_system_package,
96 layout_resolver::{LayoutResolver, into_struct_layout},
97 message_envelope::Message,
98 messages_checkpoint::{
99 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
100 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
101 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
102 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
103 },
104 messages_consensus::AuthorityCapabilitiesV1,
105 messages_grpc::{
106 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
107 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
108 TransactionStatus,
109 },
110 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
111 move_authenticator::MoveAuthenticator,
112 object::{
113 MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
114 bounded_visitor::BoundedVisitor,
115 },
116 storage::{
117 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
118 },
119 supported_protocol_versions::{
120 ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
121 },
122 transaction::*,
123 transaction_executor::{SimulateTransactionResult, VmChecks},
124};
125use itertools::Itertools;
126use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
127use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
128use parking_lot::Mutex;
129use prometheus::{
130 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
131 register_histogram_vec_with_registry, register_histogram_with_registry,
132 register_int_counter_vec_with_registry, register_int_counter_with_registry,
133 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
134};
135use serde::{Deserialize, Serialize, de::DeserializeOwned};
136use tap::TapFallible;
137use tokio::{
138 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
139 task::JoinHandle,
140};
141use tracing::{debug, error, info, instrument, trace, warn};
142use typed_store::TypedStoreError;
143
144use self::{
145 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
146};
147#[cfg(msim)]
148pub use crate::checkpoints::checkpoint_executor::utils::{
149 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
150};
151use crate::{
152 authority::{
153 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
154 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
155 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
156 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
157 authority_store_tables::AuthorityPrunerTables,
158 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
159 },
160 authority_client::NetworkAuthorityClient,
161 checkpoints::CheckpointStore,
162 congestion_tracker::CongestionTracker,
163 consensus_adapter::ConsensusAdapter,
164 epoch::committee_store::CommitteeStore,
165 execution_cache::{
166 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
167 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
168 TransactionCacheRead,
169 },
170 execution_driver::execution_process,
171 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
172 metrics::{LatencyObserver, RateTracker},
173 module_cache_metrics::ResolverMetrics,
174 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
175 rest_index::RestIndexStore,
176 stake_aggregator::StakeAggregator,
177 state_accumulator::{AccumulatorStore, StateAccumulator},
178 subscription_handler::SubscriptionHandler,
179 transaction_input_loader::TransactionInputLoader,
180 transaction_manager::TransactionManager,
181 transaction_outputs::TransactionOutputs,
182 validator_tx_finalizer::ValidatorTxFinalizer,
183 verify_indexes::verify_indexes,
184};
185
186#[cfg(test)]
187#[path = "unit_tests/authority_tests.rs"]
188pub mod authority_tests;
189
190#[cfg(test)]
191#[path = "unit_tests/transaction_tests.rs"]
192pub mod transaction_tests;
193
194#[cfg(test)]
195#[path = "unit_tests/batch_transaction_tests.rs"]
196mod batch_transaction_tests;
197
198#[cfg(test)]
199#[path = "unit_tests/move_integration_tests.rs"]
200pub mod move_integration_tests;
201
202#[cfg(test)]
203#[path = "unit_tests/gas_tests.rs"]
204mod gas_tests;
205
206#[cfg(test)]
207#[path = "unit_tests/batch_verification_tests.rs"]
208mod batch_verification_tests;
209
210#[cfg(test)]
211#[path = "unit_tests/coin_deny_list_tests.rs"]
212mod coin_deny_list_tests;
213
214#[cfg(test)]
215#[path = "unit_tests/auth_unit_test_utils.rs"]
216pub mod auth_unit_test_utils;
217
218pub mod authority_test_utils;
219
220pub mod authority_per_epoch_store;
221pub mod authority_per_epoch_store_pruner;
222
223mod authority_store_migrations;
224pub mod authority_store_pruner;
225pub mod authority_store_tables;
226pub mod authority_store_types;
227pub mod epoch_start_configuration;
228pub mod shared_object_congestion_tracker;
229pub mod shared_object_version_manager;
230pub mod suggested_gas_price_calculator;
231pub mod test_authority_builder;
232pub mod transaction_deferral;
233
234pub(crate) mod authority_store;
235pub mod backpressure;
236
237pub struct AuthorityMetrics {
239 tx_orders: IntCounter,
240 total_certs: IntCounter,
241 total_cert_attempts: IntCounter,
242 total_effects: IntCounter,
243 pub shared_obj_tx: IntCounter,
244 sponsored_tx: IntCounter,
245 tx_already_processed: IntCounter,
246 num_input_objs: Histogram,
247 num_shared_objects: Histogram,
248 batch_size: Histogram,
249
250 authority_state_handle_transaction_latency: Histogram,
251
252 execute_certificate_latency_single_writer: Histogram,
253 execute_certificate_latency_shared_object: Histogram,
254
255 internal_execution_latency: Histogram,
256 execution_load_input_objects_latency: Histogram,
257 prepare_certificate_latency: Histogram,
258 commit_certificate_latency: Histogram,
259 db_checkpoint_latency: Histogram,
260
261 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
262 pub(crate) transaction_manager_num_missing_objects: IntGauge,
263 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
264 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
265 pub(crate) transaction_manager_num_ready: IntGauge,
266 pub(crate) transaction_manager_object_cache_size: IntGauge,
267 pub(crate) transaction_manager_object_cache_hits: IntCounter,
268 pub(crate) transaction_manager_object_cache_misses: IntCounter,
269 pub(crate) transaction_manager_object_cache_evictions: IntCounter,
270 pub(crate) transaction_manager_package_cache_size: IntGauge,
271 pub(crate) transaction_manager_package_cache_hits: IntCounter,
272 pub(crate) transaction_manager_package_cache_misses: IntCounter,
273 pub(crate) transaction_manager_package_cache_evictions: IntCounter,
274 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
275
276 pub(crate) execution_driver_executed_transactions: IntCounter,
277 pub(crate) execution_driver_dispatch_queue: IntGauge,
278 pub(crate) execution_queueing_delay_s: Histogram,
279 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
280 pub(crate) execution_gas_latency_ratio: Histogram,
281
282 pub(crate) skipped_consensus_txns: IntCounter,
283 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
284
285 pub(crate) authority_overload_status: IntGauge,
286 pub(crate) authority_load_shedding_percentage: IntGauge,
287
288 pub(crate) transaction_overload_sources: IntCounterVec,
289
290 post_processing_total_events_emitted: IntCounter,
292 post_processing_total_tx_indexed: IntCounter,
293 post_processing_total_tx_had_event_processed: IntCounter,
294 post_processing_total_failures: IntCounter,
295
296 pub consensus_handler_processed: IntCounterVec,
298 pub consensus_handler_transaction_sizes: HistogramVec,
299 pub consensus_handler_num_low_scoring_authorities: IntGauge,
300 pub consensus_handler_scores: IntGaugeVec,
301 pub consensus_handler_deferred_transactions: IntCounter,
302 pub consensus_handler_congested_transactions: IntCounter,
303 pub consensus_handler_cancelled_transactions: IntCounter,
304 pub consensus_handler_max_object_costs: IntGaugeVec,
305 pub consensus_committed_subdags: IntCounterVec,
306 pub consensus_committed_messages: IntGaugeVec,
307 pub consensus_committed_user_transactions: IntGaugeVec,
308 pub consensus_handler_leader_round: IntGauge,
309 pub consensus_calculated_throughput: IntGauge,
310 pub consensus_calculated_throughput_profile: IntGauge,
311
312 pub limits_metrics: Arc<LimitsMetrics>,
313
314 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
316
317 pub zklogin_sig_count: IntCounter,
319 pub multisig_sig_count: IntCounter,
321
322 pub execution_queueing_latency: LatencyObserver,
325
326 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
333
334 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
337}
338
339const POSITIVE_INT_BUCKETS: &[f64] = &[
341 1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
342 20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
343 7000000., 10000000.,
344];
345
346const LATENCY_SEC_BUCKETS: &[f64] = &[
347 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
348 10., 20., 30., 60., 90.,
349];
350
351const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
353 0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
354 0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
355];
356
357const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
358 10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
359 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
360];
361
362pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
366 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
367 let execute_certificate_latency = register_histogram_vec_with_registry!(
368 "authority_state_execute_certificate_latency",
369 "Latency of executing certificates, including waiting for inputs",
370 &["tx_type"],
371 LATENCY_SEC_BUCKETS.to_vec(),
372 registry,
373 )
374 .unwrap();
375
376 let execute_certificate_latency_single_writer =
377 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
378 let execute_certificate_latency_shared_object =
379 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
380
381 Self {
382 tx_orders: register_int_counter_with_registry!(
383 "total_transaction_orders",
384 "Total number of transaction orders",
385 registry,
386 )
387 .unwrap(),
388 total_certs: register_int_counter_with_registry!(
389 "total_transaction_certificates",
390 "Total number of transaction certificates handled",
391 registry,
392 )
393 .unwrap(),
394 total_cert_attempts: register_int_counter_with_registry!(
395 "total_handle_certificate_attempts",
396 "Number of calls to handle_certificate",
397 registry,
398 )
399 .unwrap(),
400 total_effects: register_int_counter_with_registry!(
402 "total_transaction_effects",
403 "Total number of transaction effects produced",
404 registry,
405 )
406 .unwrap(),
407
408 shared_obj_tx: register_int_counter_with_registry!(
409 "num_shared_obj_tx",
410 "Number of transactions involving shared objects",
411 registry,
412 )
413 .unwrap(),
414
415 sponsored_tx: register_int_counter_with_registry!(
416 "num_sponsored_tx",
417 "Number of sponsored transactions",
418 registry,
419 )
420 .unwrap(),
421
422 tx_already_processed: register_int_counter_with_registry!(
423 "num_tx_already_processed",
424 "Number of transaction orders already processed previously",
425 registry,
426 )
427 .unwrap(),
428 num_input_objs: register_histogram_with_registry!(
429 "num_input_objects",
430 "Distribution of number of input TX objects per TX",
431 POSITIVE_INT_BUCKETS.to_vec(),
432 registry,
433 )
434 .unwrap(),
435 num_shared_objects: register_histogram_with_registry!(
436 "num_shared_objects",
437 "Number of shared input objects per TX",
438 POSITIVE_INT_BUCKETS.to_vec(),
439 registry,
440 )
441 .unwrap(),
442 batch_size: register_histogram_with_registry!(
443 "batch_size",
444 "Distribution of size of transaction batch",
445 POSITIVE_INT_BUCKETS.to_vec(),
446 registry,
447 )
448 .unwrap(),
449 authority_state_handle_transaction_latency: register_histogram_with_registry!(
450 "authority_state_handle_transaction_latency",
451 "Latency of handling transactions",
452 LATENCY_SEC_BUCKETS.to_vec(),
453 registry,
454 )
455 .unwrap(),
456 execute_certificate_latency_single_writer,
457 execute_certificate_latency_shared_object,
458 internal_execution_latency: register_histogram_with_registry!(
459 "authority_state_internal_execution_latency",
460 "Latency of actual certificate executions",
461 LATENCY_SEC_BUCKETS.to_vec(),
462 registry,
463 )
464 .unwrap(),
465 execution_load_input_objects_latency: register_histogram_with_registry!(
466 "authority_state_execution_load_input_objects_latency",
467 "Latency of loading input objects for execution",
468 LOW_LATENCY_SEC_BUCKETS.to_vec(),
469 registry,
470 )
471 .unwrap(),
472 prepare_certificate_latency: register_histogram_with_registry!(
473 "authority_state_prepare_certificate_latency",
474 "Latency of executing certificates, before committing the results",
475 LATENCY_SEC_BUCKETS.to_vec(),
476 registry,
477 )
478 .unwrap(),
479 commit_certificate_latency: register_histogram_with_registry!(
480 "authority_state_commit_certificate_latency",
481 "Latency of committing certificate execution results",
482 LATENCY_SEC_BUCKETS.to_vec(),
483 registry,
484 )
485 .unwrap(),
486 db_checkpoint_latency: register_histogram_with_registry!(
487 "db_checkpoint_latency",
488 "Latency of checkpointing dbs",
489 LATENCY_SEC_BUCKETS.to_vec(),
490 registry,
491 ).unwrap(),
492 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
493 "transaction_manager_num_enqueued_certificates",
494 "Current number of certificates enqueued to TransactionManager",
495 &["result"],
496 registry,
497 )
498 .unwrap(),
499 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
500 "transaction_manager_num_missing_objects",
501 "Current number of missing objects in TransactionManager",
502 registry,
503 )
504 .unwrap(),
505 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
506 "transaction_manager_num_pending_certificates",
507 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
508 registry,
509 )
510 .unwrap(),
511 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
512 "transaction_manager_num_executing_certificates",
513 "Number of executing certificates, including queued and actually running certificates",
514 registry,
515 )
516 .unwrap(),
517 transaction_manager_num_ready: register_int_gauge_with_registry!(
518 "transaction_manager_num_ready",
519 "Number of ready transactions in TransactionManager",
520 registry,
521 )
522 .unwrap(),
523 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
524 "transaction_manager_object_cache_size",
525 "Current size of object-availability cache in TransactionManager",
526 registry,
527 )
528 .unwrap(),
529 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
530 "transaction_manager_object_cache_hits",
531 "Number of object-availability cache hits in TransactionManager",
532 registry,
533 )
534 .unwrap(),
535 authority_overload_status: register_int_gauge_with_registry!(
536 "authority_overload_status",
537 "Whether authority is current experiencing overload and enters load shedding mode.",
538 registry)
539 .unwrap(),
540 authority_load_shedding_percentage: register_int_gauge_with_registry!(
541 "authority_load_shedding_percentage",
542 "The percentage of transactions is shed when the authority is in load shedding mode.",
543 registry)
544 .unwrap(),
545 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
546 "transaction_manager_object_cache_misses",
547 "Number of object-availability cache misses in TransactionManager",
548 registry,
549 )
550 .unwrap(),
551 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
552 "transaction_manager_object_cache_evictions",
553 "Number of object-availability cache evictions in TransactionManager",
554 registry,
555 )
556 .unwrap(),
557 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
558 "transaction_manager_package_cache_size",
559 "Current size of package-availability cache in TransactionManager",
560 registry,
561 )
562 .unwrap(),
563 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
564 "transaction_manager_package_cache_hits",
565 "Number of package-availability cache hits in TransactionManager",
566 registry,
567 )
568 .unwrap(),
569 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
570 "transaction_manager_package_cache_misses",
571 "Number of package-availability cache misses in TransactionManager",
572 registry,
573 )
574 .unwrap(),
575 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
576 "transaction_manager_package_cache_evictions",
577 "Number of package-availability cache evictions in TransactionManager",
578 registry,
579 )
580 .unwrap(),
581 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
582 "transaction_manager_transaction_queue_age_s",
583 "Time spent in waiting for transaction in the queue",
584 LATENCY_SEC_BUCKETS.to_vec(),
585 registry,
586 )
587 .unwrap(),
588 transaction_overload_sources: register_int_counter_vec_with_registry!(
589 "transaction_overload_sources",
590 "Number of times each source indicates transaction overload.",
591 &["source"],
592 registry)
593 .unwrap(),
594 execution_driver_executed_transactions: register_int_counter_with_registry!(
595 "execution_driver_executed_transactions",
596 "Cumulative number of transaction executed by execution driver",
597 registry,
598 )
599 .unwrap(),
600 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
601 "execution_driver_dispatch_queue",
602 "Number of transaction pending in execution driver dispatch queue",
603 registry,
604 )
605 .unwrap(),
606 execution_queueing_delay_s: register_histogram_with_registry!(
607 "execution_queueing_delay_s",
608 "Queueing delay between a transaction is ready for execution until it starts executing.",
609 LATENCY_SEC_BUCKETS.to_vec(),
610 registry
611 )
612 .unwrap(),
613 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
614 "prepare_cert_gas_latency_ratio",
615 "The ratio of computation gas divided by VM execution latency.",
616 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
617 registry
618 )
619 .unwrap(),
620 execution_gas_latency_ratio: register_histogram_with_registry!(
621 "execution_gas_latency_ratio",
622 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
623 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
624 registry
625 )
626 .unwrap(),
627 skipped_consensus_txns: register_int_counter_with_registry!(
628 "skipped_consensus_txns",
629 "Total number of consensus transactions skipped",
630 registry,
631 )
632 .unwrap(),
633 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
634 "skipped_consensus_txns_cache_hit",
635 "Total number of consensus transactions skipped because of local cache hit",
636 registry,
637 )
638 .unwrap(),
639 post_processing_total_events_emitted: register_int_counter_with_registry!(
640 "post_processing_total_events_emitted",
641 "Total number of events emitted in post processing",
642 registry,
643 )
644 .unwrap(),
645 post_processing_total_tx_indexed: register_int_counter_with_registry!(
646 "post_processing_total_tx_indexed",
647 "Total number of txes indexed in post processing",
648 registry,
649 )
650 .unwrap(),
651 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
652 "post_processing_total_tx_had_event_processed",
653 "Total number of txes finished event processing in post processing",
654 registry,
655 )
656 .unwrap(),
657 post_processing_total_failures: register_int_counter_with_registry!(
658 "post_processing_total_failures",
659 "Total number of failure in post processing",
660 registry,
661 )
662 .unwrap(),
663 consensus_handler_processed: register_int_counter_vec_with_registry!(
664 "consensus_handler_processed",
665 "Number of transactions processed by consensus handler",
666 &["class"],
667 registry
668 ).unwrap(),
669 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
670 "consensus_handler_transaction_sizes",
671 "Sizes of each type of transactions processed by consensus handler",
672 &["class"],
673 POSITIVE_INT_BUCKETS.to_vec(),
674 registry
675 ).unwrap(),
676 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
677 "consensus_handler_num_low_scoring_authorities",
678 "Number of low scoring authorities based on reputation scores from consensus",
679 registry
680 ).unwrap(),
681 consensus_handler_scores: register_int_gauge_vec_with_registry!(
682 "consensus_handler_scores",
683 "scores from consensus for each authority",
684 &["authority"],
685 registry,
686 ).unwrap(),
687 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
688 "consensus_handler_deferred_transactions",
689 "Number of transactions deferred by consensus handler",
690 registry,
691 ).unwrap(),
692 consensus_handler_congested_transactions: register_int_counter_with_registry!(
693 "consensus_handler_congested_transactions",
694 "Number of transactions deferred by consensus handler due to congestion",
695 registry,
696 ).unwrap(),
697 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
698 "consensus_handler_cancelled_transactions",
699 "Number of transactions cancelled by consensus handler",
700 registry,
701 ).unwrap(),
702 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
703 "consensus_handler_max_congestion_control_object_costs",
704 "Max object costs for congestion control in the current consensus commit",
705 &["commit_type"],
706 registry,
707 ).unwrap(),
708 consensus_committed_subdags: register_int_counter_vec_with_registry!(
709 "consensus_committed_subdags",
710 "Number of committed subdags, sliced by leader",
711 &["authority"],
712 registry,
713 ).unwrap(),
714 consensus_committed_messages: register_int_gauge_vec_with_registry!(
715 "consensus_committed_messages",
716 "Total number of committed consensus messages, sliced by author",
717 &["authority"],
718 registry,
719 ).unwrap(),
720 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
721 "consensus_committed_user_transactions",
722 "Number of committed user transactions, sliced by submitter",
723 &["authority"],
724 registry,
725 ).unwrap(),
726 consensus_handler_leader_round: register_int_gauge_with_registry!(
727 "consensus_handler_leader_round",
728 "The leader round of the current consensus output being processed in the consensus handler",
729 registry,
730 ).unwrap(),
731 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
732 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
733 zklogin_sig_count: register_int_counter_with_registry!(
734 "zklogin_sig_count",
735 "Count of zkLogin signatures",
736 registry,
737 )
738 .unwrap(),
739 multisig_sig_count: register_int_counter_with_registry!(
740 "multisig_sig_count",
741 "Count of zkLogin signatures",
742 registry,
743 )
744 .unwrap(),
745 consensus_calculated_throughput: register_int_gauge_with_registry!(
746 "consensus_calculated_throughput",
747 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
748 registry,
749 ).unwrap(),
750 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
751 "consensus_calculated_throughput_profile",
752 "The current active calculated throughput profile",
753 registry
754 ).unwrap(),
755 execution_queueing_latency: LatencyObserver::new(),
756 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
757 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
758 }
759 }
760
761 pub fn reset_on_reconfigure(&self) {
765 self.consensus_committed_messages.reset();
766 self.consensus_handler_scores.reset();
767 self.consensus_committed_user_transactions.reset();
768 }
769}
770
771pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
778
779pub struct AuthorityState {
780 pub name: AuthorityName,
783 pub secret: StableSyncAuthoritySigner,
785
786 input_loader: TransactionInputLoader,
788 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
789
790 epoch_store: ArcSwap<AuthorityPerEpochStore>,
791
792 execution_lock: RwLock<EpochId>,
798
799 pub indexes: Option<Arc<IndexStore>>,
800 pub rest_index: Option<Arc<RestIndexStore>>,
801
802 pub subscription_handler: Arc<SubscriptionHandler>,
803 pub checkpoint_store: Arc<CheckpointStore>,
804
805 committee_store: Arc<CommitteeStore>,
806
807 transaction_manager: Arc<TransactionManager>,
809
810 #[cfg_attr(not(test), expect(unused))]
812 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
813
814 pub metrics: Arc<AuthorityMetrics>,
815 _pruner: AuthorityStorePruner,
816 _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
817
818 db_checkpoint_config: DBCheckpointConfig,
820
821 pub config: NodeConfig,
822
823 pub overload_info: AuthorityOverloadInfo,
825
826 pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
827
828 chain_identifier: ChainIdentifier,
831
832 pub(crate) congestion_tracker: Arc<CongestionTracker>,
833}
834
835impl AuthorityState {
844 pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
845 epoch_store.committee().authority_exists(&self.name)
846 }
847
848 pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
849 epoch_store
850 .active_validators()
851 .iter()
852 .any(|a| AuthorityName::from(a) == self.name)
853 }
854
855 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
856 !self.is_committee_validator(epoch_store)
857 }
858
859 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
860 &self.committee_store
861 }
862
863 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
864 self.committee_store.clone()
865 }
866
867 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
868 &self.config.authority_overload_config
869 }
870
871 pub fn get_epoch_state_commitments(
872 &self,
873 epoch: EpochId,
874 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
875 self.checkpoint_store.get_epoch_state_commitments(epoch)
876 }
877
878 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
882 async fn handle_transaction_impl(
883 &self,
884 transaction: VerifiedTransaction,
885 epoch_store: &Arc<AuthorityPerEpochStore>,
886 ) -> IotaResult<VerifiedSignedTransaction> {
887 let _execution_lock = self.execution_lock_for_signing()?;
889
890 let protocol_config = epoch_store.protocol_config();
891 let reference_gas_price = epoch_store.reference_gas_price();
892
893 let epoch = epoch_store.epoch();
894
895 let tx_data = transaction.data().transaction_data();
896
897 iota_transaction_checks::deny::check_transaction_for_signing(
901 tx_data,
902 transaction.tx_signatures(),
903 &transaction.input_objects()?,
904 &tx_data.receiving_objects(),
905 &self.config.transaction_deny_config,
906 self.get_backing_package_store().as_ref(),
907 )?;
908
909 let (tx_input_objects, tx_receiving_objects, auth_input_objects, account_object) =
914 self.read_objects_for_signing(&transaction, epoch)?;
915
916 let move_authenticator = transaction.sender_move_authenticator();
920
921 let (
927 gas_status,
928 tx_checked_input_objects,
929 auth_checked_input_objects,
930 authenticator_function_ref,
931 ) = self.check_transaction_inputs_for_signing(
932 protocol_config,
933 reference_gas_price,
934 tx_data,
935 tx_input_objects,
936 &tx_receiving_objects,
937 move_authenticator,
938 auth_input_objects,
939 account_object,
940 )?;
941
942 check_coin_deny_list_v1_during_signing(
943 tx_data.sender(),
944 &tx_checked_input_objects,
945 &tx_receiving_objects,
946 &auth_checked_input_objects,
947 &self.get_object_store(),
948 )?;
949
950 if let Some(move_authenticator) = move_authenticator {
951 let auth_checked_input_objects = auth_checked_input_objects
955 .expect("MoveAuthenticator input objects must be provided");
956 let authenticator_function_ref = authenticator_function_ref
957 .expect("AuthenticatorFunctionRefV1 object must be provided");
958
959 let (kind, signer, gas_data) = tx_data.execution_parts();
960
961 let validation_result = epoch_store.executor().authenticate_transaction(
963 self.get_backing_store().as_ref(),
964 protocol_config,
965 self.metrics.limits_metrics.clone(),
966 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
967 epoch_store
968 .epoch_start_config()
969 .epoch_data()
970 .epoch_start_timestamp(),
971 gas_data,
972 gas_status,
973 move_authenticator.to_owned(),
974 authenticator_function_ref,
975 auth_checked_input_objects,
976 kind,
977 signer,
978 transaction.digest().to_owned(),
979 &mut None,
980 );
981
982 if let Err(validation_error) = validation_result {
983 return Err(IotaError::MoveAuthenticatorExecutionFailure {
984 error: validation_error.to_string(),
985 });
986 }
987 }
988
989 let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();
990
991 let signed_transaction =
992 VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);
993
994 self.get_cache_writer().try_acquire_transaction_locks(
999 epoch_store,
1000 &owned_objects,
1001 signed_transaction.clone(),
1002 )?;
1003
1004 Ok(signed_transaction)
1005 }
1006
1007 #[instrument(name = "handle_transaction", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = transaction.data().transaction_data().gas_owner().to_string()))]
1009 pub async fn handle_transaction(
1010 &self,
1011 epoch_store: &Arc<AuthorityPerEpochStore>,
1012 transaction: VerifiedTransaction,
1013 ) -> IotaResult<HandleTransactionResponse> {
1014 let tx_digest = *transaction.digest();
1015 debug!("handle_transaction");
1016
1017 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1019 return Ok(HandleTransactionResponse { status });
1020 }
1021
1022 let _metrics_guard = self
1023 .metrics
1024 .authority_state_handle_transaction_latency
1025 .start_timer();
1026 self.metrics.tx_orders.inc();
1027
1028 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1029 match signed {
1030 Ok(s) => {
1031 if self.is_committee_validator(epoch_store) {
1032 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1033 let tx = s.clone();
1034 let validator_tx_finalizer = validator_tx_finalizer.clone();
1035 let cache_reader = self.get_transaction_cache_reader().clone();
1036 let epoch_store = epoch_store.clone();
1037 spawn_monitored_task!(epoch_store.within_alive_epoch(
1038 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1039 ));
1040 }
1041 }
1042 Ok(HandleTransactionResponse {
1043 status: TransactionStatus::Signed(s.into_inner().into_sig()),
1044 })
1045 }
1046 Err(err) => Ok(HandleTransactionResponse {
1050 status: self
1051 .get_transaction_status(&tx_digest, epoch_store)?
1052 .ok_or(err)?
1053 .1,
1054 }),
1055 }
1056 }
1057
1058 pub fn check_system_overload_at_signing(&self) -> bool {
1059 self.config
1060 .authority_overload_config
1061 .check_system_overload_at_signing
1062 }
1063
1064 pub fn check_system_overload_at_execution(&self) -> bool {
1065 self.config
1066 .authority_overload_config
1067 .check_system_overload_at_execution
1068 }
1069
1070 pub(crate) fn check_system_overload(
1071 &self,
1072 consensus_adapter: &Arc<ConsensusAdapter>,
1073 tx_data: &SenderSignedData,
1074 do_authority_overload_check: bool,
1075 ) -> IotaResult {
1076 if do_authority_overload_check {
1077 self.check_authority_overload(tx_data).tap_err(|_| {
1078 self.update_overload_metrics("execution_queue");
1079 })?;
1080 }
1081 self.transaction_manager
1082 .check_execution_overload(self.overload_config(), tx_data)
1083 .tap_err(|_| {
1084 self.update_overload_metrics("execution_pending");
1085 })?;
1086 consensus_adapter.check_consensus_overload().tap_err(|_| {
1087 self.update_overload_metrics("consensus");
1088 })?;
1089
1090 let pending_tx_count = self
1091 .get_cache_commit()
1092 .approximate_pending_transaction_count();
1093 if pending_tx_count
1094 > self
1095 .config
1096 .execution_cache_config
1097 .writeback_cache
1098 .backpressure_threshold_for_rpc()
1099 {
1100 return Err(IotaError::ValidatorOverloadedRetryAfter {
1101 retry_after_secs: 10,
1102 });
1103 }
1104
1105 Ok(())
1106 }
1107
1108 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1109 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1110 return Ok(());
1111 }
1112
1113 let load_shedding_percentage = self
1114 .overload_info
1115 .load_shedding_percentage
1116 .load(Ordering::Relaxed);
1117 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1118 }
1119
1120 fn update_overload_metrics(&self, source: &str) {
1121 self.metrics
1122 .transaction_overload_sources
1123 .with_label_values(&[source])
1124 .inc();
1125 }
1126
1127 #[instrument(level = "trace", skip_all)]
1129 pub async fn execute_certificate(
1130 &self,
1131 certificate: &VerifiedCertificate,
1132 epoch_store: &Arc<AuthorityPerEpochStore>,
1133 ) -> IotaResult<TransactionEffects> {
1134 let _metrics_guard = if certificate.contains_shared_object() {
1135 self.metrics
1136 .execute_certificate_latency_shared_object
1137 .start_timer()
1138 } else {
1139 self.metrics
1140 .execute_certificate_latency_single_writer
1141 .start_timer()
1142 };
1143 trace!("execute_certificate");
1144
1145 self.metrics.total_cert_attempts.inc();
1146
1147 if !certificate.contains_shared_object() {
1148 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1153 }
1154
1155 epoch_store
1158 .within_alive_epoch(self.notify_read_effects(certificate))
1159 .await
1160 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1161 .and_then(|r| r)
1162 }
1163
1164 #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1179 pub fn try_execute_immediately(
1180 &self,
1181 certificate: &VerifiedExecutableTransaction,
1182 expected_effects_digest: Option<TransactionEffectsDigest>,
1183 epoch_store: &Arc<AuthorityPerEpochStore>,
1184 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1185 let _scope = monitored_scope("Execution::try_execute_immediately");
1186 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1187
1188 let tx_digest = certificate.digest();
1189
1190 let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1192
1193 if let Some(effects) = self
1196 .get_transaction_cache_reader()
1197 .try_get_executed_effects(tx_digest)?
1198 {
1199 if let Some(expected_effects_digest_inner) = expected_effects_digest {
1200 assert_eq!(
1201 effects.digest(),
1202 expected_effects_digest_inner,
1203 "Unexpected effects digest for transaction {tx_digest:?}"
1204 );
1205 }
1206 tx_guard.release();
1207 return Ok((effects, None));
1208 }
1209
1210 let (tx_input_objects, authenticator_input_objects, account_object) =
1211 self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1212
1213 let expected_effects_digest =
1219 expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1220
1221 self.process_certificate(
1222 tx_guard,
1223 certificate,
1224 tx_input_objects,
1225 account_object,
1226 authenticator_input_objects,
1227 expected_effects_digest,
1228 epoch_store,
1229 )
1230 .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1231 .tap_ok(
1232 |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1233 )
1234 }
1235
1236 pub fn read_objects_for_execution(
1237 &self,
1238 tx_lock: &CertLockGuard,
1239 certificate: &VerifiedExecutableTransaction,
1240 epoch_store: &Arc<AuthorityPerEpochStore>,
1241 ) -> IotaResult<(InputObjects, Option<InputObjects>, Option<ObjectReadResult>)> {
1242 let _scope = monitored_scope("Execution::load_input_objects");
1243 let _metrics_guard = self
1244 .metrics
1245 .execution_load_input_objects_latency
1246 .start_timer();
1247
1248 let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1249
1250 let input_objects = self.input_loader.read_objects_for_execution(
1251 epoch_store,
1252 &certificate.key(),
1253 tx_lock,
1254 &input_objects,
1255 epoch_store.epoch(),
1256 )?;
1257
1258 certificate.split_input_objects_into_groups_for_reading(input_objects)
1259 }
1260
1261 pub fn try_execute_for_test(
1265 &self,
1266 certificate: &VerifiedCertificate,
1267 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1268 let epoch_store = self.epoch_store_for_testing();
1269 let (effects, execution_error_opt) = self.try_execute_immediately(
1270 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1271 None,
1272 &epoch_store,
1273 )?;
1274 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1275 Ok((signed_effects, execution_error_opt))
1276 }
1277
1278 pub fn execute_for_test(
1280 &self,
1281 certificate: &VerifiedCertificate,
1282 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1283 self.try_execute_for_test(certificate)
1284 .expect("try_execute_for_test should not fail")
1285 }
1286
1287 pub async fn notify_read_effects(
1288 &self,
1289 certificate: &VerifiedCertificate,
1290 ) -> IotaResult<TransactionEffects> {
1291 self.get_transaction_cache_reader()
1292 .try_notify_read_executed_effects(&[*certificate.digest()])
1293 .await
1294 .map(|mut r| r.pop().expect("must return correct number of effects"))
1295 }
1296
1297 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1298 self.get_object_cache_reader()
1299 .try_check_owned_objects_are_live(owned_object_refs)
1300 }
1301
1302 pub(crate) fn debug_dump_transaction_state(
1307 &self,
1308 tx_digest: &TransactionDigest,
1309 effects: &TransactionEffects,
1310 expected_effects_digest: TransactionEffectsDigest,
1311 inner_temporary_store: &InnerTemporaryStore,
1312 certificate: &VerifiedExecutableTransaction,
1313 debug_dump_config: &StateDebugDumpConfig,
1314 ) -> IotaResult<PathBuf> {
1315 let dump_dir = debug_dump_config
1316 .dump_file_directory
1317 .as_ref()
1318 .cloned()
1319 .unwrap_or(std::env::temp_dir());
1320 let epoch_store = self.load_epoch_store_one_call_per_task();
1321
1322 NodeStateDump::new(
1323 tx_digest,
1324 effects,
1325 expected_effects_digest,
1326 self.get_object_store().as_ref(),
1327 &epoch_store,
1328 inner_temporary_store,
1329 certificate,
1330 )?
1331 .write_to_file(&dump_dir)
1332 .map_err(|e| IotaError::FileIO(e.to_string()))
1333 }
1334
1335 #[instrument(name = "process_certificate", level = "trace", skip_all, fields(tx_digest = ?certificate.digest(), sender = ?certificate.data().transaction_data().gas_owner().to_string()))]
1336 pub(crate) fn process_certificate(
1337 &self,
1338 tx_guard: CertTxGuard,
1339 certificate: &VerifiedExecutableTransaction,
1340 tx_input_objects: InputObjects,
1341 account_object: Option<ObjectReadResult>,
1342 authenticator_input_objects: Option<InputObjects>,
1343 expected_effects_digest: Option<TransactionEffectsDigest>,
1344 epoch_store: &Arc<AuthorityPerEpochStore>,
1345 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1346 let process_certificate_start_time = tokio::time::Instant::now();
1347 let digest = *certificate.digest();
1348
1349 let _scope = monitored_scope("Execution::process_certificate");
1350
1351 fail_point_if!("correlated-crash-process-certificate", || {
1352 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1353 iota_simulator::task::kill_current_node(None);
1354 }
1355 });
1356
1357 let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1358 let execution_guard = match execution_guard {
1364 Ok(execution_guard) => execution_guard,
1365 Err(err) => {
1366 tx_guard.release();
1367 return Err(err);
1368 }
1369 };
1370 if *execution_guard != epoch_store.epoch() {
1374 tx_guard.release();
1375 info!("The epoch of the execution_guard doesn't match the epoch store");
1376 return Err(IotaError::WrongEpoch {
1377 expected_epoch: epoch_store.epoch(),
1378 actual_epoch: *execution_guard,
1379 });
1380 }
1381
1382 let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1388 &execution_guard,
1389 certificate,
1390 tx_input_objects,
1391 account_object,
1392 authenticator_input_objects,
1393 epoch_store,
1394 ) {
1395 Err(e) => {
1396 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1397 tx_guard.release();
1398 return Err(e);
1399 }
1400 Ok(res) => res,
1401 };
1402
1403 if let Some(expected_effects_digest) = expected_effects_digest {
1404 if effects.digest() != expected_effects_digest {
1405 match self.debug_dump_transaction_state(
1407 &digest,
1408 &effects,
1409 expected_effects_digest,
1410 &inner_temporary_store,
1411 certificate,
1412 &self.config.state_debug_dump_config,
1413 ) {
1414 Ok(out_path) => {
1415 info!(
1416 "Dumped node state for transaction {} to {}",
1417 digest,
1418 out_path.as_path().display().to_string()
1419 );
1420 }
1421 Err(e) => {
1422 error!("Error dumping state for transaction {}: {e}", digest);
1423 }
1424 }
1425 error!(
1426 tx_digest = ?digest,
1427 ?expected_effects_digest,
1428 actual_effects = ?effects,
1429 "fork detected!"
1430 );
1431 panic!(
1432 "Transaction {} is expected to have effects digest {}, but got {}!",
1433 digest,
1434 expected_effects_digest,
1435 effects.digest(),
1436 );
1437 }
1438 }
1439
1440 fail_point!("crash");
1441
1442 self.commit_certificate(
1443 certificate,
1444 inner_temporary_store,
1445 &effects,
1446 tx_guard,
1447 execution_guard,
1448 epoch_store,
1449 )?;
1450
1451 if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1452 certificate.data().transaction_data().kind()
1453 {
1454 if let Some(err) = &execution_error_opt {
1455 debug_fatal!("Authenticator state update failed: {:?}", err);
1456 }
1457 epoch_store.update_authenticator_state(auth_state);
1458
1459 if cfg!(debug_assertions) {
1462 let authenticator_state = get_authenticator_state(self.get_object_store())
1463 .expect("Read cannot fail")
1464 .expect("Authenticator state must exist");
1465
1466 let mut sys_jwks: Vec<_> = authenticator_state
1467 .active_jwks
1468 .into_iter()
1469 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1470 .collect();
1471 let mut active_jwks: Vec<_> = epoch_store
1472 .signature_verifier
1473 .get_jwks()
1474 .into_iter()
1475 .collect();
1476 sys_jwks.sort();
1477 active_jwks.sort();
1478
1479 assert_eq!(sys_jwks, active_jwks);
1480 }
1481 }
1482
1483 let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1484 if elapsed > 0.0 {
1485 self.metrics
1486 .execution_gas_latency_ratio
1487 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1488 };
1489 Ok((effects, execution_error_opt))
1490 }
1491
1492 #[instrument(level = "trace", skip_all)]
1493 fn commit_certificate(
1494 &self,
1495 certificate: &VerifiedExecutableTransaction,
1496 inner_temporary_store: InnerTemporaryStore,
1497 effects: &TransactionEffects,
1498 tx_guard: CertTxGuard,
1499 _execution_guard: ExecutionLockReadGuard<'_>,
1500 epoch_store: &Arc<AuthorityPerEpochStore>,
1501 ) -> IotaResult {
1502 let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1503 monitored_scope("Execution::commit_certificate");
1504 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1505
1506 let tx_key = certificate.key();
1507 let tx_digest = certificate.digest();
1508 let input_object_count = inner_temporary_store.input_objects.len();
1509 let shared_object_count = effects.input_shared_objects().len();
1510
1511 let output_keys = inner_temporary_store.get_output_keys(effects);
1512
1513 let _ = self
1515 .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1516 .tap_err(|e| {
1517 self.metrics.post_processing_total_failures.inc();
1518 error!(?tx_digest, "tx post processing failed: {e}");
1519 });
1520
1521 epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1525
1526 fail_point!("crash");
1528
1529 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1530 certificate.clone().into_unsigned(),
1531 effects.clone(),
1532 inner_temporary_store,
1533 );
1534 self.get_cache_writer()
1535 .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1536
1537 if certificate.transaction_data().is_end_of_epoch_tx() {
1538 self.get_object_cache_reader()
1541 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1542 }
1543
1544 tx_guard.commit_tx();
1546
1547 self.transaction_manager
1551 .notify_commit(tx_digest, output_keys, epoch_store);
1552
1553 self.update_metrics(certificate, input_object_count, shared_object_count);
1554
1555 Ok(())
1556 }
1557
1558 fn update_metrics(
1559 &self,
1560 certificate: &VerifiedExecutableTransaction,
1561 input_object_count: usize,
1562 shared_object_count: usize,
1563 ) {
1564 if certificate.has_zklogin_sig() {
1566 self.metrics.zklogin_sig_count.inc();
1567 } else if certificate.has_upgraded_multisig() {
1568 self.metrics.multisig_sig_count.inc();
1569 }
1570
1571 self.metrics.total_effects.inc();
1572 self.metrics.total_certs.inc();
1573
1574 if shared_object_count > 0 {
1575 self.metrics.shared_obj_tx.inc();
1576 }
1577
1578 if certificate.is_sponsored_tx() {
1579 self.metrics.sponsored_tx.inc();
1580 }
1581
1582 self.metrics
1583 .num_input_objs
1584 .observe(input_object_count as f64);
1585 self.metrics
1586 .num_shared_objects
1587 .observe(shared_object_count as f64);
1588 self.metrics.batch_size.observe(
1589 certificate
1590 .data()
1591 .intent_message()
1592 .value
1593 .kind()
1594 .num_commands() as f64,
1595 );
1596 }
1597
1598 #[instrument(level = "trace", skip_all)]
1610 fn prepare_certificate(
1611 &self,
1612 _execution_guard: &ExecutionLockReadGuard<'_>,
1613 certificate: &VerifiedExecutableTransaction,
1614 tx_input_objects: InputObjects,
1615 account_object: Option<ObjectReadResult>,
1616 move_authenticator_input_objects: Option<InputObjects>,
1617 epoch_store: &Arc<AuthorityPerEpochStore>,
1618 ) -> IotaResult<(
1619 InnerTemporaryStore,
1620 TransactionEffects,
1621 Option<ExecutionError>,
1622 )> {
1623 let _scope = monitored_scope("Execution::prepare_certificate");
1624 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1625 let prepare_certificate_start_time = tokio::time::Instant::now();
1626
1627 let protocol_config = epoch_store.protocol_config();
1628
1629 let reference_gas_price = epoch_store.reference_gas_price();
1630
1631 let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
1632 let epoch_start_timestamp = epoch_store
1633 .epoch_start_config()
1634 .epoch_data()
1635 .epoch_start_timestamp();
1636
1637 let backing_store = self.get_backing_store().as_ref();
1638
1639 let tx_digest = *certificate.digest();
1640
1641 let tx_data = certificate.data().transaction_data();
1644 tx_data.validity_check(protocol_config)?;
1645
1646 let (kind, signer, gas_data) = tx_data.execution_parts();
1647
1648 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1649 let (inner_temp_store, _, mut effects, execution_error_opt) = if let Some(
1650 move_authenticator,
1651 ) =
1652 certificate.sender_move_authenticator()
1653 {
1654 let (
1661 auth_account_object_id,
1662 auth_account_object_seq_number,
1663 auth_account_object_digest,
1664 ) = move_authenticator.object_to_authenticate_components()?;
1665
1666 let account_object = account_object.expect("Account object must be provided");
1669
1670 let authenticator_function_ref_for_execution = self.check_move_account(
1671 auth_account_object_id,
1672 auth_account_object_seq_number,
1673 auth_account_object_digest,
1674 account_object,
1675 &signer,
1676 )?;
1677
1678 let move_authenticator_input_objects = move_authenticator_input_objects.expect(
1679 "In case of a `MoveAuthenticator` signature, the authenticator input objects must be provided",
1680 );
1681
1682 let authenticator_gas_budget = protocol_config.max_auth_gas();
1687 let (
1688 gas_status,
1689 authenticator_checked_input_objects,
1690 authenticator_and_tx_checked_input_objects,
1691 ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
1692 certificate,
1693 tx_input_objects,
1694 move_authenticator_input_objects,
1695 authenticator_gas_budget,
1696 protocol_config,
1697 reference_gas_price,
1698 )?;
1699
1700 let owned_object_refs = authenticator_and_tx_checked_input_objects
1701 .inner()
1702 .filter_owned_objects();
1703 self.check_owned_locks(&owned_object_refs)?;
1704
1705 epoch_store
1706 .executor()
1707 .authenticate_then_execute_transaction_to_effects(
1708 backing_store,
1709 protocol_config,
1710 self.metrics.limits_metrics.clone(),
1711 self.config
1712 .expensive_safety_check_config
1713 .enable_deep_per_tx_iota_conservation_check(),
1714 self.config.certificate_deny_config.certificate_deny_set(),
1715 &epoch_id,
1716 epoch_start_timestamp,
1717 gas_data,
1718 gas_status,
1719 move_authenticator.to_owned(),
1720 authenticator_function_ref_for_execution,
1721 authenticator_checked_input_objects,
1722 authenticator_and_tx_checked_input_objects,
1723 kind,
1724 signer,
1725 tx_digest,
1726 &mut None,
1727 )
1728 } else {
1729 let (tx_gas_status, tx_checked_input_objects) =
1734 iota_transaction_checks::check_certificate_input(
1735 certificate,
1736 tx_input_objects,
1737 protocol_config,
1738 reference_gas_price,
1739 )?;
1740
1741 let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
1742 self.check_owned_locks(&owned_object_refs)?;
1743 epoch_store.executor().execute_transaction_to_effects(
1744 backing_store,
1745 protocol_config,
1746 self.metrics.limits_metrics.clone(),
1747 self.config
1750 .expensive_safety_check_config
1751 .enable_deep_per_tx_iota_conservation_check(),
1752 self.config.certificate_deny_config.certificate_deny_set(),
1753 &epoch_id,
1754 epoch_start_timestamp,
1755 tx_checked_input_objects,
1756 gas_data,
1757 tx_gas_status,
1758 kind,
1759 signer,
1760 tx_digest,
1761 &mut None,
1762 )
1763 };
1764
1765 fail_point_if!("cp_execution_nondeterminism", || {
1766 #[cfg(msim)]
1767 self.create_fail_state(certificate, epoch_store, &mut effects);
1768 });
1769
1770 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1771 if elapsed > 0.0 {
1772 self.metrics
1773 .prepare_cert_gas_latency_ratio
1774 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1775 }
1776
1777 Ok((inner_temp_store, effects, execution_error_opt.err()))
1778 }
1779
1780 pub fn prepare_certificate_for_benchmark(
1781 &self,
1782 certificate: &VerifiedExecutableTransaction,
1783 input_objects: InputObjects,
1784 epoch_store: &Arc<AuthorityPerEpochStore>,
1785 ) -> IotaResult<(
1786 InnerTemporaryStore,
1787 TransactionEffects,
1788 Option<ExecutionError>,
1789 )> {
1790 let lock = RwLock::new(epoch_store.epoch());
1791 let execution_guard = lock.try_read().unwrap();
1792
1793 self.prepare_certificate(
1794 &execution_guard,
1795 certificate,
1796 input_objects,
1797 None,
1798 None,
1799 epoch_store,
1800 )
1801 }
1802
1803 #[instrument("dry_exec_tx", level = "trace", skip_all)]
1806 #[allow(clippy::type_complexity)]
1807 pub fn dry_exec_transaction(
1808 &self,
1809 transaction: TransactionData,
1810 transaction_digest: TransactionDigest,
1811 ) -> IotaResult<(
1812 DryRunTransactionBlockResponse,
1813 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1814 TransactionEffects,
1815 Option<ObjectID>,
1816 )> {
1817 let epoch_store = self.load_epoch_store_one_call_per_task();
1818 if !self.is_fullnode(&epoch_store) {
1819 return Err(IotaError::UnsupportedFeature {
1820 error: "dry-exec is only supported on fullnodes".to_string(),
1821 });
1822 }
1823
1824 if transaction.kind().is_system_tx() {
1825 return Err(IotaError::UnsupportedFeature {
1826 error: "dry-exec does not support system transactions".to_string(),
1827 });
1828 }
1829
1830 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1831 }
1832
1833 #[allow(clippy::type_complexity)]
1834 pub fn dry_exec_transaction_for_benchmark(
1835 &self,
1836 transaction: TransactionData,
1837 transaction_digest: TransactionDigest,
1838 ) -> IotaResult<(
1839 DryRunTransactionBlockResponse,
1840 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1841 TransactionEffects,
1842 Option<ObjectID>,
1843 )> {
1844 let epoch_store = self.load_epoch_store_one_call_per_task();
1845 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1846 }
1847
1848 #[instrument(level = "trace", skip_all)]
1849 #[allow(clippy::type_complexity)]
1850 fn dry_exec_transaction_impl(
1851 &self,
1852 epoch_store: &AuthorityPerEpochStore,
1853 transaction: TransactionData,
1854 transaction_digest: TransactionDigest,
1855 ) -> IotaResult<(
1856 DryRunTransactionBlockResponse,
1857 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1858 TransactionEffects,
1859 Option<ObjectID>,
1860 )> {
1861 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1863
1864 let input_object_kinds = transaction.input_objects()?;
1865 let receiving_object_refs = transaction.receiving_objects();
1866
1867 iota_transaction_checks::deny::check_transaction_for_signing(
1868 &transaction,
1869 &[],
1870 &input_object_kinds,
1871 &receiving_object_refs,
1872 &self.config.transaction_deny_config,
1873 self.get_backing_package_store().as_ref(),
1874 )?;
1875
1876 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1877 None,
1879 &input_object_kinds,
1880 &receiving_object_refs,
1881 epoch_store.epoch(),
1882 )?;
1883
1884 let mut transaction = transaction;
1886 let reference_gas_price = epoch_store.reference_gas_price();
1887 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1888 let sender = transaction.gas_owner();
1889 let gas_object_id = ObjectID::random();
1890 let gas_object = Object::new_move(
1891 MoveObject::new_gas_coin(
1892 OBJECT_START_VERSION,
1893 gas_object_id,
1894 SIMULATION_GAS_COIN_VALUE,
1895 ),
1896 Owner::AddressOwner(sender),
1897 TransactionDigest::genesis_marker(),
1898 );
1899 let gas_object_ref = gas_object.compute_object_reference();
1900 transaction.gas_data_mut().payment = vec![gas_object_ref];
1902 (
1903 iota_transaction_checks::check_transaction_input_with_given_gas(
1904 epoch_store.protocol_config(),
1905 reference_gas_price,
1906 &transaction,
1907 input_objects,
1908 receiving_objects,
1909 gas_object,
1910 &self.metrics.bytecode_verifier_metrics,
1911 &self.config.verifier_signing_config,
1912 )?,
1913 Some(gas_object_id),
1914 )
1915 } else {
1916 let authenticator_gas_budget = 0;
1919
1920 (
1921 iota_transaction_checks::check_transaction_input(
1922 epoch_store.protocol_config(),
1923 reference_gas_price,
1924 &transaction,
1925 input_objects,
1926 &receiving_objects,
1927 &self.metrics.bytecode_verifier_metrics,
1928 &self.config.verifier_signing_config,
1929 authenticator_gas_budget,
1930 )?,
1931 None,
1932 )
1933 };
1934
1935 let protocol_config = epoch_store.protocol_config();
1936 let (kind, signer, gas_data) = transaction.execution_parts();
1937
1938 let silent = true;
1939 let executor = iota_execution::executor(protocol_config, silent, None)
1940 .expect("Creating an executor should not fail here");
1941
1942 let expensive_checks = false;
1943 let (inner_temp_store, _, effects, execution_error) = executor
1944 .execute_transaction_to_effects(
1945 self.get_backing_store().as_ref(),
1946 protocol_config,
1947 self.metrics.limits_metrics.clone(),
1948 expensive_checks,
1949 self.config.certificate_deny_config.certificate_deny_set(),
1950 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1951 epoch_store
1952 .epoch_start_config()
1953 .epoch_data()
1954 .epoch_start_timestamp(),
1955 checked_input_objects,
1956 gas_data,
1957 gas_status,
1958 kind,
1959 signer,
1960 transaction_digest,
1961 &mut None,
1962 );
1963 let tx_digest = *effects.transaction_digest();
1964
1965 let module_cache =
1966 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1967
1968 let mut layout_resolver =
1969 epoch_store
1970 .executor()
1971 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1972 &inner_temp_store,
1973 self.get_backing_package_store(),
1974 )));
1975 let object_changes = Vec::new();
1977
1978 let balance_changes = Vec::new();
1980
1981 let written_with_kind = effects
1982 .created()
1983 .into_iter()
1984 .map(|(oref, _)| (oref, WriteKind::Create))
1985 .chain(
1986 effects
1987 .unwrapped()
1988 .into_iter()
1989 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1990 )
1991 .chain(
1992 effects
1993 .mutated()
1994 .into_iter()
1995 .map(|(oref, _)| (oref, WriteKind::Mutate)),
1996 )
1997 .map(|(oref, kind)| {
1998 let obj = inner_temp_store.written.get(&oref.0).unwrap();
1999 (oref.0, (oref, obj.clone(), kind))
2001 })
2002 .collect();
2003
2004 let execution_error_source = execution_error
2005 .as_ref()
2006 .err()
2007 .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2008
2009 Ok((
2010 DryRunTransactionBlockResponse {
2011 suggested_gas_price: self
2013 .congestion_tracker
2014 .get_prediction_suggested_gas_price(&transaction),
2015 input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
2016 .map_err(|e| IotaError::TransactionSerialization {
2017 error: format!(
2018 "Failed to convert transaction to IotaTransactionBlockData: {e}",
2019 ),
2020 })?, effects: effects.clone().try_into()?,
2022 events: IotaTransactionBlockEvents::try_from(
2023 inner_temp_store.events.clone(),
2024 tx_digest,
2025 None,
2026 layout_resolver.as_mut(),
2027 )?,
2028 object_changes,
2029 balance_changes,
2030 execution_error_source,
2031 },
2032 written_with_kind,
2033 effects,
2034 mock_gas,
2035 ))
2036 }
2037
2038 pub fn simulate_transaction(
2039 &self,
2040 mut transaction: TransactionData,
2041 checks: VmChecks,
2042 ) -> IotaResult<SimulateTransactionResult> {
2043 if transaction.kind().is_system_tx() {
2044 return Err(IotaError::UnsupportedFeature {
2045 error: "simulate does not support system transactions".to_string(),
2046 });
2047 }
2048
2049 let epoch_store = self.load_epoch_store_one_call_per_task();
2050 if !self.is_fullnode(&epoch_store) {
2051 return Err(IotaError::UnsupportedFeature {
2052 error: "simulate is only supported on fullnodes".to_string(),
2053 });
2054 }
2055
2056 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2060
2061 let input_object_kinds = transaction.input_objects()?;
2062 let receiving_object_refs = transaction.receiving_objects();
2063
2064 iota_transaction_checks::deny::check_transaction_for_signing(
2067 &transaction,
2068 &[],
2069 &input_object_kinds,
2070 &receiving_object_refs,
2071 &self.config.transaction_deny_config,
2072 self.get_backing_package_store().as_ref(),
2073 )?;
2074
2075 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2077 None,
2079 &input_object_kinds,
2080 &receiving_object_refs,
2081 epoch_store.epoch(),
2082 )?;
2083
2084 let mock_gas_id = if transaction.gas().is_empty() {
2086 let mock_gas_object = Object::new_move(
2087 MoveObject::new_gas_coin(
2088 OBJECT_START_VERSION,
2089 ObjectID::MAX,
2090 SIMULATION_GAS_COIN_VALUE,
2091 ),
2092 Owner::AddressOwner(transaction.gas_data().owner),
2093 TransactionDigest::genesis_marker(),
2094 );
2095 let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2096 transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
2097 input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2098 Some(mock_gas_object.id())
2099 } else {
2100 None
2101 };
2102
2103 let protocol_config = epoch_store.protocol_config();
2104
2105 let authenticator_gas_budget = 0;
2108
2109 let (gas_status, checked_input_objects) = if checks.enabled() {
2112 iota_transaction_checks::check_transaction_input(
2113 protocol_config,
2114 epoch_store.reference_gas_price(),
2115 &transaction,
2116 input_objects,
2117 &receiving_objects,
2118 &self.metrics.bytecode_verifier_metrics,
2119 &self.config.verifier_signing_config,
2120 authenticator_gas_budget,
2121 )?
2122 } else {
2123 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2124 protocol_config,
2125 transaction.kind(),
2126 input_objects,
2127 receiving_objects,
2128 )?;
2129 let gas_status = IotaGasStatus::new(
2130 transaction.gas_budget(),
2131 transaction.gas_price(),
2132 epoch_store.reference_gas_price(),
2133 protocol_config,
2134 )?;
2135
2136 (gas_status, checked_input_objects)
2137 };
2138
2139 let executor = iota_execution::executor(
2141 protocol_config,
2142 true, None,
2144 )
2145 .expect("Creating an executor should not fail here");
2146
2147 let (kind, signer, gas_data) = transaction.execution_parts();
2149 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2150 self.get_backing_store().as_ref(),
2151 protocol_config,
2152 self.metrics.limits_metrics.clone(),
2153 false, self.config.certificate_deny_config.certificate_deny_set(),
2155 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2156 epoch_store
2157 .epoch_start_config()
2158 .epoch_data()
2159 .epoch_start_timestamp(),
2160 checked_input_objects,
2161 gas_data,
2162 gas_status,
2163 kind,
2164 signer,
2165 transaction.digest(),
2166 checks.disabled(),
2167 );
2168
2169 Ok(SimulateTransactionResult {
2172 input_objects: inner_temp_store.input_objects,
2173 output_objects: inner_temp_store.written,
2174 events: effects.events_digest().map(|_| inner_temp_store.events),
2175 effects,
2176 execution_result,
2177 suggested_gas_price: self
2178 .congestion_tracker
2179 .get_prediction_suggested_gas_price(&transaction),
2180 mock_gas_id,
2181 })
2182 }
2183
2184 #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2189 pub async fn dev_inspect_transaction_block(
2190 &self,
2191 sender: IotaAddress,
2192 transaction_kind: TransactionKind,
2193 gas_price: Option<u64>,
2194 gas_budget: Option<u64>,
2195 gas_sponsor: Option<IotaAddress>,
2196 gas_objects: Option<Vec<ObjectRef>>,
2197 show_raw_txn_data_and_effects: Option<bool>,
2198 skip_checks: Option<bool>,
2199 ) -> IotaResult<DevInspectResults> {
2200 let epoch_store = self.load_epoch_store_one_call_per_task();
2201
2202 if !self.is_fullnode(&epoch_store) {
2203 return Err(IotaError::UnsupportedFeature {
2204 error: "dev-inspect is only supported on fullnodes".to_string(),
2205 });
2206 }
2207
2208 if transaction_kind.is_system_tx() {
2209 return Err(IotaError::UnsupportedFeature {
2210 error: "system transactions are not supported".to_string(),
2211 });
2212 }
2213
2214 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2215 let skip_checks = skip_checks.unwrap_or(true);
2216 let reference_gas_price = epoch_store.reference_gas_price();
2217 let protocol_config = epoch_store.protocol_config();
2218 let max_tx_gas = protocol_config.max_tx_gas();
2219
2220 let price = gas_price.unwrap_or(reference_gas_price);
2221 let budget = gas_budget.unwrap_or(max_tx_gas);
2222 let owner = gas_sponsor.unwrap_or(sender);
2223 let payment = gas_objects.unwrap_or_default();
2226 let mut transaction = TransactionData::V1(TransactionDataV1 {
2227 kind: transaction_kind.clone(),
2228 sender,
2229 gas_data: GasData {
2230 payment,
2231 owner,
2232 price,
2233 budget,
2234 },
2235 expiration: TransactionExpiration::None,
2236 });
2237
2238 let raw_txn_data = if show_raw_txn_data_and_effects {
2239 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2240 error: "Failed to serialize transaction during dev inspect".to_string(),
2241 })?
2242 } else {
2243 vec![]
2244 };
2245
2246 transaction.validity_check_no_gas_check(protocol_config)?;
2247
2248 let input_object_kinds = transaction.input_objects()?;
2249 let receiving_object_refs = transaction.receiving_objects();
2250
2251 iota_transaction_checks::deny::check_transaction_for_signing(
2252 &transaction,
2253 &[],
2254 &input_object_kinds,
2255 &receiving_object_refs,
2256 &self.config.transaction_deny_config,
2257 self.get_backing_package_store().as_ref(),
2258 )?;
2259
2260 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2261 None,
2263 &input_object_kinds,
2264 &receiving_object_refs,
2265 epoch_store.epoch(),
2266 )?;
2267
2268 let (gas_status, checked_input_objects) = if skip_checks {
2269 if transaction.gas().is_empty() {
2274 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2276 SIMULATION_GAS_COIN_VALUE,
2277 transaction.gas_owner(),
2278 );
2279 let gas_object_ref = dummy_gas_object.compute_object_reference();
2280 transaction.gas_data_mut().payment = vec![gas_object_ref];
2281 input_objects.push(ObjectReadResult::new(
2282 InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2283 dummy_gas_object.into(),
2284 ));
2285 }
2286 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2287 protocol_config,
2288 &transaction_kind,
2289 input_objects,
2290 receiving_objects,
2291 )?;
2292 let gas_status = IotaGasStatus::new(
2293 max_tx_gas,
2294 transaction.gas_price(),
2295 reference_gas_price,
2296 protocol_config,
2297 )?;
2298
2299 (gas_status, checked_input_objects)
2300 } else {
2301 if transaction.gas().is_empty() {
2305 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2307 SIMULATION_GAS_COIN_VALUE,
2308 transaction.gas_owner(),
2309 );
2310 let gas_object_ref = dummy_gas_object.compute_object_reference();
2311 transaction.gas_data_mut().payment = vec![gas_object_ref];
2312 iota_transaction_checks::check_transaction_input_with_given_gas(
2313 epoch_store.protocol_config(),
2314 reference_gas_price,
2315 &transaction,
2316 input_objects,
2317 receiving_objects,
2318 dummy_gas_object,
2319 &self.metrics.bytecode_verifier_metrics,
2320 &self.config.verifier_signing_config,
2321 )?
2322 } else {
2323 let authenticator_gas_budget = 0;
2326
2327 iota_transaction_checks::check_transaction_input(
2328 epoch_store.protocol_config(),
2329 reference_gas_price,
2330 &transaction,
2331 input_objects,
2332 &receiving_objects,
2333 &self.metrics.bytecode_verifier_metrics,
2334 &self.config.verifier_signing_config,
2335 authenticator_gas_budget,
2336 )?
2337 }
2338 };
2339
2340 let executor = iota_execution::executor(protocol_config, true, None)
2341 .expect("Creating an executor should not fail here");
2342 let gas_data = transaction.gas_data().clone();
2343 let intent_msg = IntentMessage::new(
2344 Intent {
2345 version: IntentVersion::V0,
2346 scope: IntentScope::TransactionData,
2347 app_id: IntentAppId::Iota,
2348 },
2349 transaction,
2350 );
2351 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2352 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2353 self.get_backing_store().as_ref(),
2354 protocol_config,
2355 self.metrics.limits_metrics.clone(),
2356 false,
2358 self.config.certificate_deny_config.certificate_deny_set(),
2359 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2360 epoch_store
2361 .epoch_start_config()
2362 .epoch_data()
2363 .epoch_start_timestamp(),
2364 checked_input_objects,
2365 gas_data,
2366 gas_status,
2367 transaction_kind,
2368 sender,
2369 transaction_digest,
2370 skip_checks,
2371 );
2372
2373 let raw_effects = if show_raw_txn_data_and_effects {
2374 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2375 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2376 })?
2377 } else {
2378 vec![]
2379 };
2380
2381 let mut layout_resolver =
2382 epoch_store
2383 .executor()
2384 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2385 &inner_temp_store,
2386 self.get_backing_package_store(),
2387 )));
2388
2389 DevInspectResults::new(
2390 effects,
2391 inner_temp_store.events.clone(),
2392 execution_result,
2393 raw_txn_data,
2394 raw_effects,
2395 layout_resolver.as_mut(),
2396 )
2397 }
2398
2399 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2401 let epoch_store = self.epoch_store_for_testing();
2402 Ok(epoch_store.reference_gas_price())
2403 }
2404
2405 #[instrument(level = "trace", skip_all)]
2406 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2407 self.get_transaction_cache_reader()
2408 .try_is_tx_already_executed(digest)
2409 }
2410
2411 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2413 self.try_is_tx_already_executed(digest)
2414 .expect("storage access failed")
2415 }
2416
2417 #[instrument(level = "debug", skip_all, err)]
2419 fn index_tx(
2420 &self,
2421 indexes: &IndexStore,
2422 digest: &TransactionDigest,
2423 cert: &VerifiedExecutableTransaction,
2425 effects: &TransactionEffects,
2426 events: &TransactionEvents,
2427 timestamp_ms: u64,
2428 tx_coins: Option<TxCoins>,
2429 written: &WrittenObjects,
2430 inner_temporary_store: &InnerTemporaryStore,
2431 ) -> IotaResult<u64> {
2432 let changes = self
2433 .process_object_index(effects, written, inner_temporary_store)
2434 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2435
2436 indexes.index_tx(
2437 cert.data().intent_message().value.sender(),
2438 cert.data()
2439 .intent_message()
2440 .value
2441 .input_objects()?
2442 .iter()
2443 .map(|o| o.object_id()),
2444 effects
2445 .all_changed_objects()
2446 .into_iter()
2447 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2448 cert.data()
2449 .intent_message()
2450 .value
2451 .move_calls()
2452 .into_iter()
2453 .map(|(package, module, function)| {
2454 (*package, module.to_owned(), function.to_owned())
2455 }),
2456 events,
2457 changes,
2458 digest,
2459 timestamp_ms,
2460 tx_coins,
2461 )
2462 }
2463
2464 #[cfg(msim)]
2465 fn create_fail_state(
2466 &self,
2467 certificate: &VerifiedExecutableTransaction,
2468 epoch_store: &Arc<AuthorityPerEpochStore>,
2469 effects: &mut TransactionEffects,
2470 ) {
2471 use std::cell::RefCell;
2472 thread_local! {
2473 static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2474 }
2475 if !certificate.data().intent_message().value.is_system_tx() {
2476 let committee = epoch_store.committee();
2477 let cur_stake = (**committee).weight(&self.name);
2478 if cur_stake > 0 {
2479 FAIL_STATE.with_borrow_mut(|fail_state| {
2480 if fail_state.0 < committee.validity_threshold() {
2482 fail_state.0 += cur_stake;
2483 fail_state.1.insert(self.name);
2484 }
2485
2486 if fail_state.1.contains(&self.name) {
2487 info!("cp_exec failing tx");
2488 effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2489 }
2490 });
2491 }
2492 }
2493 }
2494
2495 fn process_object_index(
2496 &self,
2497 effects: &TransactionEffects,
2498 written: &WrittenObjects,
2499 inner_temporary_store: &InnerTemporaryStore,
2500 ) -> IotaResult<ObjectIndexChanges> {
2501 let epoch_store = self.load_epoch_store_one_call_per_task();
2502 let mut layout_resolver =
2503 epoch_store
2504 .executor()
2505 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2506 inner_temporary_store,
2507 self.get_backing_package_store(),
2508 )));
2509
2510 let modified_at_version = effects
2511 .modified_at_versions()
2512 .into_iter()
2513 .collect::<HashMap<_, _>>();
2514
2515 let tx_digest = effects.transaction_digest();
2516 let mut deleted_owners = vec![];
2517 let mut deleted_dynamic_fields = vec![];
2518 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2519 let old_version = modified_at_version.get(&id).unwrap();
2520 match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2523 |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2524 ) {
2525 Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2526 Owner::ObjectOwner(object_id) => {
2527 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2528 }
2529 _ => {}
2530 }
2531 }
2532
2533 let mut new_owners = vec![];
2534 let mut new_dynamic_fields = vec![];
2535
2536 for (oref, owner, kind) in effects.all_changed_objects() {
2537 let id = &oref.0;
2538 if let WriteKind::Mutate = kind {
2541 let Some(old_version) = modified_at_version.get(id) else {
2542 panic!(
2543 "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2544 );
2545 };
2546 let Some(old_object) = self
2549 .get_object_store()
2550 .try_get_object_by_key(id, *old_version)?
2551 else {
2552 panic!(
2553 "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2554 );
2555 };
2556 if old_object.owner != owner {
2557 match old_object.owner {
2558 Owner::AddressOwner(addr) => {
2559 deleted_owners.push((addr, *id));
2560 }
2561 Owner::ObjectOwner(object_id) => {
2562 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2563 }
2564 _ => {}
2565 }
2566 }
2567 }
2568
2569 match owner {
2570 Owner::AddressOwner(addr) => {
2571 let new_object = written.get(id).unwrap_or_else(
2574 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2575 );
2576 assert_eq!(
2577 new_object.version(),
2578 oref.1,
2579 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2580 tx_digest,
2581 id,
2582 new_object.version(),
2583 oref.1
2584 );
2585
2586 let type_ = new_object
2587 .type_()
2588 .map(|type_| ObjectType::Struct(type_.clone()))
2589 .unwrap_or(ObjectType::Package);
2590
2591 new_owners.push((
2592 (addr, *id),
2593 ObjectInfo {
2594 object_id: *id,
2595 version: oref.1,
2596 digest: oref.2,
2597 type_,
2598 owner,
2599 previous_transaction: *effects.transaction_digest(),
2600 },
2601 ));
2602 }
2603 Owner::ObjectOwner(owner) => {
2604 let new_object = written.get(id).unwrap_or_else(
2605 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2606 );
2607 assert_eq!(
2608 new_object.version(),
2609 oref.1,
2610 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2611 tx_digest,
2612 id,
2613 new_object.version(),
2614 oref.1
2615 );
2616
2617 let Some(df_info) = self
2618 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2619 .unwrap_or_else(|e| {
2620 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2621 None
2622 }
2623 )
2624 else {
2625 continue;
2627 };
2628 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2629 }
2630 _ => {}
2631 }
2632 }
2633
2634 Ok(ObjectIndexChanges {
2635 deleted_owners,
2636 deleted_dynamic_fields,
2637 new_owners,
2638 new_dynamic_fields,
2639 })
2640 }
2641
2642 fn try_create_dynamic_field_info(
2643 &self,
2644 o: &Object,
2645 written: &WrittenObjects,
2646 resolver: &mut dyn LayoutResolver,
2647 ) -> IotaResult<Option<DynamicFieldInfo>> {
2648 let Some(move_object) = o.data.try_as_move().cloned() else {
2650 return Ok(None);
2651 };
2652
2653 if !move_object.type_().is_dynamic_field() {
2655 return Ok(None);
2656 }
2657
2658 let layout = resolver
2659 .get_annotated_layout(&move_object.type_().clone().into())?
2660 .into_layout();
2661
2662 let field =
2663 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2664 IotaError::ObjectDeserialization {
2665 error: e.to_string(),
2666 }
2667 })?;
2668
2669 let type_ = field.kind;
2670 let name_type: TypeTag = field.name_layout.into();
2671 let bcs_name = field.name_bytes.to_owned();
2672
2673 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2674 .map_err(|e| {
2675 warn!("{e}");
2676 IotaError::ObjectDeserialization {
2677 error: e.to_string(),
2678 }
2679 })?;
2680
2681 let name = DynamicFieldName {
2682 type_: name_type,
2683 value: IotaMoveValue::from(name_value).to_json_value(),
2684 };
2685
2686 let value_metadata = field.value_metadata().map_err(|e| {
2687 warn!("{e}");
2688 IotaError::ObjectDeserialization {
2689 error: e.to_string(),
2690 }
2691 })?;
2692
2693 Ok(Some(match value_metadata {
2694 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2695 name,
2696 bcs_name,
2697 type_,
2698 object_type: object_type.to_canonical_string(true),
2699 object_id: o.id(),
2700 version: o.version(),
2701 digest: o.digest(),
2702 },
2703
2704 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2705 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2710 (
2711 object.version(),
2712 object.digest(),
2713 object.data.type_().unwrap().clone(),
2714 )
2715 } else {
2716 let object = self
2718 .get_object_store()
2719 .try_get_object_by_key(&object_id, o.version())?
2720 .ok_or_else(|| UserInputError::ObjectNotFound {
2721 object_id,
2722 version: Some(o.version()),
2723 })?;
2724 let version = object.version();
2725 let digest = object.digest();
2726 let object_type = object.data.type_().unwrap().clone();
2727 (version, digest, object_type)
2728 };
2729
2730 DynamicFieldInfo {
2731 name,
2732 bcs_name,
2733 type_,
2734 object_type: object_type.to_string(),
2735 object_id,
2736 version,
2737 digest,
2738 }
2739 }
2740 }))
2741 }
2742
2743 #[instrument(level = "trace", skip_all, err)]
2744 fn post_process_one_tx(
2745 &self,
2746 certificate: &VerifiedExecutableTransaction,
2747 effects: &TransactionEffects,
2748 inner_temporary_store: &InnerTemporaryStore,
2749 epoch_store: &Arc<AuthorityPerEpochStore>,
2750 ) -> IotaResult {
2751 if self.indexes.is_none() {
2752 return Ok(());
2753 }
2754
2755 let _scope = monitored_scope("Execution::post_process_one_tx");
2756
2757 let tx_digest = certificate.digest();
2758 let timestamp_ms = Self::unixtime_now_ms();
2759 let events = &inner_temporary_store.events;
2760 let written = &inner_temporary_store.written;
2761 let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
2762 effects,
2763 inner_temporary_store,
2764 epoch_store,
2765 );
2766
2767 if let Some(indexes) = &self.indexes {
2769 let _ = self
2770 .index_tx(
2771 indexes.as_ref(),
2772 tx_digest,
2773 certificate,
2774 effects,
2775 events,
2776 timestamp_ms,
2777 tx_coins,
2778 written,
2779 inner_temporary_store,
2780 )
2781 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2782 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2783 .expect("Indexing tx should not fail");
2784
2785 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2786 let events = self.make_transaction_block_events(
2787 events.clone(),
2788 *tx_digest,
2789 timestamp_ms,
2790 epoch_store,
2791 inner_temporary_store,
2792 )?;
2793 self.subscription_handler
2795 .process_tx(certificate.data().transaction_data(), &effects, &events)
2796 .tap_ok(|_| {
2797 self.metrics
2798 .post_processing_total_tx_had_event_processed
2799 .inc()
2800 })
2801 .tap_err(|e| {
2802 warn!(
2803 ?tx_digest,
2804 "Post processing - Couldn't process events for tx: {}", e
2805 )
2806 })?;
2807
2808 self.metrics
2809 .post_processing_total_events_emitted
2810 .inc_by(events.data.len() as u64);
2811 };
2812 Ok(())
2813 }
2814
2815 fn make_transaction_block_events(
2816 &self,
2817 transaction_events: TransactionEvents,
2818 digest: TransactionDigest,
2819 timestamp_ms: u64,
2820 epoch_store: &Arc<AuthorityPerEpochStore>,
2821 inner_temporary_store: &InnerTemporaryStore,
2822 ) -> IotaResult<IotaTransactionBlockEvents> {
2823 let mut layout_resolver =
2824 epoch_store
2825 .executor()
2826 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2827 inner_temporary_store,
2828 self.get_backing_package_store(),
2829 )));
2830 IotaTransactionBlockEvents::try_from(
2831 transaction_events,
2832 digest,
2833 Some(timestamp_ms),
2834 layout_resolver.as_mut(),
2835 )
2836 }
2837
2838 pub fn unixtime_now_ms() -> u64 {
2839 let now = SystemTime::now()
2840 .duration_since(UNIX_EPOCH)
2841 .expect("Time went backwards")
2842 .as_millis();
2843 u64::try_from(now).expect("Travelling in time machine")
2844 }
2845
2846 #[instrument(level = "trace", skip_all)]
2847 pub async fn handle_transaction_info_request(
2848 &self,
2849 request: TransactionInfoRequest,
2850 ) -> IotaResult<TransactionInfoResponse> {
2851 let epoch_store = self.load_epoch_store_one_call_per_task();
2852 let (transaction, status) = self
2853 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2854 .ok_or(IotaError::TransactionNotFound {
2855 digest: request.transaction_digest,
2856 })?;
2857 Ok(TransactionInfoResponse {
2858 transaction,
2859 status,
2860 })
2861 }
2862
2863 #[instrument(level = "trace", skip_all)]
2864 pub async fn handle_object_info_request(
2865 &self,
2866 request: ObjectInfoRequest,
2867 ) -> IotaResult<ObjectInfoResponse> {
2868 let epoch_store = self.load_epoch_store_one_call_per_task();
2869
2870 let requested_object_seq = match request.request_kind {
2871 ObjectInfoRequestKind::LatestObjectInfo => {
2872 let (_, seq, _) = self
2873 .try_get_object_or_tombstone(request.object_id)
2874 .await?
2875 .ok_or_else(|| {
2876 IotaError::from(UserInputError::ObjectNotFound {
2877 object_id: request.object_id,
2878 version: None,
2879 })
2880 })?;
2881 seq
2882 }
2883 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2884 };
2885
2886 let object = self
2887 .get_object_store()
2888 .try_get_object_by_key(&request.object_id, requested_object_seq)?
2889 .ok_or_else(|| {
2890 IotaError::from(UserInputError::ObjectNotFound {
2891 object_id: request.object_id,
2892 version: Some(requested_object_seq),
2893 })
2894 })?;
2895
2896 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2897 (request.generate_layout, object.data.try_as_move())
2898 {
2899 Some(into_struct_layout(
2900 epoch_store
2901 .executor()
2902 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2903 .get_annotated_layout(&move_obj.type_().clone().into())?,
2904 )?)
2905 } else {
2906 None
2907 };
2908
2909 let lock = if !object.is_address_owned() {
2910 None
2912 } else {
2913 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2914 .await?
2915 .map(|s| s.into_inner())
2916 };
2917
2918 Ok(ObjectInfoResponse {
2919 object,
2920 layout,
2921 lock_for_debugging: lock,
2922 })
2923 }
2924
2925 #[instrument(level = "trace", skip_all)]
2926 pub fn handle_checkpoint_request(
2927 &self,
2928 request: &CheckpointRequest,
2929 ) -> IotaResult<CheckpointResponse> {
2930 let summary = if request.certified {
2931 let summary = match request.sequence_number {
2932 Some(seq) => self
2933 .checkpoint_store
2934 .get_checkpoint_by_sequence_number(seq)?,
2935 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2936 }
2937 .map(|v| v.into_inner());
2938 summary.map(CheckpointSummaryResponse::Certified)
2939 } else {
2940 let summary = match request.sequence_number {
2941 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2942 None => self
2943 .checkpoint_store
2944 .get_latest_locally_computed_checkpoint()?,
2945 };
2946 summary.map(CheckpointSummaryResponse::Pending)
2947 };
2948 let contents = match &summary {
2949 Some(s) => self
2950 .checkpoint_store
2951 .get_checkpoint_contents(&s.content_digest())?,
2952 None => None,
2953 };
2954 Ok(CheckpointResponse {
2955 checkpoint: summary,
2956 contents,
2957 })
2958 }
2959
2960 fn check_protocol_version(
2961 supported_protocol_versions: SupportedProtocolVersions,
2962 current_version: ProtocolVersion,
2963 ) {
2964 info!("current protocol version is now {:?}", current_version);
2965 info!("supported versions are: {:?}", supported_protocol_versions);
2966 if !supported_protocol_versions.is_version_supported(current_version) {
2967 let msg = format!(
2968 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2969 );
2970
2971 error!("{}", msg);
2972 eprintln!("{msg}");
2973
2974 #[cfg(not(msim))]
2975 std::process::exit(1);
2976
2977 #[cfg(msim)]
2978 iota_simulator::task::shutdown_current_node();
2979 }
2980 }
2981
2982 #[expect(clippy::disallowed_methods)] pub async fn new(
2984 name: AuthorityName,
2985 secret: StableSyncAuthoritySigner,
2986 supported_protocol_versions: SupportedProtocolVersions,
2987 store: Arc<AuthorityStore>,
2988 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
2989 epoch_store: Arc<AuthorityPerEpochStore>,
2990 committee_store: Arc<CommitteeStore>,
2991 indexes: Option<Arc<IndexStore>>,
2992 rest_index: Option<Arc<RestIndexStore>>,
2993 checkpoint_store: Arc<CheckpointStore>,
2994 prometheus_registry: &Registry,
2995 genesis_objects: &[Object],
2996 db_checkpoint_config: &DBCheckpointConfig,
2997 config: NodeConfig,
2998 archive_readers: ArchiveReaderBalancer,
2999 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
3000 chain_identifier: ChainIdentifier,
3001 pruner_db: Option<Arc<AuthorityPrunerTables>>,
3002 ) -> Arc<Self> {
3003 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3004
3005 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3006 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3007 let transaction_manager = Arc::new(TransactionManager::new(
3008 execution_cache_trait_pointers.object_cache_reader.clone(),
3009 execution_cache_trait_pointers
3010 .transaction_cache_reader
3011 .clone(),
3012 &epoch_store,
3013 tx_ready_certificates,
3014 metrics.clone(),
3015 ));
3016 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3017
3018 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3019 epoch_store.get_parent_path(),
3020 &config.authority_store_pruning_config,
3021 );
3022 let _pruner = AuthorityStorePruner::new(
3023 store.perpetual_tables.clone(),
3024 checkpoint_store.clone(),
3025 rest_index.clone(),
3026 indexes.clone(),
3027 config.authority_store_pruning_config.clone(),
3028 epoch_store.committee().authority_exists(&name),
3029 epoch_store.epoch_start_state().epoch_duration_ms(),
3030 prometheus_registry,
3031 archive_readers,
3032 pruner_db,
3033 );
3034 let input_loader =
3035 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3036 let epoch = epoch_store.epoch();
3037 let rgp = epoch_store.reference_gas_price();
3038 let state = Arc::new(AuthorityState {
3039 name,
3040 secret,
3041 execution_lock: RwLock::new(epoch),
3042 epoch_store: ArcSwap::new(epoch_store.clone()),
3043 input_loader,
3044 execution_cache_trait_pointers,
3045 indexes,
3046 rest_index,
3047 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3048 checkpoint_store,
3049 committee_store,
3050 transaction_manager,
3051 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3052 metrics,
3053 _pruner,
3054 _authority_per_epoch_pruner,
3055 db_checkpoint_config: db_checkpoint_config.clone(),
3056 config,
3057 overload_info: AuthorityOverloadInfo::default(),
3058 validator_tx_finalizer,
3059 chain_identifier,
3060 congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
3061 });
3062
3063 let authority_state = Arc::downgrade(&state);
3065 spawn_monitored_task!(execution_process(
3066 authority_state,
3067 rx_ready_certificates,
3068 rx_execution_shutdown,
3069 ));
3070 spawn_monitored_task!(authority_store_migrations::migrate_events(store));
3071
3072 state
3074 .create_owner_index_if_empty(genesis_objects, &epoch_store)
3075 .expect("Error indexing genesis objects.");
3076
3077 state
3078 }
3079
3080 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3082 &self.execution_cache_trait_pointers.object_cache_reader
3083 }
3084
3085 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3086 &self.execution_cache_trait_pointers.transaction_cache_reader
3087 }
3088
3089 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3090 &self.execution_cache_trait_pointers.cache_writer
3091 }
3092
3093 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3094 &self.execution_cache_trait_pointers.backing_store
3095 }
3096
3097 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3098 &self.execution_cache_trait_pointers.backing_package_store
3099 }
3100
3101 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3102 &self.execution_cache_trait_pointers.object_store
3103 }
3104
3105 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3106 &self.execution_cache_trait_pointers.reconfig_api
3107 }
3108
3109 pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
3110 &self.execution_cache_trait_pointers.accumulator_store
3111 }
3112
3113 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3114 &self.execution_cache_trait_pointers.checkpoint_cache
3115 }
3116
3117 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3118 &self.execution_cache_trait_pointers.state_sync_store
3119 }
3120
3121 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3122 &self.execution_cache_trait_pointers.cache_commit
3123 }
3124
3125 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3126 self.execution_cache_trait_pointers
3127 .testing_api
3128 .database_for_testing()
3129 }
3130
3131 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3132 &self,
3133 config: NodeConfig,
3134 metrics: Arc<AuthorityStorePruningMetrics>,
3135 ) -> anyhow::Result<()> {
3136 let archive_readers =
3137 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3138 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3139 &self.database_for_testing().perpetual_tables,
3140 &self.checkpoint_store,
3141 self.rest_index.as_deref(),
3142 None,
3143 config.authority_store_pruning_config,
3144 metrics,
3145 archive_readers,
3146 EPOCH_DURATION_MS_FOR_TESTING,
3147 )
3148 .await
3149 }
3150
3151 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3152 &self.transaction_manager
3153 }
3154
3155 pub fn enqueue_transactions_for_execution(
3158 &self,
3159 txns: Vec<VerifiedExecutableTransaction>,
3160 epoch_store: &Arc<AuthorityPerEpochStore>,
3161 ) {
3162 self.transaction_manager.enqueue(txns, epoch_store)
3163 }
3164
3165 pub fn enqueue_certificates_for_execution(
3167 &self,
3168 certs: Vec<VerifiedCertificate>,
3169 epoch_store: &Arc<AuthorityPerEpochStore>,
3170 ) {
3171 self.transaction_manager
3172 .enqueue_certificates(certs, epoch_store)
3173 }
3174
3175 pub fn enqueue_with_expected_effects_digest(
3176 &self,
3177 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3178 epoch_store: &AuthorityPerEpochStore,
3179 ) {
3180 self.transaction_manager
3181 .enqueue_with_expected_effects_digest(certs, epoch_store)
3182 }
3183
3184 fn create_owner_index_if_empty(
3185 &self,
3186 genesis_objects: &[Object],
3187 epoch_store: &Arc<AuthorityPerEpochStore>,
3188 ) -> IotaResult {
3189 let Some(index_store) = &self.indexes else {
3190 return Ok(());
3191 };
3192 if !index_store.is_empty() {
3193 return Ok(());
3194 }
3195
3196 let mut new_owners = vec![];
3197 let mut new_dynamic_fields = vec![];
3198 let mut layout_resolver = epoch_store
3199 .executor()
3200 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3201 for o in genesis_objects.iter() {
3202 match o.owner {
3203 Owner::AddressOwner(addr) => new_owners.push((
3204 (addr, o.id()),
3205 ObjectInfo::new(&o.compute_object_reference(), o),
3206 )),
3207 Owner::ObjectOwner(object_id) => {
3208 let id = o.id();
3209 let info = match self.try_create_dynamic_field_info(
3210 o,
3211 &BTreeMap::new(),
3212 layout_resolver.as_mut(),
3213 ) {
3214 Ok(Some(info)) => info,
3215 Ok(None) => continue,
3216 Err(IotaError::UserInput {
3217 error:
3218 UserInputError::ObjectNotFound {
3219 object_id: not_found_id,
3220 version,
3221 },
3222 }) => {
3223 warn!(
3224 ?not_found_id,
3225 ?version,
3226 object_owner=?object_id,
3227 field=?id,
3228 "Skipping dynamic field: referenced genesis object not found"
3229 );
3230 continue;
3231 }
3232 Err(e) => return Err(e),
3233 };
3234 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3235 }
3236 _ => {}
3237 }
3238 }
3239
3240 index_store.insert_genesis_objects(ObjectIndexChanges {
3241 deleted_owners: vec![],
3242 deleted_dynamic_fields: vec![],
3243 new_owners,
3244 new_dynamic_fields,
3245 })
3246 }
3247
3248 pub fn execution_lock_for_executable_transaction(
3252 &self,
3253 transaction: &VerifiedExecutableTransaction,
3254 ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3255 let lock = self
3256 .execution_lock
3257 .try_read()
3258 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3259 if *lock == transaction.auth_sig().epoch() {
3260 Ok(lock)
3261 } else {
3262 Err(IotaError::WrongEpoch {
3263 expected_epoch: *lock,
3264 actual_epoch: transaction.auth_sig().epoch(),
3265 })
3266 }
3267 }
3268
3269 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3275 self.execution_lock
3276 .try_read()
3277 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3278 }
3279
3280 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3281 self.execution_lock.write().await
3282 }
3283
3284 #[instrument(level = "error", skip_all)]
3285 pub async fn reconfigure(
3286 &self,
3287 cur_epoch_store: &AuthorityPerEpochStore,
3288 supported_protocol_versions: SupportedProtocolVersions,
3289 new_committee: Committee,
3290 epoch_start_configuration: EpochStartConfiguration,
3291 accumulator: Arc<StateAccumulator>,
3292 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3293 epoch_supply_change: i64,
3294 epoch_last_checkpoint: CheckpointSequenceNumber,
3295 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3296 Self::check_protocol_version(
3297 supported_protocol_versions,
3298 epoch_start_configuration
3299 .epoch_start_state()
3300 .protocol_version(),
3301 );
3302 self.metrics.reset_on_reconfigure();
3303 self.committee_store.insert_new_committee(&new_committee)?;
3304
3305 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3307
3308 cur_epoch_store.epoch_terminated().await;
3310
3311 let highest_locally_built_checkpoint_seq = self
3312 .checkpoint_store
3313 .get_latest_locally_computed_checkpoint()?
3314 .map(|c| *c.sequence_number())
3315 .unwrap_or(0);
3316
3317 assert!(
3318 epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3319 "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3320 );
3321 if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
3322 let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3326 if num_shared_version_assignments > 1 {
3329 debug_fatal!(
3331 "all shared_version_assignments should have been removed \
3332 (num_shared_version_assignments: {num_shared_version_assignments})"
3333 );
3334 }
3335 }
3336
3337 self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3343 .await?;
3344 self.get_reconfig_api()
3345 .clear_state_end_of_epoch(&execution_lock);
3346 self.check_system_consistency(
3347 cur_epoch_store,
3348 accumulator,
3349 expensive_safety_check_config,
3350 epoch_supply_change,
3351 )?;
3352 self.get_reconfig_api()
3353 .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3354 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3355 if self
3356 .db_checkpoint_config
3357 .perform_db_checkpoints_at_epoch_end
3358 {
3359 let checkpoint_indexes = self
3360 .db_checkpoint_config
3361 .perform_index_db_checkpoints_at_epoch_end
3362 .unwrap_or(false);
3363 let current_epoch = cur_epoch_store.epoch();
3364 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3365 self.checkpoint_all_dbs(
3366 &epoch_checkpoint_path,
3367 cur_epoch_store,
3368 checkpoint_indexes,
3369 )?;
3370 }
3371 }
3372
3373 self.get_reconfig_api()
3374 .reconfigure_cache(&epoch_start_configuration)
3375 .await;
3376
3377 let new_epoch = new_committee.epoch;
3378 let new_epoch_store = self
3379 .reopen_epoch_db(
3380 cur_epoch_store,
3381 new_committee,
3382 epoch_start_configuration,
3383 expensive_safety_check_config,
3384 epoch_last_checkpoint,
3385 )
3386 .await?;
3387 assert_eq!(new_epoch_store.epoch(), new_epoch);
3388 self.transaction_manager.reconfigure(new_epoch);
3389 *execution_lock = new_epoch;
3390 Ok(new_epoch_store)
3394 }
3395
3396 pub async fn reconfigure_for_testing(&self) {
3401 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3402 let epoch_store = self.epoch_store_for_testing().clone();
3403 let protocol_config = epoch_store.protocol_config().clone();
3404 let _guard =
3412 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3413 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3414 self.get_backing_package_store().clone(),
3415 self.get_object_store().clone(),
3416 &self.config.expensive_safety_check_config,
3417 self.checkpoint_store
3418 .get_epoch_last_checkpoint(epoch_store.epoch())
3419 .unwrap()
3420 .map(|c| *c.sequence_number())
3421 .unwrap_or_default(),
3422 );
3423 let new_epoch = new_epoch_store.epoch();
3424 self.transaction_manager.reconfigure(new_epoch);
3425 self.epoch_store.store(new_epoch_store);
3426 epoch_store.epoch_terminated().await;
3427 *execution_lock = new_epoch;
3428 }
3429
3430 #[instrument(level = "error", skip_all)]
3431 fn check_system_consistency(
3432 &self,
3433 cur_epoch_store: &AuthorityPerEpochStore,
3434 accumulator: Arc<StateAccumulator>,
3435 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3436 epoch_supply_change: i64,
3437 ) -> IotaResult<()> {
3438 info!(
3439 "Performing iota conservation consistency check for epoch {}",
3440 cur_epoch_store.epoch()
3441 );
3442
3443 if cfg!(debug_assertions) {
3444 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3445 }
3446
3447 self.get_reconfig_api()
3448 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3449
3450 if expensive_safety_check_config.enable_state_consistency_check() {
3452 info!(
3453 "Performing state consistency check for epoch {}",
3454 cur_epoch_store.epoch()
3455 );
3456 self.expensive_check_is_consistent_state(
3457 accumulator,
3458 cur_epoch_store,
3459 cfg!(debug_assertions), );
3461 }
3462
3463 if expensive_safety_check_config.enable_secondary_index_checks() {
3464 if let Some(indexes) = self.indexes.clone() {
3465 verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3466 .expect("secondary indexes are inconsistent");
3467 }
3468 }
3469
3470 Ok(())
3471 }
3472
3473 fn expensive_check_is_consistent_state(
3474 &self,
3475 accumulator: Arc<StateAccumulator>,
3476 cur_epoch_store: &AuthorityPerEpochStore,
3477 panic: bool,
3478 ) {
3479 let live_object_set_hash = accumulator.digest_live_object_set();
3480
3481 let root_state_hash: ECMHLiveObjectSetDigest = self
3482 .get_accumulator_store()
3483 .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3484 .expect("Retrieving root state hash cannot fail")
3485 .expect("Root state hash for epoch must exist")
3486 .1
3487 .digest()
3488 .into();
3489
3490 let is_inconsistent = root_state_hash != live_object_set_hash;
3491 if is_inconsistent {
3492 if panic {
3493 panic!(
3494 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3495 );
3496 } else {
3497 error!(
3498 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3499 root_state_hash, live_object_set_hash
3500 );
3501 }
3502 } else {
3503 info!("State consistency check passed");
3504 }
3505
3506 if !panic {
3507 accumulator.set_inconsistent_state(is_inconsistent);
3508 }
3509 }
3510
3511 pub fn current_epoch_for_testing(&self) -> EpochId {
3512 self.epoch_store_for_testing().epoch()
3513 }
3514
3515 #[instrument(level = "error", skip_all)]
3516 pub fn checkpoint_all_dbs(
3517 &self,
3518 checkpoint_path: &Path,
3519 cur_epoch_store: &AuthorityPerEpochStore,
3520 checkpoint_indexes: bool,
3521 ) -> IotaResult {
3522 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3523 let current_epoch = cur_epoch_store.epoch();
3524
3525 if checkpoint_path.exists() {
3526 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3527 return Ok(());
3528 }
3529
3530 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3531 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3532
3533 if checkpoint_path_tmp.exists() {
3534 fs::remove_dir_all(&checkpoint_path_tmp)
3535 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3536 }
3537
3538 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3539 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3540
3541 self.checkpoint_store
3544 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3545
3546 self.get_reconfig_api()
3547 .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3548
3549 self.committee_store
3550 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3551
3552 if checkpoint_indexes {
3553 if let Some(indexes) = self.indexes.as_ref() {
3554 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3555 }
3556 if let Some(rest_index) = self.rest_index.as_ref() {
3557 rest_index.checkpoint_db(&checkpoint_path_tmp.join("grpc_indexes"))?;
3558 }
3559 }
3560
3561 fs::rename(checkpoint_path_tmp, checkpoint_path)
3562 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3563 Ok(())
3564 }
3565
3566 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3573 self.epoch_store.load()
3574 }
3575
3576 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3578 self.load_epoch_store_one_call_per_task()
3579 }
3580
3581 pub fn clone_committee_for_testing(&self) -> Committee {
3582 Committee::clone(self.epoch_store_for_testing().committee())
3583 }
3584
3585 #[instrument(level = "trace", skip_all)]
3586 pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3587 self.get_object_store()
3588 .try_get_object(object_id)
3589 .map_err(Into::into)
3590 }
3591
3592 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3594 self.try_get_object(object_id)
3595 .await
3596 .expect("storage access failed")
3597 }
3598
3599 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3600 Ok(self
3601 .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3602 .await?
3603 .expect("framework object should always exist")
3604 .compute_object_reference())
3605 }
3606
3607 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3609 self.get_object_cache_reader()
3610 .try_get_iota_system_state_object_unsafe()
3611 }
3612
3613 #[instrument(level = "trace", skip_all)]
3614 pub fn get_checkpoint_by_sequence_number(
3615 &self,
3616 sequence_number: CheckpointSequenceNumber,
3617 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3618 Ok(self
3619 .checkpoint_store
3620 .get_checkpoint_by_sequence_number(sequence_number)?)
3621 }
3622
3623 #[instrument(level = "trace", skip_all)]
3624 pub fn get_transaction_checkpoint_for_tests(
3625 &self,
3626 digest: &TransactionDigest,
3627 epoch_store: &AuthorityPerEpochStore,
3628 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3629 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3630 let Some(checkpoint) = checkpoint else {
3631 return Ok(None);
3632 };
3633 let checkpoint = self
3634 .checkpoint_store
3635 .get_checkpoint_by_sequence_number(checkpoint)?;
3636 Ok(checkpoint)
3637 }
3638
3639 #[instrument(level = "trace", skip_all)]
3640 pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3641 Ok(
3642 match self
3643 .get_object_cache_reader()
3644 .try_get_latest_object_or_tombstone(*object_id)?
3645 {
3646 Some((_, ObjectOrTombstone::Object(object))) => {
3647 let layout = self.get_object_layout(&object)?;
3648 ObjectRead::Exists(object.compute_object_reference(), object, layout)
3649 }
3650 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3651 None => ObjectRead::NotExists(*object_id),
3652 },
3653 )
3654 }
3655
3656 pub fn get_chain_identifier(&self) -> ChainIdentifier {
3658 self.chain_identifier
3659 }
3660
3661 #[instrument(level = "trace", skip_all)]
3662 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3663 where
3664 T: DeserializeOwned,
3665 {
3666 let o = self.get_object_read(object_id)?.into_object()?;
3667 if let Some(move_object) = o.data.try_as_move() {
3668 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3669 IotaError::ObjectDeserialization {
3670 error: format!("{e}"),
3671 }
3672 })?)
3673 } else {
3674 Err(IotaError::ObjectDeserialization {
3675 error: format!("Provided object : [{object_id}] is not a Move object."),
3676 })
3677 }
3678 }
3679
3680 #[instrument(level = "trace", skip_all)]
3686 pub fn get_past_object_read(
3687 &self,
3688 object_id: &ObjectID,
3689 version: SequenceNumber,
3690 ) -> IotaResult<PastObjectRead> {
3691 let Some(obj_ref) = self
3693 .get_object_cache_reader()
3694 .try_get_latest_object_ref_or_tombstone(*object_id)?
3695 else {
3696 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3697 };
3698
3699 if version > obj_ref.1 {
3700 return Ok(PastObjectRead::VersionTooHigh {
3701 object_id: *object_id,
3702 asked_version: version,
3703 latest_version: obj_ref.1,
3704 });
3705 }
3706
3707 if version < obj_ref.1 {
3708 return Ok(match self.read_object_at_version(object_id, version)? {
3710 Some((object, layout)) => {
3711 let obj_ref = object.compute_object_reference();
3712 PastObjectRead::VersionFound(obj_ref, object, layout)
3713 }
3714
3715 None => PastObjectRead::VersionNotFound(*object_id, version),
3716 });
3717 }
3718
3719 if !obj_ref.2.is_alive() {
3720 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3721 }
3722
3723 match self.read_object_at_version(object_id, obj_ref.1)? {
3724 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3725 None => {
3726 error!(
3727 "Object with in parent_entry is missing from object store, datastore is \
3728 inconsistent",
3729 );
3730 Err(UserInputError::ObjectNotFound {
3731 object_id: *object_id,
3732 version: Some(obj_ref.1),
3733 }
3734 .into())
3735 }
3736 }
3737 }
3738
3739 #[instrument(level = "trace", skip_all)]
3740 fn read_object_at_version(
3741 &self,
3742 object_id: &ObjectID,
3743 version: SequenceNumber,
3744 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3745 let Some(object) = self
3746 .get_object_cache_reader()
3747 .try_get_object_by_key(object_id, version)?
3748 else {
3749 return Ok(None);
3750 };
3751
3752 let layout = self.get_object_layout(&object)?;
3753 Ok(Some((object, layout)))
3754 }
3755
3756 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3757 let layout = object
3758 .data
3759 .try_as_move()
3760 .map(|object| {
3761 into_struct_layout(
3762 self.load_epoch_store_one_call_per_task()
3763 .executor()
3764 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3766 .get_annotated_layout(&object.type_().clone().into())?,
3767 )
3768 })
3769 .transpose()?;
3770 Ok(layout)
3771 }
3772
3773 fn get_owner_at_version(
3774 &self,
3775 object_id: &ObjectID,
3776 version: SequenceNumber,
3777 ) -> IotaResult<Owner> {
3778 self.get_object_store()
3779 .try_get_object_by_key(object_id, version)?
3780 .ok_or_else(|| {
3781 IotaError::from(UserInputError::ObjectNotFound {
3782 object_id: *object_id,
3783 version: Some(version),
3784 })
3785 })
3786 .map(|o| o.owner)
3787 }
3788
3789 #[instrument(level = "trace", skip_all)]
3790 pub fn get_owner_objects(
3791 &self,
3792 owner: IotaAddress,
3793 cursor: Option<ObjectID>,
3795 limit: usize,
3796 filter: Option<IotaObjectDataFilter>,
3797 ) -> IotaResult<Vec<ObjectInfo>> {
3798 if let Some(indexes) = &self.indexes {
3799 indexes.get_owner_objects(owner, cursor, limit, filter)
3800 } else {
3801 Err(IotaError::IndexStoreNotAvailable)
3802 }
3803 }
3804
3805 #[instrument(level = "trace", skip_all)]
3806 pub fn get_owned_coins_iterator_with_cursor(
3807 &self,
3808 owner: IotaAddress,
3809 cursor: (String, ObjectID),
3811 limit: usize,
3812 one_coin_type_only: bool,
3813 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3814 if let Some(indexes) = &self.indexes {
3815 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3816 } else {
3817 Err(IotaError::IndexStoreNotAvailable)
3818 }
3819 }
3820
3821 #[instrument(level = "trace", skip_all)]
3822 pub fn get_owner_objects_iterator(
3823 &self,
3824 owner: IotaAddress,
3825 cursor: Option<ObjectID>,
3827 filter: Option<IotaObjectDataFilter>,
3828 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3829 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3830 if let Some(indexes) = &self.indexes {
3831 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3832 } else {
3833 Err(IotaError::IndexStoreNotAvailable)
3834 }
3835 }
3836
3837 #[instrument(level = "trace", skip_all)]
3838 pub async fn get_move_objects<T>(
3839 &self,
3840 owner: IotaAddress,
3841 type_: MoveObjectType,
3842 ) -> IotaResult<Vec<T>>
3843 where
3844 T: DeserializeOwned,
3845 {
3846 let object_ids = self
3847 .get_owner_objects_iterator(owner, None, None)?
3848 .filter(|o| match &o.type_ {
3849 ObjectType::Struct(s) => &type_ == s,
3850 ObjectType::Package => false,
3851 })
3852 .map(|info| ObjectKey(info.object_id, info.version))
3853 .collect::<Vec<_>>();
3854 let mut move_objects = vec![];
3855
3856 let objects = self
3857 .get_object_store()
3858 .try_multi_get_objects_by_key(&object_ids)?;
3859
3860 for (o, id) in objects.into_iter().zip(object_ids) {
3861 let object = o.ok_or_else(|| {
3862 IotaError::from(UserInputError::ObjectNotFound {
3863 object_id: id.0,
3864 version: Some(id.1),
3865 })
3866 })?;
3867 let move_object = object.data.try_as_move().ok_or_else(|| {
3868 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3869 })?;
3870 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3871 IotaError::ObjectDeserialization {
3872 error: format!("{e}"),
3873 }
3874 })?);
3875 }
3876 Ok(move_objects)
3877 }
3878
3879 #[instrument(level = "trace", skip_all)]
3880 pub fn get_dynamic_fields(
3881 &self,
3882 owner: ObjectID,
3883 cursor: Option<ObjectID>,
3885 limit: usize,
3886 ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3887 Ok(self
3888 .get_dynamic_fields_iterator(owner, cursor)?
3889 .take(limit)
3890 .collect::<Result<Vec<_>, _>>()?)
3891 }
3892
3893 fn get_dynamic_fields_iterator(
3894 &self,
3895 owner: ObjectID,
3896 cursor: Option<ObjectID>,
3898 ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3899 {
3900 if let Some(indexes) = &self.indexes {
3901 indexes.get_dynamic_fields_iterator(owner, cursor)
3902 } else {
3903 Err(IotaError::IndexStoreNotAvailable)
3904 }
3905 }
3906
3907 #[instrument(level = "trace", skip_all)]
3908 pub fn get_dynamic_field_object_id(
3909 &self,
3910 owner: ObjectID,
3911 name_type: TypeTag,
3912 name_bcs_bytes: &[u8],
3913 ) -> IotaResult<Option<ObjectID>> {
3914 if let Some(indexes) = &self.indexes {
3915 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3916 } else {
3917 Err(IotaError::IndexStoreNotAvailable)
3918 }
3919 }
3920
3921 #[instrument(level = "trace", skip_all)]
3922 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3923 Ok(self.get_indexes()?.next_sequence_number())
3924 }
3925
3926 #[instrument(level = "trace", skip_all)]
3927 pub async fn get_executed_transaction_and_effects(
3928 &self,
3929 digest: TransactionDigest,
3930 kv_store: Arc<TransactionKeyValueStore>,
3931 ) -> IotaResult<(Transaction, TransactionEffects)> {
3932 let transaction = kv_store.get_tx(digest).await?;
3933 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3934 Ok((transaction, effects))
3935 }
3936
3937 #[instrument(level = "trace", skip_all)]
3938 pub fn multi_get_checkpoint_by_sequence_number(
3939 &self,
3940 sequence_numbers: &[CheckpointSequenceNumber],
3941 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3942 Ok(self
3943 .checkpoint_store
3944 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3945 }
3946
3947 #[instrument(level = "trace", skip_all)]
3948 pub fn get_transaction_events(
3949 &self,
3950 digest: &TransactionDigest,
3951 ) -> IotaResult<TransactionEvents> {
3952 self.get_transaction_cache_reader()
3953 .try_get_events(digest)?
3954 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3955 }
3956
3957 pub fn get_transaction_input_objects(
3958 &self,
3959 effects: &TransactionEffects,
3960 ) -> anyhow::Result<Vec<Object>> {
3961 iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
3962 .map_err(Into::into)
3963 }
3964
3965 pub fn get_transaction_output_objects(
3966 &self,
3967 effects: &TransactionEffects,
3968 ) -> anyhow::Result<Vec<Object>> {
3969 iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
3970 .map_err(Into::into)
3971 }
3972
3973 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3974 match &self.indexes {
3975 Some(i) => Ok(i.clone()),
3976 None => Err(IotaError::UnsupportedFeature {
3977 error: "extended object indexing is not enabled on this server".into(),
3978 }),
3979 }
3980 }
3981
3982 pub async fn get_transactions_for_tests(
3983 self: &Arc<Self>,
3984 filter: Option<TransactionFilter>,
3985 cursor: Option<TransactionDigest>,
3986 limit: Option<usize>,
3987 reverse: bool,
3988 ) -> IotaResult<Vec<TransactionDigest>> {
3989 let metrics = KeyValueStoreMetrics::new_for_tests();
3990 let kv_store = Arc::new(TransactionKeyValueStore::new(
3991 "rocksdb",
3992 metrics,
3993 self.clone(),
3994 ));
3995 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3996 .await
3997 }
3998
3999 #[instrument(level = "trace", skip_all)]
4000 pub async fn get_transactions(
4001 &self,
4002 kv_store: &Arc<TransactionKeyValueStore>,
4003 filter: Option<TransactionFilter>,
4004 cursor: Option<TransactionDigest>,
4006 limit: Option<usize>,
4007 reverse: bool,
4008 ) -> IotaResult<Vec<TransactionDigest>> {
4009 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4010 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4011 let iter = checkpoint_contents.iter().map(|c| c.transaction);
4012 if reverse {
4013 let iter = iter
4014 .rev()
4015 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4016 .skip(usize::from(cursor.is_some()));
4017 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4018 } else {
4019 let iter = iter
4020 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4021 .skip(usize::from(cursor.is_some()));
4022 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4023 }
4024 }
4025 self.get_indexes()?
4026 .get_transactions(filter, cursor, limit, reverse)
4027 }
4028
4029 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4030 &self.checkpoint_store
4031 }
4032
4033 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4034 self.get_checkpoint_store()
4035 .get_highest_executed_checkpoint_seq_number()?
4036 .ok_or(IotaError::UserInput {
4037 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4038 })
4039 }
4040
4041 #[cfg(msim)]
4042 pub fn get_highest_pruned_checkpoint_for_testing(
4043 &self,
4044 ) -> IotaResult<CheckpointSequenceNumber> {
4045 self.database_for_testing()
4046 .perpetual_tables
4047 .get_highest_pruned_checkpoint()
4048 .map(|c| c.unwrap_or(0))
4049 .map_err(Into::into)
4050 }
4051
4052 #[instrument(level = "trace", skip_all)]
4053 pub fn get_checkpoint_summary_by_sequence_number(
4054 &self,
4055 sequence_number: CheckpointSequenceNumber,
4056 ) -> IotaResult<CheckpointSummary> {
4057 let verified_checkpoint = self
4058 .get_checkpoint_store()
4059 .get_checkpoint_by_sequence_number(sequence_number)?;
4060 match verified_checkpoint {
4061 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4062 None => Err(IotaError::UserInput {
4063 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4064 }),
4065 }
4066 }
4067
4068 #[instrument(level = "trace", skip_all)]
4069 pub fn get_checkpoint_summary_by_digest(
4070 &self,
4071 digest: CheckpointDigest,
4072 ) -> IotaResult<CheckpointSummary> {
4073 let verified_checkpoint = self
4074 .get_checkpoint_store()
4075 .get_checkpoint_by_digest(&digest)?;
4076 match verified_checkpoint {
4077 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4078 None => Err(IotaError::UserInput {
4079 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4080 }),
4081 }
4082 }
4083
4084 #[instrument(level = "trace", skip_all)]
4085 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
4086 if is_system_package(package_id) {
4087 return self.find_genesis_txn_digest();
4088 }
4089 Ok(self
4090 .get_object_read(&package_id)?
4091 .into_object()?
4092 .previous_transaction)
4093 }
4094
4095 #[instrument(level = "trace", skip_all)]
4096 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4097 let summary = self
4098 .get_verified_checkpoint_by_sequence_number(0)?
4099 .into_message();
4100 let content = self.get_checkpoint_contents(summary.content_digest)?;
4101 let genesis_transaction = content.enumerate_transactions(&summary).next();
4102 Ok(genesis_transaction
4103 .ok_or(IotaError::UserInput {
4104 error: UserInputError::GenesisTransactionNotFound,
4105 })?
4106 .1
4107 .transaction)
4108 }
4109
4110 #[instrument(level = "trace", skip_all)]
4111 pub fn get_verified_checkpoint_by_sequence_number(
4112 &self,
4113 sequence_number: CheckpointSequenceNumber,
4114 ) -> IotaResult<VerifiedCheckpoint> {
4115 let verified_checkpoint = self
4116 .get_checkpoint_store()
4117 .get_checkpoint_by_sequence_number(sequence_number)?;
4118 match verified_checkpoint {
4119 Some(verified_checkpoint) => Ok(verified_checkpoint),
4120 None => Err(IotaError::UserInput {
4121 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4122 }),
4123 }
4124 }
4125
4126 #[instrument(level = "trace", skip_all)]
4127 pub fn get_verified_checkpoint_summary_by_digest(
4128 &self,
4129 digest: CheckpointDigest,
4130 ) -> IotaResult<VerifiedCheckpoint> {
4131 let verified_checkpoint = self
4132 .get_checkpoint_store()
4133 .get_checkpoint_by_digest(&digest)?;
4134 match verified_checkpoint {
4135 Some(verified_checkpoint) => Ok(verified_checkpoint),
4136 None => Err(IotaError::UserInput {
4137 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4138 }),
4139 }
4140 }
4141
4142 #[instrument(level = "trace", skip_all)]
4143 pub fn get_checkpoint_contents(
4144 &self,
4145 digest: CheckpointContentsDigest,
4146 ) -> IotaResult<CheckpointContents> {
4147 self.get_checkpoint_store()
4148 .get_checkpoint_contents(&digest)?
4149 .ok_or(IotaError::UserInput {
4150 error: UserInputError::CheckpointContentsNotFound(digest),
4151 })
4152 }
4153
4154 #[instrument(level = "trace", skip_all)]
4155 pub fn get_checkpoint_contents_by_sequence_number(
4156 &self,
4157 sequence_number: CheckpointSequenceNumber,
4158 ) -> IotaResult<CheckpointContents> {
4159 let verified_checkpoint = self
4160 .get_checkpoint_store()
4161 .get_checkpoint_by_sequence_number(sequence_number)?;
4162 match verified_checkpoint {
4163 Some(verified_checkpoint) => {
4164 let content_digest = verified_checkpoint.into_inner().content_digest;
4165 self.get_checkpoint_contents(content_digest)
4166 }
4167 None => Err(IotaError::UserInput {
4168 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4169 }),
4170 }
4171 }
4172
4173 #[instrument(level = "trace", skip_all)]
4174 pub async fn query_events(
4175 &self,
4176 kv_store: &Arc<TransactionKeyValueStore>,
4177 query: EventFilter,
4178 cursor: Option<EventID>,
4180 limit: usize,
4181 descending: bool,
4182 ) -> IotaResult<Vec<IotaEvent>> {
4183 let index_store = self.get_indexes()?;
4184
4185 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4187 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4188 IotaError::TransactionNotFound {
4189 digest: cursor.tx_digest,
4190 },
4191 )?;
4192 (tx_seq, cursor.event_seq as usize)
4193 } else if descending {
4194 (u64::MAX, usize::MAX)
4195 } else {
4196 (0, 0)
4197 };
4198
4199 let limit = limit + 1;
4200 let mut event_keys = match query {
4201 EventFilter::All(filters) => {
4202 if filters.is_empty() {
4203 index_store.all_events(tx_num, event_num, limit, descending)?
4204 } else {
4205 return Err(IotaError::UserInput {
4206 error: UserInputError::Unsupported(
4207 "This query type does not currently support filter combinations"
4208 .to_string(),
4209 ),
4210 });
4211 }
4212 }
4213 EventFilter::Transaction(digest) => {
4214 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4215 }
4216 EventFilter::MoveModule { package, module } => {
4217 let module_id = ModuleId::new(package.into(), module);
4218 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4219 }
4220 EventFilter::MoveEventType(struct_name) => index_store
4221 .events_by_move_event_struct_name(
4222 &struct_name,
4223 tx_num,
4224 event_num,
4225 limit,
4226 descending,
4227 )?,
4228 EventFilter::Sender(sender) => {
4229 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4230 }
4231 EventFilter::TimeRange {
4232 start_time,
4233 end_time,
4234 } => index_store
4235 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4236 EventFilter::MoveEventModule { package, module } => index_store
4237 .events_by_move_event_module(
4238 &ModuleId::new(package.into(), module),
4239 tx_num,
4240 event_num,
4241 limit,
4242 descending,
4243 )?,
4244 EventFilter::Package(_)
4246 | EventFilter::MoveEventField { .. }
4247 | EventFilter::Any(_)
4248 | EventFilter::And(_, _)
4249 | EventFilter::Or(_, _) => {
4250 return Err(IotaError::UserInput {
4251 error: UserInputError::Unsupported(
4252 "This query type is not supported by the full node.".to_string(),
4253 ),
4254 });
4255 }
4256 };
4257
4258 if cursor.is_some() {
4261 if !event_keys.is_empty() {
4262 event_keys.remove(0);
4263 }
4264 } else {
4265 event_keys.truncate(limit - 1);
4266 }
4267
4268 let transaction_digests = event_keys
4270 .iter()
4271 .map(|(_, digest, _, _)| *digest)
4272 .collect::<HashSet<_>>()
4273 .into_iter()
4274 .collect::<Vec<_>>();
4275
4276 let events = kv_store
4277 .multi_get_events_by_tx_digests(&transaction_digests)
4278 .await?;
4279
4280 let events_map: HashMap<_, _> =
4281 transaction_digests.iter().zip(events.into_iter()).collect();
4282
4283 let stored_events = event_keys
4284 .into_iter()
4285 .map(|k| {
4286 (
4287 k,
4288 events_map
4289 .get(&k.1)
4290 .expect("fetched digest is missing")
4291 .clone()
4292 .and_then(|e| e.data.get(k.2).cloned()),
4293 )
4294 })
4295 .map(
4296 |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4297 event
4298 .map(|e| (e, tx_digest, event_seq, timestamp))
4299 .ok_or(IotaError::TransactionEventsNotFound { digest: tx_digest })
4300 },
4301 )
4302 .collect::<Result<Vec<_>, _>>()?;
4303
4304 let epoch_store = self.load_epoch_store_one_call_per_task();
4305 let backing_store = self.get_backing_package_store().as_ref();
4306 let mut layout_resolver = epoch_store
4307 .executor()
4308 .type_layout_resolver(Box::new(backing_store));
4309 let mut events = vec![];
4310 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4311 events.push(IotaEvent::try_from(
4312 e.clone(),
4313 tx_digest,
4314 event_seq as u64,
4315 Some(timestamp),
4316 layout_resolver.get_annotated_layout(&e.type_)?,
4317 )?)
4318 }
4319 Ok(events)
4320 }
4321
4322 pub async fn insert_genesis_object(&self, object: Object) {
4323 self.get_reconfig_api()
4324 .try_insert_genesis_object(object)
4325 .expect("Cannot insert genesis object")
4326 }
4327
4328 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4329 futures::future::join_all(
4330 objects
4331 .iter()
4332 .map(|o| self.insert_genesis_object(o.clone())),
4333 )
4334 .await;
4335 }
4336
4337 #[instrument(level = "trace", skip_all)]
4339 pub fn get_transaction_status(
4340 &self,
4341 transaction_digest: &TransactionDigest,
4342 epoch_store: &Arc<AuthorityPerEpochStore>,
4343 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4344 if let Some(effects) =
4346 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4347 {
4348 if let Some(transaction) = self
4349 .get_transaction_cache_reader()
4350 .try_get_transaction_block(transaction_digest)?
4351 {
4352 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4353 let events = if effects.events_digest().is_some() {
4354 self.get_transaction_events(effects.transaction_digest())?
4355 } else {
4356 TransactionEvents::default()
4357 };
4358 return Ok(Some((
4359 (*transaction).clone().into_message(),
4360 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4361 )));
4362 } else {
4363 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4368 }
4369 }
4370 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4371 self.metrics.tx_already_processed.inc();
4372 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4373 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4374 } else {
4375 Ok(None)
4376 }
4377 }
4378
4379 #[instrument(level = "trace", skip_all)]
4383 pub fn get_signed_effects_and_maybe_resign(
4384 &self,
4385 transaction_digest: &TransactionDigest,
4386 epoch_store: &Arc<AuthorityPerEpochStore>,
4387 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4388 let effects = self
4389 .get_transaction_cache_reader()
4390 .try_get_executed_effects(transaction_digest)?;
4391 match effects {
4392 Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4393 None => Ok(None),
4394 }
4395 }
4396
4397 #[instrument(level = "trace", skip_all)]
4398 pub(crate) fn sign_effects(
4399 &self,
4400 effects: TransactionEffects,
4401 epoch_store: &Arc<AuthorityPerEpochStore>,
4402 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4403 let tx_digest = *effects.transaction_digest();
4404 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4405 Some(sig) if sig.epoch == epoch_store.epoch() => {
4406 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4407 }
4408 _ => {
4409 debug!(
4432 ?tx_digest,
4433 epoch=?epoch_store.epoch(),
4434 "Re-signing the effects with the current epoch"
4435 );
4436
4437 let sig = AuthoritySignInfo::new(
4438 epoch_store.epoch(),
4439 &effects,
4440 Intent::iota_app(IntentScope::TransactionEffects),
4441 self.name,
4442 &*self.secret,
4443 );
4444
4445 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4446
4447 epoch_store.insert_effects_digest_and_signature(
4448 &tx_digest,
4449 effects.digest(),
4450 &sig,
4451 )?;
4452
4453 effects
4454 }
4455 };
4456
4457 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4458 signed_effects,
4459 ))
4460 }
4461
4462 #[instrument(level = "trace", skip_all)]
4464 fn fullnode_only_get_tx_coins_for_indexing(
4465 &self,
4466 effects: &TransactionEffects,
4467 inner_temporary_store: &InnerTemporaryStore,
4468 epoch_store: &Arc<AuthorityPerEpochStore>,
4469 ) -> Option<TxCoins> {
4470 if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4471 return None;
4472 }
4473 let written_coin_objects = inner_temporary_store
4474 .written
4475 .iter()
4476 .filter_map(|(k, v)| {
4477 if v.is_coin() {
4478 Some((*k, v.clone()))
4479 } else {
4480 None
4481 }
4482 })
4483 .collect();
4484 let mut input_coin_objects = inner_temporary_store
4485 .input_objects
4486 .iter()
4487 .filter_map(|(k, v)| {
4488 if v.is_coin() {
4489 Some((*k, v.clone()))
4490 } else {
4491 None
4492 }
4493 })
4494 .collect::<ObjectMap>();
4495
4496 for (object_id, version) in effects.modified_at_versions() {
4501 if inner_temporary_store
4502 .loaded_runtime_objects
4503 .contains_key(&object_id)
4504 {
4505 if let Some(object) = self
4506 .get_object_store()
4507 .get_object_by_key(&object_id, version)
4508 {
4509 if object.is_coin() {
4510 input_coin_objects.insert(object_id, object);
4511 }
4512 }
4513 }
4514 }
4515
4516 Some((input_coin_objects, written_coin_objects))
4517 }
4518
4519 #[instrument(level = "trace", skip_all)]
4531 pub async fn get_transaction_lock(
4532 &self,
4533 object_ref: &ObjectRef,
4534 epoch_store: &AuthorityPerEpochStore,
4535 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4536 let lock_info = self
4537 .get_object_cache_reader()
4538 .try_get_lock(*object_ref, epoch_store)?;
4539 let lock_info = match lock_info {
4540 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4541 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4542 provided_obj_ref: *object_ref,
4543 current_version: locked_ref.1,
4544 }
4545 .into());
4546 }
4547 ObjectLockStatus::Initialized => {
4548 return Ok(None);
4549 }
4550 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4551 };
4552
4553 epoch_store.get_signed_transaction(&lock_info)
4554 }
4555
4556 pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4557 self.get_object_cache_reader().try_get_objects(objects)
4558 }
4559
4560 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4562 self.try_get_objects(objects)
4563 .await
4564 .expect("storage access failed")
4565 }
4566
4567 pub async fn try_get_object_or_tombstone(
4568 &self,
4569 object_id: ObjectID,
4570 ) -> IotaResult<Option<ObjectRef>> {
4571 self.get_object_cache_reader()
4572 .try_get_latest_object_ref_or_tombstone(object_id)
4573 }
4574
4575 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4577 self.try_get_object_or_tombstone(object_id)
4578 .await
4579 .expect("storage access failed")
4580 }
4581
4582 pub fn set_override_protocol_upgrade_buffer_stake(
4592 &self,
4593 expected_epoch: EpochId,
4594 buffer_stake_bps: u64,
4595 ) -> IotaResult {
4596 let epoch_store = self.load_epoch_store_one_call_per_task();
4597 let actual_epoch = epoch_store.epoch();
4598 if actual_epoch != expected_epoch {
4599 return Err(IotaError::WrongEpoch {
4600 expected_epoch,
4601 actual_epoch,
4602 });
4603 }
4604
4605 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4606 }
4607
4608 pub fn clear_override_protocol_upgrade_buffer_stake(
4609 &self,
4610 expected_epoch: EpochId,
4611 ) -> IotaResult {
4612 let epoch_store = self.load_epoch_store_one_call_per_task();
4613 let actual_epoch = epoch_store.epoch();
4614 if actual_epoch != expected_epoch {
4615 return Err(IotaError::WrongEpoch {
4616 expected_epoch,
4617 actual_epoch,
4618 });
4619 }
4620
4621 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4622 }
4623
4624 pub async fn get_available_system_packages(
4628 &self,
4629 binary_config: &BinaryConfig,
4630 ) -> Vec<ObjectRef> {
4631 let mut results = vec![];
4632
4633 let system_packages = BuiltInFramework::iter_system_packages();
4634
4635 #[cfg(msim)]
4637 let extra_packages = framework_injection::get_extra_packages(self.name);
4638 #[cfg(msim)]
4639 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4640
4641 for system_package in system_packages {
4642 let modules = system_package.modules().to_vec();
4643 #[cfg(msim)]
4645 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4646 .unwrap_or(modules);
4647
4648 let Some(obj_ref) = iota_framework::compare_system_package(
4649 &self.get_object_store(),
4650 &system_package.id,
4651 &modules,
4652 system_package.dependencies.to_vec(),
4653 binary_config,
4654 )
4655 .await
4656 else {
4657 return vec![];
4658 };
4659 results.push(obj_ref);
4660 }
4661
4662 results
4663 }
4664
4665 async fn get_system_package_bytes(
4682 &self,
4683 system_packages: Vec<ObjectRef>,
4684 binary_config: &BinaryConfig,
4685 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4686 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4687 let objects = self.get_objects(&ids).await;
4688
4689 let mut res = Vec::with_capacity(system_packages.len());
4690 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4691 let prev_transaction = match object {
4692 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4693 info!("Framework {} does not need updating", system_package_ref.0);
4695 continue;
4696 }
4697
4698 Some(cur_object) => cur_object.previous_transaction,
4699 None => TransactionDigest::genesis_marker(),
4700 };
4701
4702 #[cfg(msim)]
4703 let SystemPackage {
4704 id: _,
4705 bytes,
4706 dependencies,
4707 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4708 .unwrap_or_else(|| {
4709 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4710 });
4711
4712 #[cfg(not(msim))]
4713 let SystemPackage {
4714 id: _,
4715 bytes,
4716 dependencies,
4717 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4718
4719 let modules: Vec<_> = bytes
4720 .iter()
4721 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4722 .collect();
4723
4724 let new_object = Object::new_system_package(
4725 &modules,
4726 system_package_ref.1,
4727 dependencies.clone(),
4728 prev_transaction,
4729 );
4730
4731 let new_ref = new_object.compute_object_reference();
4732 if new_ref != system_package_ref {
4733 error!(
4734 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
4735 );
4736 return None;
4737 }
4738
4739 res.push((system_package_ref.1, bytes, dependencies));
4740 }
4741
4742 Some(res)
4743 }
4744
4745 fn is_protocol_version_supported_v1(
4749 proposed_protocol_version: ProtocolVersion,
4750 committee: &Committee,
4751 capabilities: Vec<AuthorityCapabilitiesV1>,
4752 mut buffer_stake_bps: u64,
4753 ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4754 if buffer_stake_bps > 10000 {
4755 warn!("clamping buffer_stake_bps to 10000");
4756 buffer_stake_bps = 10000;
4757 }
4758
4759 let mut desired_upgrades: Vec<_> = capabilities
4762 .into_iter()
4763 .filter_map(|mut cap| {
4764 if cap.available_system_packages.is_empty() {
4766 return None;
4767 }
4768
4769 cap.available_system_packages.sort();
4770
4771 info!(
4772 "validator {:?} supports {:?} with system packages: {:?}",
4773 cap.authority.concise(),
4774 cap.supported_protocol_versions,
4775 cap.available_system_packages,
4776 );
4777
4778 cap.supported_protocol_versions
4782 .get_version_digest(proposed_protocol_version)
4783 .map(|digest| (digest, cap.available_system_packages, cap.authority))
4784 })
4785 .collect();
4786
4787 desired_upgrades.sort();
4790 desired_upgrades
4791 .into_iter()
4792 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4793 .into_iter()
4794 .find_map(|((digest, packages), group)| {
4795 assert!(!packages.is_empty());
4797
4798 let mut stake_aggregator: StakeAggregator<(), true> =
4799 StakeAggregator::new(Arc::new(committee.clone()));
4800
4801 for (_, _, authority) in group {
4802 stake_aggregator.insert_generic(authority, ());
4803 }
4804
4805 let total_votes = stake_aggregator.total_votes();
4806 let quorum_threshold = committee.quorum_threshold();
4807 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4808
4809 info!(
4810 protocol_config_digest = ?digest,
4811 ?total_votes,
4812 ?quorum_threshold,
4813 ?buffer_stake_bps,
4814 ?effective_threshold,
4815 ?proposed_protocol_version,
4816 ?packages,
4817 "support for upgrade"
4818 );
4819
4820 let has_support = total_votes >= effective_threshold;
4821 has_support.then_some((proposed_protocol_version, digest, packages))
4822 })
4823 }
4824
4825 fn choose_protocol_version_and_system_packages_v1(
4829 current_protocol_version: ProtocolVersion,
4830 current_protocol_digest: Digest,
4831 committee: &Committee,
4832 capabilities: Vec<AuthorityCapabilitiesV1>,
4833 buffer_stake_bps: u64,
4834 ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4835 let mut next_protocol_version = current_protocol_version;
4836 let mut system_packages = vec![];
4837 let mut protocol_version_digest = current_protocol_digest;
4838
4839 while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4843 next_protocol_version + 1,
4844 committee,
4845 capabilities.clone(),
4846 buffer_stake_bps,
4847 ) {
4848 next_protocol_version = version;
4849 protocol_version_digest = digest;
4850 system_packages = packages;
4851 }
4852
4853 (
4854 next_protocol_version,
4855 protocol_version_digest,
4856 system_packages,
4857 )
4858 }
4859
4860 fn get_validators_supporting_protocol_version(
4865 target_protocol_version: ProtocolVersion,
4866 target_digest: Digest,
4867 active_validators: &[AuthorityPublicKey],
4868 capabilities: &[AuthorityCapabilitiesV1],
4869 ) -> Vec<u64> {
4870 let mut eligible_validators = Vec::new();
4871
4872 for capability in capabilities {
4873 if let Some(digest) = capability
4875 .supported_protocol_versions
4876 .get_version_digest(target_protocol_version)
4877 {
4878 if digest == target_digest {
4879 if let Some(index) = active_validators
4881 .iter()
4882 .position(|name| AuthorityName::from(name) == capability.authority)
4883 {
4884 eligible_validators.push(index as u64);
4885 }
4886 }
4887 }
4888 }
4889
4890 eligible_validators.sort();
4892 eligible_validators
4893 }
4894
4895 fn calculate_eligible_validators_weight(
4900 eligible_validator_indices: &[u64],
4901 active_validators: &[AuthorityPublicKey],
4902 committee: &Committee,
4903 ) -> u64 {
4904 let mut total_weight = 0u64;
4905
4906 for &index in eligible_validator_indices {
4907 let authority_pubkey = &active_validators[index as usize];
4908 if let Some((_, weight)) = committee
4910 .members()
4911 .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4912 {
4913 total_weight += weight;
4914 }
4915 }
4916
4917 total_weight
4918 }
4919
4920 #[instrument(level = "debug", skip_all)]
4921 fn create_authenticator_state_tx(
4922 &self,
4923 epoch_store: &Arc<AuthorityPerEpochStore>,
4924 ) -> Option<EndOfEpochTransactionKind> {
4925 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4926 info!("authenticator state transactions not enabled");
4927 return None;
4928 }
4929
4930 let authenticator_state_exists = epoch_store.authenticator_state_exists();
4931 let tx = if authenticator_state_exists {
4932 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4933 let min_epoch =
4934 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4935 let authenticator_obj_initial_shared_version = epoch_store
4936 .epoch_start_config()
4937 .authenticator_obj_initial_shared_version()
4938 .expect("initial version must exist");
4939
4940 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4941 min_epoch,
4942 authenticator_obj_initial_shared_version,
4943 );
4944
4945 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4946
4947 tx
4948 } else {
4949 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4950 info!("Creating AuthenticatorStateCreate tx");
4951 tx
4952 };
4953 Some(tx)
4954 }
4955
4956 #[instrument(level = "error", skip_all)]
4969 pub async fn create_and_execute_advance_epoch_tx(
4970 &self,
4971 epoch_store: &Arc<AuthorityPerEpochStore>,
4972 gas_cost_summary: &GasCostSummary,
4973 checkpoint: CheckpointSequenceNumber,
4974 epoch_start_timestamp_ms: CheckpointTimestamp,
4975 scores: Vec<u64>,
4976 ) -> anyhow::Result<(
4977 IotaSystemState,
4978 Option<SystemEpochInfoEvent>,
4979 TransactionEffects,
4980 )> {
4981 let mut txns = Vec::new();
4982
4983 if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
4984 txns.push(tx);
4985 }
4986
4987 let next_epoch = epoch_store.epoch() + 1;
4988
4989 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
4990 let authority_capabilities = epoch_store
4991 .get_capabilities_v1()
4992 .expect("read capabilities from db cannot fail");
4993 let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
4994 Self::choose_protocol_version_and_system_packages_v1(
4995 epoch_store.protocol_version(),
4996 SupportedProtocolVersionsWithHashes::protocol_config_digest(
4997 epoch_store.protocol_config(),
4998 ),
4999 epoch_store.committee(),
5000 authority_capabilities.clone(),
5001 buffer_stake_bps,
5002 );
5003
5004 let config = epoch_store.protocol_config();
5008 let binary_config = to_binary_config(config);
5009 let Some(next_epoch_system_package_bytes) = self
5010 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
5011 .await
5012 else {
5013 error!(
5014 "upgraded system packages {:?} are not locally available, cannot create \
5015 ChangeEpochTx. validator binary must be upgraded to the correct version!",
5016 next_epoch_system_packages
5017 );
5018 bail!("missing system packages: cannot form ChangeEpochTx");
5028 };
5029
5030 if config.select_committee_from_eligible_validators() {
5033 let active_validators = epoch_store.epoch_start_state().get_active_validators();
5035
5036 let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
5037
5038 if config.select_committee_supporting_next_epoch_version() {
5042 eligible_active_validators = Self::get_validators_supporting_protocol_version(
5043 next_epoch_protocol_version,
5044 next_epoch_protocol_digest,
5045 &active_validators,
5046 &authority_capabilities,
5047 );
5048
5049 let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5051 &eligible_active_validators,
5052 &active_validators,
5053 epoch_store.committee(),
5054 );
5055
5056 let committee = epoch_store.committee();
5060 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5061
5062 if eligible_validators_weight < effective_threshold {
5063 error!(
5064 "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5065 This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5066 );
5067 eligible_active_validators = (0..active_validators.len() as u64).collect();
5070 }
5071 }
5072
5073 if config.pass_validator_scores_to_advance_epoch() {
5076 txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5077 next_epoch,
5078 next_epoch_protocol_version,
5079 gas_cost_summary.storage_cost,
5080 gas_cost_summary.computation_cost,
5081 gas_cost_summary.computation_cost_burned,
5082 gas_cost_summary.storage_rebate,
5083 gas_cost_summary.non_refundable_storage_fee,
5084 epoch_start_timestamp_ms,
5085 next_epoch_system_package_bytes,
5086 eligible_active_validators,
5087 scores,
5088 config.adjust_rewards_by_score(),
5089 ));
5090 } else {
5091 txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5092 next_epoch,
5093 next_epoch_protocol_version,
5094 gas_cost_summary.storage_cost,
5095 gas_cost_summary.computation_cost,
5096 gas_cost_summary.computation_cost_burned,
5097 gas_cost_summary.storage_rebate,
5098 gas_cost_summary.non_refundable_storage_fee,
5099 epoch_start_timestamp_ms,
5100 next_epoch_system_package_bytes,
5101 eligible_active_validators,
5102 ));
5103 }
5104 } else if config.protocol_defined_base_fee()
5105 && config.max_committee_members_count_as_option().is_some()
5106 {
5107 txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5108 next_epoch,
5109 next_epoch_protocol_version,
5110 gas_cost_summary.storage_cost,
5111 gas_cost_summary.computation_cost,
5112 gas_cost_summary.computation_cost_burned,
5113 gas_cost_summary.storage_rebate,
5114 gas_cost_summary.non_refundable_storage_fee,
5115 epoch_start_timestamp_ms,
5116 next_epoch_system_package_bytes,
5117 ));
5118 } else {
5119 txns.push(EndOfEpochTransactionKind::new_change_epoch(
5120 next_epoch,
5121 next_epoch_protocol_version,
5122 gas_cost_summary.storage_cost,
5123 gas_cost_summary.computation_cost,
5124 gas_cost_summary.storage_rebate,
5125 gas_cost_summary.non_refundable_storage_fee,
5126 epoch_start_timestamp_ms,
5127 next_epoch_system_package_bytes,
5128 ));
5129 }
5130
5131 let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5132
5133 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5134 tx.clone(),
5135 epoch_store.epoch(),
5136 checkpoint,
5137 );
5138
5139 let tx_digest = executable_tx.digest();
5140
5141 info!(
5142 ?next_epoch,
5143 ?next_epoch_protocol_version,
5144 ?next_epoch_system_packages,
5145 computation_cost=?gas_cost_summary.computation_cost,
5146 computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5147 storage_cost=?gas_cost_summary.storage_cost,
5148 storage_rebate=?gas_cost_summary.storage_rebate,
5149 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5150 ?tx_digest,
5151 "Creating advance epoch transaction"
5152 );
5153
5154 fail_point_async!("change_epoch_tx_delay");
5155 let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5156
5157 if self
5161 .get_transaction_cache_reader()
5162 .try_is_tx_already_executed(tx_digest)?
5163 {
5164 warn!("change epoch tx has already been executed via state sync");
5165 bail!("change epoch tx has already been executed via state sync",);
5166 }
5167
5168 let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5169
5170 epoch_store.assign_shared_object_versions_idempotent(
5174 self.get_object_cache_reader().as_ref(),
5175 std::slice::from_ref(&executable_tx),
5176 )?;
5177
5178 let (input_objects, _, _) =
5179 self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5180
5181 let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
5182 &execution_guard,
5183 &executable_tx,
5184 input_objects,
5185 None,
5186 None,
5187 epoch_store,
5188 )?;
5189 let system_obj = get_iota_system_state(&temporary_store.written)
5190 .expect("change epoch tx must write to system object");
5191 let system_epoch_info_event = temporary_store
5193 .events
5194 .data
5195 .into_iter()
5196 .find(|event| event.is_system_epoch_info_event())
5197 .map(SystemEpochInfoEvent::from);
5198 assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5201
5202 self.get_state_sync_store()
5206 .try_insert_transaction_and_effects(&tx, &effects)
5207 .map_err(|err| {
5208 let err: anyhow::Error = err.into();
5209 err
5210 })?;
5211
5212 info!(
5213 "Effects summary of the change epoch transaction: {:?}",
5214 effects.summary_for_debug()
5215 );
5216 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5217 assert!(effects.status().is_ok());
5219 Ok((system_obj, system_epoch_info_event, effects))
5220 }
5221
5222 #[instrument(level = "error", skip_all)]
5226 async fn revert_uncommitted_epoch_transactions(
5227 &self,
5228 epoch_store: &AuthorityPerEpochStore,
5229 ) -> IotaResult {
5230 {
5231 let state = epoch_store.get_reconfig_state_write_lock_guard();
5232 if state.should_accept_user_certs() {
5233 epoch_store.close_user_certs(state);
5242 }
5243 }
5245 let pending_certificates = epoch_store.pending_consensus_certificates();
5246 info!(
5247 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5248 pending_certificates.len(),
5249 pending_certificates,
5250 );
5251 for digest in pending_certificates {
5252 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5253 info!(
5254 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5255 digest
5256 );
5257 continue;
5258 }
5259 info!("Reverting {:?} at the end of epoch", digest);
5260 epoch_store.revert_executed_transaction(&digest)?;
5261 self.get_reconfig_api().try_revert_state_update(&digest)?;
5262 }
5263 info!("All uncommitted local transactions reverted");
5264 Ok(())
5265 }
5266
5267 #[instrument(level = "error", skip_all)]
5268 async fn reopen_epoch_db(
5269 &self,
5270 cur_epoch_store: &AuthorityPerEpochStore,
5271 new_committee: Committee,
5272 epoch_start_configuration: EpochStartConfiguration,
5273 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5274 epoch_last_checkpoint: CheckpointSequenceNumber,
5275 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5276 let new_epoch = new_committee.epoch;
5277 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5278 assert_eq!(
5279 epoch_start_configuration.epoch_start_state().epoch(),
5280 new_committee.epoch
5281 );
5282 fail_point!("before-open-new-epoch-store");
5283 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5284 self.name,
5285 new_committee,
5286 epoch_start_configuration,
5287 self.get_backing_package_store().clone(),
5288 self.get_object_store().clone(),
5289 expensive_safety_check_config,
5290 epoch_last_checkpoint,
5291 )?;
5292 self.epoch_store.store(new_epoch_store.clone());
5293 Ok(new_epoch_store)
5294 }
5295
5296 fn check_move_account(
5299 &self,
5300 auth_account_object_id: ObjectID,
5301 auth_account_object_seq_number: Option<SequenceNumber>,
5302 auth_account_object_digest: Option<ObjectDigest>,
5303 account_object: ObjectReadResult,
5304 signer: &IotaAddress,
5305 ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5306 let account_object = match account_object.object {
5307 ObjectReadResultKind::Object(object) => Ok(object),
5308 ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5309 Err(UserInputError::AccountObjectDeleted {
5310 account_id: account_object.id(),
5311 account_version: version,
5312 transaction_digest: digest,
5313 })
5314 }
5315 ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5318 Err(UserInputError::AccountObjectInCanceledTransaction {
5319 account_id: account_object.id(),
5320 account_version: version,
5321 })
5322 }
5323 }?;
5324
5325 let account_object_addr = IotaAddress::from(auth_account_object_id);
5326
5327 fp_ensure!(
5328 signer == &account_object_addr,
5329 UserInputError::IncorrectUserSignature {
5330 error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5331 }
5332 .into()
5333 );
5334
5335 fp_ensure!(
5336 account_object.is_shared() || account_object.is_immutable(),
5337 UserInputError::AccountObjectNotSupported {
5338 object_id: auth_account_object_id
5339 }
5340 .into()
5341 );
5342
5343 let auth_account_object_seq_number =
5344 if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5345 let account_object_version = account_object.version();
5346
5347 fp_ensure!(
5348 account_object_version == auth_account_object_seq_number,
5349 UserInputError::AccountObjectVersionMismatch {
5350 object_id: auth_account_object_id,
5351 expected_version: auth_account_object_seq_number,
5352 actual_version: account_object_version,
5353 }
5354 .into()
5355 );
5356
5357 auth_account_object_seq_number
5358 } else {
5359 account_object.version()
5360 };
5361
5362 if let Some(auth_account_object_digest) = auth_account_object_digest {
5363 let expected_digest = account_object.digest();
5364 fp_ensure!(
5365 expected_digest == auth_account_object_digest,
5366 UserInputError::InvalidAccountObjectDigest {
5367 object_id: auth_account_object_id,
5368 expected_digest,
5369 actual_digest: auth_account_object_digest,
5370 }
5371 .into()
5372 );
5373 }
5374
5375 let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5376 auth_account_object_id,
5377 &AuthenticatorFunctionRefV1Key::tag().into(),
5378 &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5379 )
5380 .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5381 account_object_id: auth_account_object_id,
5382 })?;
5383
5384 let authenticator_function_ref_field = self
5385 .get_object_cache_reader()
5386 .try_find_object_lt_or_eq_version(
5387 authenticator_function_ref_field_id,
5388 auth_account_object_seq_number,
5389 )?;
5390
5391 if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5392 let field_move_object = authenticator_function_ref_field_obj
5393 .data
5394 .try_as_move()
5395 .expect("dynamic field should never be a package object");
5396
5397 let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5398 field_move_object.to_rust().ok_or(
5399 UserInputError::InvalidAuthenticatorFunctionRefField {
5400 account_object_id: auth_account_object_id,
5401 },
5402 )?;
5403
5404 Ok(AuthenticatorFunctionRefForExecution::new_v1(
5405 field.value,
5406 authenticator_function_ref_field_obj.compute_object_reference(),
5407 authenticator_function_ref_field_obj.owner,
5408 authenticator_function_ref_field_obj.storage_rebate,
5409 authenticator_function_ref_field_obj.previous_transaction,
5410 ))
5411 } else {
5412 Err(UserInputError::MoveAuthenticatorNotFound {
5413 authenticator_function_ref_id: authenticator_function_ref_field_id,
5414 account_object_id: auth_account_object_id,
5415 account_object_version: auth_account_object_seq_number,
5416 }
5417 .into())
5418 }
5419 }
5420
5421 fn read_objects_for_signing(
5422 &self,
5423 transaction: &VerifiedTransaction,
5424 epoch: u64,
5425 ) -> IotaResult<(
5426 InputObjects,
5427 ReceivingObjects,
5428 Option<InputObjects>,
5429 Option<ObjectReadResult>,
5430 )> {
5431 let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5432 Some(transaction.digest()),
5433 &transaction.collect_all_input_object_kind_for_reading()?,
5434 &transaction.data().transaction_data().receiving_objects(),
5435 epoch,
5436 )?;
5437
5438 transaction
5439 .split_input_objects_into_groups_for_reading(input_objects)
5440 .map(|(tx_input_objects, auth_input_objects, account_object)| {
5441 (
5442 tx_input_objects,
5443 tx_receiving_objects,
5444 auth_input_objects,
5445 account_object,
5446 )
5447 })
5448 }
5449
5450 fn check_transaction_inputs_for_signing(
5451 &self,
5452 protocol_config: &ProtocolConfig,
5453 reference_gas_price: u64,
5454 tx_data: &TransactionData,
5455 tx_input_objects: InputObjects,
5456 tx_receiving_objects: &ReceivingObjects,
5457 move_authenticator: Option<&MoveAuthenticator>,
5458 auth_input_objects: Option<InputObjects>,
5459 account_object: Option<ObjectReadResult>,
5460 ) -> IotaResult<(
5461 IotaGasStatus,
5462 CheckedInputObjects,
5463 Option<CheckedInputObjects>,
5464 Option<AuthenticatorFunctionRef>,
5465 )> {
5466 let (
5467 auth_checked_input_objects_union,
5468 authenticator_function_ref,
5469 authenticator_gas_budget,
5470 ) = if let Some(move_authenticator) = move_authenticator {
5471 let auth_input_objects =
5472 auth_input_objects.expect("MoveAuthenticator input objects must be provided");
5473 let account_object = account_object.expect("Move account object must be provided");
5474
5475 let (
5477 auth_account_object_id,
5478 auth_account_object_seq_number,
5479 auth_account_object_digest,
5480 ) = move_authenticator.object_to_authenticate_components()?;
5481
5482 let AuthenticatorFunctionRefForExecution {
5484 authenticator_function_ref,
5485 ..
5486 } = self.check_move_account(
5487 auth_account_object_id,
5488 auth_account_object_seq_number,
5489 auth_account_object_digest,
5490 account_object,
5491 &tx_data.sender(),
5492 )?;
5493
5494 let auth_checked_input_objects =
5496 iota_transaction_checks::check_move_authenticator_input_for_signing(
5497 auth_input_objects,
5498 )?;
5499
5500 let authenticator_gas_budget = protocol_config.max_auth_gas();
5503
5504 (
5505 Some(auth_checked_input_objects),
5506 Some(authenticator_function_ref),
5507 authenticator_gas_budget,
5508 )
5509 } else {
5510 (None, None, 0)
5511 };
5512
5513 let (gas_status, tx_checked_input_objects) =
5515 iota_transaction_checks::check_transaction_input(
5516 protocol_config,
5517 reference_gas_price,
5518 tx_data,
5519 tx_input_objects,
5520 tx_receiving_objects,
5521 &self.metrics.bytecode_verifier_metrics,
5522 &self.config.verifier_signing_config,
5523 authenticator_gas_budget,
5524 )?;
5525
5526 Ok((
5527 gas_status,
5528 tx_checked_input_objects,
5529 auth_checked_input_objects_union,
5530 authenticator_function_ref,
5531 ))
5532 }
5533
5534 #[cfg(test)]
5535 pub(crate) fn iter_live_object_set_for_testing(
5536 &self,
5537 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5538 self.get_accumulator_store()
5539 .iter_cached_live_object_set_for_testing()
5540 }
5541
5542 #[cfg(test)]
5543 pub(crate) fn shutdown_execution_for_test(&self) {
5544 self.tx_execution_shutdown
5545 .lock()
5546 .take()
5547 .unwrap()
5548 .send(())
5549 .unwrap();
5550 }
5551
5552 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5555 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5556 self.get_object_cache_reader()
5557 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5558 self.get_reconfig_api()
5559 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5560 }
5561}
5562
5563pub struct RandomnessRoundReceiver {
5564 authority_state: Arc<AuthorityState>,
5565 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5566}
5567
5568impl RandomnessRoundReceiver {
5569 pub fn spawn(
5570 authority_state: Arc<AuthorityState>,
5571 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5572 ) -> JoinHandle<()> {
5573 let rrr = RandomnessRoundReceiver {
5574 authority_state,
5575 randomness_rx,
5576 };
5577 spawn_monitored_task!(rrr.run())
5578 }
5579
5580 async fn run(mut self) {
5581 info!("RandomnessRoundReceiver event loop started");
5582
5583 loop {
5584 tokio::select! {
5585 maybe_recv = self.randomness_rx.recv() => {
5586 if let Some((epoch, round, bytes)) = maybe_recv {
5587 self.handle_new_randomness(epoch, round, bytes);
5588 } else {
5589 break;
5590 }
5591 },
5592 }
5593 }
5594
5595 info!("RandomnessRoundReceiver event loop ended");
5596 }
5597
5598 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5599 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5600 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5601 if epoch_store.epoch() != epoch {
5602 warn!(
5603 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5604 epoch_store.epoch()
5605 );
5606 return;
5607 }
5608 let transaction = VerifiedTransaction::new_randomness_state_update(
5609 epoch,
5610 round,
5611 bytes,
5612 epoch_store
5613 .epoch_start_config()
5614 .randomness_obj_initial_shared_version(),
5615 );
5616 debug!(
5617 "created randomness state update transaction with digest: {:?}",
5618 transaction.digest()
5619 );
5620 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5621 let digest = *transaction.digest();
5622
5623 self.authority_state
5628 .get_cache_commit()
5629 .persist_transaction(&transaction);
5630
5631 self.authority_state
5633 .transaction_manager()
5634 .enqueue(vec![transaction], &epoch_store);
5635
5636 let authority_state = self.authority_state.clone();
5637 spawn_monitored_task!(async move {
5638 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5647 let result = tokio::time::timeout(
5648 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5649 authority_state
5650 .get_transaction_cache_reader()
5651 .try_notify_read_executed_effects(&[digest]),
5652 )
5653 .await;
5654 let result = match result {
5655 Ok(result) => result,
5656 Err(_) => {
5657 if cfg!(debug_assertions) {
5658 panic!(
5660 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5661 );
5662 }
5663 warn!(
5664 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5665 );
5666 authority_state
5668 .get_transaction_cache_reader()
5669 .try_notify_read_executed_effects(&[digest])
5670 .await
5671 }
5672 };
5673
5674 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5675 let effects = effects.pop().expect("should return effects");
5676 if *effects.status() != ExecutionStatus::Success {
5677 fatal!(
5678 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5679 );
5680 }
5681 debug!(
5682 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5683 );
5684 });
5685 }
5686}
5687
5688#[async_trait]
5689impl TransactionKeyValueStoreTrait for AuthorityState {
5690 async fn multi_get(
5691 &self,
5692 transaction_keys: &[TransactionDigest],
5693 effects_keys: &[TransactionDigest],
5694 ) -> IotaResult<KVStoreTransactionData> {
5695 let txns = if !transaction_keys.is_empty() {
5696 self.get_transaction_cache_reader()
5697 .try_multi_get_transaction_blocks(transaction_keys)?
5698 .into_iter()
5699 .map(|t| t.map(|t| (*t).clone().into_inner()))
5700 .collect()
5701 } else {
5702 vec![]
5703 };
5704
5705 let fx = if !effects_keys.is_empty() {
5706 self.get_transaction_cache_reader()
5707 .try_multi_get_executed_effects(effects_keys)?
5708 } else {
5709 vec![]
5710 };
5711
5712 Ok((txns, fx))
5713 }
5714
5715 async fn multi_get_checkpoints(
5716 &self,
5717 checkpoint_summaries: &[CheckpointSequenceNumber],
5718 checkpoint_contents: &[CheckpointSequenceNumber],
5719 checkpoint_summaries_by_digest: &[CheckpointDigest],
5720 ) -> IotaResult<(
5721 Vec<Option<CertifiedCheckpointSummary>>,
5722 Vec<Option<CheckpointContents>>,
5723 Vec<Option<CertifiedCheckpointSummary>>,
5724 )> {
5725 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5727 let store = self.get_checkpoint_store();
5728 for seq in checkpoint_summaries {
5729 let checkpoint = store
5730 .get_checkpoint_by_sequence_number(*seq)?
5731 .map(|c| c.into_inner());
5732
5733 summaries.push(checkpoint);
5734 }
5735
5736 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5737 for seq in checkpoint_contents {
5738 let checkpoint = store
5739 .get_checkpoint_by_sequence_number(*seq)?
5740 .and_then(|summary| {
5741 store
5742 .get_checkpoint_contents(&summary.content_digest)
5743 .expect("db read cannot fail")
5744 });
5745 contents.push(checkpoint);
5746 }
5747
5748 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5749 for digest in checkpoint_summaries_by_digest {
5750 let checkpoint = store
5751 .get_checkpoint_by_digest(digest)?
5752 .map(|c| c.into_inner());
5753 summaries_by_digest.push(checkpoint);
5754 }
5755
5756 Ok((summaries, contents, summaries_by_digest))
5757 }
5758
5759 async fn get_transaction_perpetual_checkpoint(
5760 &self,
5761 digest: TransactionDigest,
5762 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5763 self.get_checkpoint_cache()
5764 .try_get_transaction_perpetual_checkpoint(&digest)
5765 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5766 }
5767
5768 async fn get_object(
5769 &self,
5770 object_id: ObjectID,
5771 version: VersionNumber,
5772 ) -> IotaResult<Option<Object>> {
5773 self.get_object_cache_reader()
5774 .try_get_object_by_key(&object_id, version)
5775 }
5776
5777 async fn multi_get_transactions_perpetual_checkpoints(
5778 &self,
5779 digests: &[TransactionDigest],
5780 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5781 let res = self
5782 .get_checkpoint_cache()
5783 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5784
5785 Ok(res
5786 .into_iter()
5787 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5788 .collect())
5789 }
5790
5791 #[instrument(skip(self))]
5792 async fn multi_get_events_by_tx_digests(
5793 &self,
5794 digests: &[TransactionDigest],
5795 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5796 if digests.is_empty() {
5797 return Ok(vec![]);
5798 }
5799
5800 Ok(self
5801 .get_transaction_cache_reader()
5802 .multi_get_events(digests))
5803 }
5804}
5805
5806#[cfg(msim)]
5807pub mod framework_injection {
5808 use std::{
5809 cell::RefCell,
5810 collections::{BTreeMap, BTreeSet},
5811 };
5812
5813 use iota_framework::{BuiltInFramework, SystemPackage};
5814 use iota_types::{
5815 base_types::{AuthorityName, ObjectID},
5816 is_system_package,
5817 };
5818 use move_binary_format::CompiledModule;
5819
5820 type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
5821
5822 thread_local! {
5824 static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
5825 }
5826
5827 type Framework = Vec<CompiledModule>;
5828
5829 pub type PackageUpgradeCallback =
5830 Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
5831
5832 enum PackageOverrideConfig {
5833 Global(Framework),
5834 PerValidator(PackageUpgradeCallback),
5835 }
5836
5837 fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
5838 modules
5839 .iter()
5840 .map(|m| {
5841 let mut buf = Vec::new();
5842 m.serialize_with_version(m.version, &mut buf).unwrap();
5843 buf
5844 })
5845 .collect()
5846 }
5847
5848 pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
5849 OVERRIDE.with(|bs| {
5850 bs.borrow_mut()
5851 .insert(package_id, PackageOverrideConfig::Global(modules))
5852 });
5853 }
5854
5855 pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
5856 OVERRIDE.with(|bs| {
5857 bs.borrow_mut()
5858 .insert(package_id, PackageOverrideConfig::PerValidator(func))
5859 });
5860 }
5861
5862 pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
5863 OVERRIDE.with(|cfg| {
5864 cfg.borrow().get(package_id).and_then(|entry| match entry {
5865 PackageOverrideConfig::Global(framework) => {
5866 Some(compiled_modules_to_bytes(framework))
5867 }
5868 PackageOverrideConfig::PerValidator(func) => {
5869 func(name).map(|fw| compiled_modules_to_bytes(&fw))
5870 }
5871 })
5872 })
5873 }
5874
5875 pub fn get_override_modules(
5876 package_id: &ObjectID,
5877 name: AuthorityName,
5878 ) -> Option<Vec<CompiledModule>> {
5879 OVERRIDE.with(|cfg| {
5880 cfg.borrow().get(package_id).and_then(|entry| match entry {
5881 PackageOverrideConfig::Global(framework) => Some(framework.clone()),
5882 PackageOverrideConfig::PerValidator(func) => func(name),
5883 })
5884 })
5885 }
5886
5887 pub fn get_override_system_package(
5888 package_id: &ObjectID,
5889 name: AuthorityName,
5890 ) -> Option<SystemPackage> {
5891 let bytes = get_override_bytes(package_id, name)?;
5892 let dependencies = if is_system_package(*package_id) {
5893 BuiltInFramework::get_package_by_id(package_id)
5894 .dependencies
5895 .to_vec()
5896 } else {
5897 BuiltInFramework::all_package_ids()
5900 };
5901 Some(SystemPackage {
5902 id: *package_id,
5903 bytes,
5904 dependencies,
5905 })
5906 }
5907
5908 pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
5909 let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
5910 let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
5911 cfg.borrow()
5912 .keys()
5913 .filter_map(|package| (!built_in.contains(package)).then_some(*package))
5914 .collect()
5915 });
5916
5917 extra
5918 .into_iter()
5919 .map(|package| SystemPackage {
5920 id: package,
5921 bytes: get_override_bytes(&package, name).unwrap(),
5922 dependencies: BuiltInFramework::all_package_ids(),
5923 })
5924 .collect()
5925 }
5926}
5927
5928#[derive(Debug, Serialize, Deserialize, Clone)]
5929pub struct ObjDumpFormat {
5930 pub id: ObjectID,
5931 pub version: VersionNumber,
5932 pub digest: ObjectDigest,
5933 pub object: Object,
5934}
5935
5936impl ObjDumpFormat {
5937 fn new(object: Object) -> Self {
5938 let oref = object.compute_object_reference();
5939 Self {
5940 id: oref.0,
5941 version: oref.1,
5942 digest: oref.2,
5943 object,
5944 }
5945 }
5946}
5947
5948#[derive(Debug, Serialize, Deserialize, Clone)]
5949pub struct NodeStateDump {
5950 pub tx_digest: TransactionDigest,
5951 pub sender_signed_data: SenderSignedData,
5952 pub executed_epoch: u64,
5953 pub reference_gas_price: u64,
5954 pub protocol_version: u64,
5955 pub epoch_start_timestamp_ms: u64,
5956 pub computed_effects: TransactionEffects,
5957 pub expected_effects_digest: TransactionEffectsDigest,
5958 pub relevant_system_packages: Vec<ObjDumpFormat>,
5959 pub shared_objects: Vec<ObjDumpFormat>,
5960 pub loaded_child_objects: Vec<ObjDumpFormat>,
5961 pub modified_at_versions: Vec<ObjDumpFormat>,
5962 pub runtime_reads: Vec<ObjDumpFormat>,
5963 pub input_objects: Vec<ObjDumpFormat>,
5964}
5965
5966impl NodeStateDump {
5967 pub fn new(
5968 tx_digest: &TransactionDigest,
5969 effects: &TransactionEffects,
5970 expected_effects_digest: TransactionEffectsDigest,
5971 object_store: &dyn ObjectStore,
5972 epoch_store: &Arc<AuthorityPerEpochStore>,
5973 inner_temporary_store: &InnerTemporaryStore,
5974 certificate: &VerifiedExecutableTransaction,
5975 ) -> IotaResult<Self> {
5976 let executed_epoch = epoch_store.epoch();
5978 let reference_gas_price = epoch_store.reference_gas_price();
5979 let epoch_start_config = epoch_store.epoch_start_config();
5980 let protocol_version = epoch_store.protocol_version().as_u64();
5981 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5982
5983 let mut relevant_system_packages = Vec::new();
5985 for sys_package_id in BuiltInFramework::all_package_ids() {
5986 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5987 relevant_system_packages.push(ObjDumpFormat::new(w))
5988 }
5989 }
5990
5991 let mut shared_objects = Vec::new();
5993 for kind in effects.input_shared_objects() {
5994 match kind {
5995 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5996 if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5997 shared_objects.push(ObjDumpFormat::new(w))
5998 }
5999 }
6000 InputSharedObject::ReadDeleted(..)
6001 | InputSharedObject::MutateDeleted(..)
6002 | InputSharedObject::Cancelled(..) => (), }
6005 }
6006
6007 let mut loaded_child_objects = Vec::new();
6010 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6011 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
6012 loaded_child_objects.push(ObjDumpFormat::new(w))
6013 }
6014 }
6015
6016 let mut modified_at_versions = Vec::new();
6018 for (id, ver) in effects.modified_at_versions() {
6019 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
6020 modified_at_versions.push(ObjDumpFormat::new(w))
6021 }
6022 }
6023
6024 let mut runtime_reads = Vec::new();
6028 for obj in inner_temporary_store
6029 .runtime_packages_loaded_from_db
6030 .values()
6031 {
6032 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6033 }
6034
6035 Ok(Self {
6038 tx_digest: *tx_digest,
6039 executed_epoch,
6040 reference_gas_price,
6041 epoch_start_timestamp_ms,
6042 protocol_version,
6043 relevant_system_packages,
6044 shared_objects,
6045 loaded_child_objects,
6046 modified_at_versions,
6047 runtime_reads,
6048 sender_signed_data: certificate.clone().into_message(),
6049 input_objects: inner_temporary_store
6050 .input_objects
6051 .values()
6052 .map(|o| ObjDumpFormat::new(o.clone()))
6053 .collect(),
6054 computed_effects: effects.clone(),
6055 expected_effects_digest,
6056 })
6057 }
6058
6059 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6060 let mut objects = Vec::new();
6061 objects.extend(self.relevant_system_packages.clone());
6062 objects.extend(self.shared_objects.clone());
6063 objects.extend(self.loaded_child_objects.clone());
6064 objects.extend(self.modified_at_versions.clone());
6065 objects.extend(self.runtime_reads.clone());
6066 objects.extend(self.input_objects.clone());
6067 objects
6068 }
6069
6070 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6071 let file_name = format!(
6072 "{}_{}_NODE_DUMP.json",
6073 self.tx_digest,
6074 AuthorityState::unixtime_now_ms()
6075 );
6076 let mut path = path.to_path_buf();
6077 path.push(&file_name);
6078 let mut file = File::create(path.clone())?;
6079 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6080 Ok(path)
6081 }
6082
6083 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6084 let file = File::open(path)?;
6085 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6086 }
6087}