1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49 key_value_store::{
50 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51 },
52 key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57 IOTA_SYSTEM_ADDRESS, TypeTag,
58 account_abstraction::{
59 account::AuthenticatorFunctionRefV1Key,
60 authenticator_function::{
61 AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
62 AuthenticatorFunctionRefV1,
63 },
64 },
65 authenticator_state::get_authenticator_state,
66 base_types::*,
67 committee::{Committee, EpochId, ProtocolVersion},
68 crypto::{
69 AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
70 default_hash,
71 },
72 deny_list_v1::check_coin_deny_list_v1_during_signing,
73 digests::{ChainIdentifier, Digest, TransactionEventsDigest},
74 dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
75 effects::{
76 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
77 TransactionEvents, VerifiedSignedTransactionEffects,
78 },
79 error::{ExecutionError, IotaError, IotaResult, UserInputError},
80 event::{Event, EventID, SystemEpochInfoEvent},
81 executable_transaction::VerifiedExecutableTransaction,
82 execution_config_utils::to_binary_config,
83 execution_status::ExecutionStatus,
84 fp_ensure,
85 gas::{GasCostSummary, IotaGasStatus},
86 gas_coin::NANOS_PER_IOTA,
87 inner_temporary_store::{
88 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
89 WrittenObjects,
90 },
91 iota_system_state::{
92 IotaSystemState, IotaSystemStateTrait,
93 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
94 },
95 is_system_package,
96 layout_resolver::{LayoutResolver, into_struct_layout},
97 message_envelope::Message,
98 messages_checkpoint::{
99 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
100 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
101 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
102 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
103 },
104 messages_consensus::AuthorityCapabilitiesV1,
105 messages_grpc::{
106 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
107 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
108 TransactionStatus,
109 },
110 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
111 move_authenticator::MoveAuthenticator,
112 object::{
113 MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
114 bounded_visitor::BoundedVisitor,
115 },
116 storage::{
117 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
118 },
119 supported_protocol_versions::{
120 ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
121 },
122 transaction::*,
123 transaction_executor::{SimulateTransactionResult, VmChecks},
124};
125use itertools::Itertools;
126use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
127use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
128use parking_lot::Mutex;
129use prometheus::{
130 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
131 register_histogram_vec_with_registry, register_histogram_with_registry,
132 register_int_counter_vec_with_registry, register_int_counter_with_registry,
133 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
134};
135use serde::{Deserialize, Serialize, de::DeserializeOwned};
136use tap::TapFallible;
137use tokio::{
138 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
139 task::JoinHandle,
140};
141use tracing::{debug, error, info, instrument, trace, warn};
142use typed_store::TypedStoreError;
143
144use self::{
145 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
146};
147#[cfg(msim)]
148pub use crate::checkpoints::checkpoint_executor::utils::{
149 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
150};
151use crate::{
152 authority::{
153 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
154 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
155 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
156 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
157 authority_store_tables::AuthorityPrunerTables,
158 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
159 },
160 authority_client::NetworkAuthorityClient,
161 checkpoints::CheckpointStore,
162 congestion_tracker::CongestionTracker,
163 consensus_adapter::ConsensusAdapter,
164 epoch::committee_store::CommitteeStore,
165 execution_cache::{
166 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
167 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
168 TransactionCacheRead,
169 },
170 execution_driver::execution_process,
171 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
172 metrics::{LatencyObserver, RateTracker},
173 module_cache_metrics::ResolverMetrics,
174 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
175 rest_index::RestIndexStore,
176 stake_aggregator::StakeAggregator,
177 state_accumulator::{AccumulatorStore, StateAccumulator},
178 subscription_handler::SubscriptionHandler,
179 transaction_input_loader::TransactionInputLoader,
180 transaction_manager::TransactionManager,
181 transaction_outputs::TransactionOutputs,
182 validator_tx_finalizer::ValidatorTxFinalizer,
183 verify_indexes::verify_indexes,
184};
185
186#[cfg(test)]
187#[path = "unit_tests/authority_tests.rs"]
188pub mod authority_tests;
189
190#[cfg(test)]
191#[path = "unit_tests/transaction_tests.rs"]
192pub mod transaction_tests;
193
194#[cfg(test)]
195#[path = "unit_tests/batch_transaction_tests.rs"]
196mod batch_transaction_tests;
197
198#[cfg(test)]
199#[path = "unit_tests/move_integration_tests.rs"]
200pub mod move_integration_tests;
201
202#[cfg(test)]
203#[path = "unit_tests/gas_tests.rs"]
204mod gas_tests;
205
206#[cfg(test)]
207#[path = "unit_tests/batch_verification_tests.rs"]
208mod batch_verification_tests;
209
210#[cfg(test)]
211#[path = "unit_tests/coin_deny_list_tests.rs"]
212mod coin_deny_list_tests;
213
214#[cfg(test)]
215#[path = "unit_tests/auth_unit_test_utils.rs"]
216pub mod auth_unit_test_utils;
217
218pub mod authority_test_utils;
219
220pub mod authority_per_epoch_store;
221pub mod authority_per_epoch_store_pruner;
222
223pub mod authority_store_pruner;
224pub mod authority_store_tables;
225pub mod authority_store_types;
226pub mod epoch_start_configuration;
227pub mod shared_object_congestion_tracker;
228pub mod shared_object_version_manager;
229pub mod suggested_gas_price_calculator;
230pub mod test_authority_builder;
231pub mod transaction_deferral;
232
233pub(crate) mod authority_store;
234pub mod backpressure;
235
236pub struct AuthorityMetrics {
238 tx_orders: IntCounter,
239 total_certs: IntCounter,
240 total_cert_attempts: IntCounter,
241 total_effects: IntCounter,
242 pub shared_obj_tx: IntCounter,
243 sponsored_tx: IntCounter,
244 tx_already_processed: IntCounter,
245 num_input_objs: Histogram,
246 num_shared_objects: Histogram,
247 batch_size: Histogram,
248
249 authority_state_handle_transaction_latency: Histogram,
250
251 execute_certificate_latency_single_writer: Histogram,
252 execute_certificate_latency_shared_object: Histogram,
253
254 internal_execution_latency: Histogram,
255 execution_load_input_objects_latency: Histogram,
256 prepare_certificate_latency: Histogram,
257 commit_certificate_latency: Histogram,
258 db_checkpoint_latency: Histogram,
259
260 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
261 pub(crate) transaction_manager_num_missing_objects: IntGauge,
262 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
263 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
264 pub(crate) transaction_manager_num_ready: IntGauge,
265 pub(crate) transaction_manager_object_cache_size: IntGauge,
266 pub(crate) transaction_manager_object_cache_hits: IntCounter,
267 pub(crate) transaction_manager_object_cache_misses: IntCounter,
268 pub(crate) transaction_manager_object_cache_evictions: IntCounter,
269 pub(crate) transaction_manager_package_cache_size: IntGauge,
270 pub(crate) transaction_manager_package_cache_hits: IntCounter,
271 pub(crate) transaction_manager_package_cache_misses: IntCounter,
272 pub(crate) transaction_manager_package_cache_evictions: IntCounter,
273 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
274
275 pub(crate) execution_driver_executed_transactions: IntCounter,
276 pub(crate) execution_driver_dispatch_queue: IntGauge,
277 pub(crate) execution_queueing_delay_s: Histogram,
278 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
279 pub(crate) execution_gas_latency_ratio: Histogram,
280
281 pub(crate) skipped_consensus_txns: IntCounter,
282 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
283
284 pub(crate) authority_overload_status: IntGauge,
285 pub(crate) authority_load_shedding_percentage: IntGauge,
286
287 pub(crate) transaction_overload_sources: IntCounterVec,
288
289 post_processing_total_events_emitted: IntCounter,
291 post_processing_total_tx_indexed: IntCounter,
292 post_processing_total_tx_had_event_processed: IntCounter,
293 post_processing_total_failures: IntCounter,
294
295 pub consensus_handler_processed: IntCounterVec,
297 pub consensus_handler_transaction_sizes: HistogramVec,
298 pub consensus_handler_num_low_scoring_authorities: IntGauge,
299 pub consensus_handler_scores: IntGaugeVec,
300 pub consensus_handler_deferred_transactions: IntCounter,
301 pub consensus_handler_congested_transactions: IntCounter,
302 pub consensus_handler_cancelled_transactions: IntCounter,
303 pub consensus_handler_max_object_costs: IntGaugeVec,
304 pub consensus_committed_subdags: IntCounterVec,
305 pub consensus_committed_messages: IntGaugeVec,
306 pub consensus_committed_user_transactions: IntGaugeVec,
307 pub consensus_handler_leader_round: IntGauge,
308 pub consensus_calculated_throughput: IntGauge,
309 pub consensus_calculated_throughput_profile: IntGauge,
310
311 pub limits_metrics: Arc<LimitsMetrics>,
312
313 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
315
316 pub zklogin_sig_count: IntCounter,
318 pub multisig_sig_count: IntCounter,
320
321 pub execution_queueing_latency: LatencyObserver,
324
325 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
332
333 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
336}
337
338const POSITIVE_INT_BUCKETS: &[f64] = &[
340 1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
341 20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
342 7000000., 10000000.,
343];
344
345const LATENCY_SEC_BUCKETS: &[f64] = &[
346 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
347 10., 20., 30., 60., 90.,
348];
349
350const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
352 0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
353 0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
354];
355
356const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
357 10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
358 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
359];
360
361pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
365 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
366 let execute_certificate_latency = register_histogram_vec_with_registry!(
367 "authority_state_execute_certificate_latency",
368 "Latency of executing certificates, including waiting for inputs",
369 &["tx_type"],
370 LATENCY_SEC_BUCKETS.to_vec(),
371 registry,
372 )
373 .unwrap();
374
375 let execute_certificate_latency_single_writer =
376 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
377 let execute_certificate_latency_shared_object =
378 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
379
380 Self {
381 tx_orders: register_int_counter_with_registry!(
382 "total_transaction_orders",
383 "Total number of transaction orders",
384 registry,
385 )
386 .unwrap(),
387 total_certs: register_int_counter_with_registry!(
388 "total_transaction_certificates",
389 "Total number of transaction certificates handled",
390 registry,
391 )
392 .unwrap(),
393 total_cert_attempts: register_int_counter_with_registry!(
394 "total_handle_certificate_attempts",
395 "Number of calls to handle_certificate",
396 registry,
397 )
398 .unwrap(),
399 total_effects: register_int_counter_with_registry!(
401 "total_transaction_effects",
402 "Total number of transaction effects produced",
403 registry,
404 )
405 .unwrap(),
406
407 shared_obj_tx: register_int_counter_with_registry!(
408 "num_shared_obj_tx",
409 "Number of transactions involving shared objects",
410 registry,
411 )
412 .unwrap(),
413
414 sponsored_tx: register_int_counter_with_registry!(
415 "num_sponsored_tx",
416 "Number of sponsored transactions",
417 registry,
418 )
419 .unwrap(),
420
421 tx_already_processed: register_int_counter_with_registry!(
422 "num_tx_already_processed",
423 "Number of transaction orders already processed previously",
424 registry,
425 )
426 .unwrap(),
427 num_input_objs: register_histogram_with_registry!(
428 "num_input_objects",
429 "Distribution of number of input TX objects per TX",
430 POSITIVE_INT_BUCKETS.to_vec(),
431 registry,
432 )
433 .unwrap(),
434 num_shared_objects: register_histogram_with_registry!(
435 "num_shared_objects",
436 "Number of shared input objects per TX",
437 POSITIVE_INT_BUCKETS.to_vec(),
438 registry,
439 )
440 .unwrap(),
441 batch_size: register_histogram_with_registry!(
442 "batch_size",
443 "Distribution of size of transaction batch",
444 POSITIVE_INT_BUCKETS.to_vec(),
445 registry,
446 )
447 .unwrap(),
448 authority_state_handle_transaction_latency: register_histogram_with_registry!(
449 "authority_state_handle_transaction_latency",
450 "Latency of handling transactions",
451 LATENCY_SEC_BUCKETS.to_vec(),
452 registry,
453 )
454 .unwrap(),
455 execute_certificate_latency_single_writer,
456 execute_certificate_latency_shared_object,
457 internal_execution_latency: register_histogram_with_registry!(
458 "authority_state_internal_execution_latency",
459 "Latency of actual certificate executions",
460 LATENCY_SEC_BUCKETS.to_vec(),
461 registry,
462 )
463 .unwrap(),
464 execution_load_input_objects_latency: register_histogram_with_registry!(
465 "authority_state_execution_load_input_objects_latency",
466 "Latency of loading input objects for execution",
467 LOW_LATENCY_SEC_BUCKETS.to_vec(),
468 registry,
469 )
470 .unwrap(),
471 prepare_certificate_latency: register_histogram_with_registry!(
472 "authority_state_prepare_certificate_latency",
473 "Latency of executing certificates, before committing the results",
474 LATENCY_SEC_BUCKETS.to_vec(),
475 registry,
476 )
477 .unwrap(),
478 commit_certificate_latency: register_histogram_with_registry!(
479 "authority_state_commit_certificate_latency",
480 "Latency of committing certificate execution results",
481 LATENCY_SEC_BUCKETS.to_vec(),
482 registry,
483 )
484 .unwrap(),
485 db_checkpoint_latency: register_histogram_with_registry!(
486 "db_checkpoint_latency",
487 "Latency of checkpointing dbs",
488 LATENCY_SEC_BUCKETS.to_vec(),
489 registry,
490 ).unwrap(),
491 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
492 "transaction_manager_num_enqueued_certificates",
493 "Current number of certificates enqueued to TransactionManager",
494 &["result"],
495 registry,
496 )
497 .unwrap(),
498 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
499 "transaction_manager_num_missing_objects",
500 "Current number of missing objects in TransactionManager",
501 registry,
502 )
503 .unwrap(),
504 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
505 "transaction_manager_num_pending_certificates",
506 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
507 registry,
508 )
509 .unwrap(),
510 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
511 "transaction_manager_num_executing_certificates",
512 "Number of executing certificates, including queued and actually running certificates",
513 registry,
514 )
515 .unwrap(),
516 transaction_manager_num_ready: register_int_gauge_with_registry!(
517 "transaction_manager_num_ready",
518 "Number of ready transactions in TransactionManager",
519 registry,
520 )
521 .unwrap(),
522 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
523 "transaction_manager_object_cache_size",
524 "Current size of object-availability cache in TransactionManager",
525 registry,
526 )
527 .unwrap(),
528 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
529 "transaction_manager_object_cache_hits",
530 "Number of object-availability cache hits in TransactionManager",
531 registry,
532 )
533 .unwrap(),
534 authority_overload_status: register_int_gauge_with_registry!(
535 "authority_overload_status",
536 "Whether authority is current experiencing overload and enters load shedding mode.",
537 registry)
538 .unwrap(),
539 authority_load_shedding_percentage: register_int_gauge_with_registry!(
540 "authority_load_shedding_percentage",
541 "The percentage of transactions is shed when the authority is in load shedding mode.",
542 registry)
543 .unwrap(),
544 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
545 "transaction_manager_object_cache_misses",
546 "Number of object-availability cache misses in TransactionManager",
547 registry,
548 )
549 .unwrap(),
550 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
551 "transaction_manager_object_cache_evictions",
552 "Number of object-availability cache evictions in TransactionManager",
553 registry,
554 )
555 .unwrap(),
556 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
557 "transaction_manager_package_cache_size",
558 "Current size of package-availability cache in TransactionManager",
559 registry,
560 )
561 .unwrap(),
562 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
563 "transaction_manager_package_cache_hits",
564 "Number of package-availability cache hits in TransactionManager",
565 registry,
566 )
567 .unwrap(),
568 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
569 "transaction_manager_package_cache_misses",
570 "Number of package-availability cache misses in TransactionManager",
571 registry,
572 )
573 .unwrap(),
574 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
575 "transaction_manager_package_cache_evictions",
576 "Number of package-availability cache evictions in TransactionManager",
577 registry,
578 )
579 .unwrap(),
580 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
581 "transaction_manager_transaction_queue_age_s",
582 "Time spent in waiting for transaction in the queue",
583 LATENCY_SEC_BUCKETS.to_vec(),
584 registry,
585 )
586 .unwrap(),
587 transaction_overload_sources: register_int_counter_vec_with_registry!(
588 "transaction_overload_sources",
589 "Number of times each source indicates transaction overload.",
590 &["source"],
591 registry)
592 .unwrap(),
593 execution_driver_executed_transactions: register_int_counter_with_registry!(
594 "execution_driver_executed_transactions",
595 "Cumulative number of transaction executed by execution driver",
596 registry,
597 )
598 .unwrap(),
599 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
600 "execution_driver_dispatch_queue",
601 "Number of transaction pending in execution driver dispatch queue",
602 registry,
603 )
604 .unwrap(),
605 execution_queueing_delay_s: register_histogram_with_registry!(
606 "execution_queueing_delay_s",
607 "Queueing delay between a transaction is ready for execution until it starts executing.",
608 LATENCY_SEC_BUCKETS.to_vec(),
609 registry
610 )
611 .unwrap(),
612 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
613 "prepare_cert_gas_latency_ratio",
614 "The ratio of computation gas divided by VM execution latency.",
615 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
616 registry
617 )
618 .unwrap(),
619 execution_gas_latency_ratio: register_histogram_with_registry!(
620 "execution_gas_latency_ratio",
621 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
622 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
623 registry
624 )
625 .unwrap(),
626 skipped_consensus_txns: register_int_counter_with_registry!(
627 "skipped_consensus_txns",
628 "Total number of consensus transactions skipped",
629 registry,
630 )
631 .unwrap(),
632 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
633 "skipped_consensus_txns_cache_hit",
634 "Total number of consensus transactions skipped because of local cache hit",
635 registry,
636 )
637 .unwrap(),
638 post_processing_total_events_emitted: register_int_counter_with_registry!(
639 "post_processing_total_events_emitted",
640 "Total number of events emitted in post processing",
641 registry,
642 )
643 .unwrap(),
644 post_processing_total_tx_indexed: register_int_counter_with_registry!(
645 "post_processing_total_tx_indexed",
646 "Total number of txes indexed in post processing",
647 registry,
648 )
649 .unwrap(),
650 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
651 "post_processing_total_tx_had_event_processed",
652 "Total number of txes finished event processing in post processing",
653 registry,
654 )
655 .unwrap(),
656 post_processing_total_failures: register_int_counter_with_registry!(
657 "post_processing_total_failures",
658 "Total number of failure in post processing",
659 registry,
660 )
661 .unwrap(),
662 consensus_handler_processed: register_int_counter_vec_with_registry!(
663 "consensus_handler_processed",
664 "Number of transactions processed by consensus handler",
665 &["class"],
666 registry
667 ).unwrap(),
668 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
669 "consensus_handler_transaction_sizes",
670 "Sizes of each type of transactions processed by consensus handler",
671 &["class"],
672 POSITIVE_INT_BUCKETS.to_vec(),
673 registry
674 ).unwrap(),
675 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
676 "consensus_handler_num_low_scoring_authorities",
677 "Number of low scoring authorities based on reputation scores from consensus",
678 registry
679 ).unwrap(),
680 consensus_handler_scores: register_int_gauge_vec_with_registry!(
681 "consensus_handler_scores",
682 "scores from consensus for each authority",
683 &["authority"],
684 registry,
685 ).unwrap(),
686 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
687 "consensus_handler_deferred_transactions",
688 "Number of transactions deferred by consensus handler",
689 registry,
690 ).unwrap(),
691 consensus_handler_congested_transactions: register_int_counter_with_registry!(
692 "consensus_handler_congested_transactions",
693 "Number of transactions deferred by consensus handler due to congestion",
694 registry,
695 ).unwrap(),
696 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
697 "consensus_handler_cancelled_transactions",
698 "Number of transactions cancelled by consensus handler",
699 registry,
700 ).unwrap(),
701 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
702 "consensus_handler_max_congestion_control_object_costs",
703 "Max object costs for congestion control in the current consensus commit",
704 &["commit_type"],
705 registry,
706 ).unwrap(),
707 consensus_committed_subdags: register_int_counter_vec_with_registry!(
708 "consensus_committed_subdags",
709 "Number of committed subdags, sliced by leader",
710 &["authority"],
711 registry,
712 ).unwrap(),
713 consensus_committed_messages: register_int_gauge_vec_with_registry!(
714 "consensus_committed_messages",
715 "Total number of committed consensus messages, sliced by author",
716 &["authority"],
717 registry,
718 ).unwrap(),
719 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
720 "consensus_committed_user_transactions",
721 "Number of committed user transactions, sliced by submitter",
722 &["authority"],
723 registry,
724 ).unwrap(),
725 consensus_handler_leader_round: register_int_gauge_with_registry!(
726 "consensus_handler_leader_round",
727 "The leader round of the current consensus output being processed in the consensus handler",
728 registry,
729 ).unwrap(),
730 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
731 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
732 zklogin_sig_count: register_int_counter_with_registry!(
733 "zklogin_sig_count",
734 "Count of zkLogin signatures",
735 registry,
736 )
737 .unwrap(),
738 multisig_sig_count: register_int_counter_with_registry!(
739 "multisig_sig_count",
740 "Count of zkLogin signatures",
741 registry,
742 )
743 .unwrap(),
744 consensus_calculated_throughput: register_int_gauge_with_registry!(
745 "consensus_calculated_throughput",
746 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
747 registry,
748 ).unwrap(),
749 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
750 "consensus_calculated_throughput_profile",
751 "The current active calculated throughput profile",
752 registry
753 ).unwrap(),
754 execution_queueing_latency: LatencyObserver::new(),
755 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
756 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
757 }
758 }
759
760 pub fn reset_on_reconfigure(&self) {
764 self.consensus_committed_messages.reset();
765 self.consensus_handler_scores.reset();
766 self.consensus_committed_user_transactions.reset();
767 }
768}
769
770pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
777
778pub struct AuthorityState {
779 pub name: AuthorityName,
782 pub secret: StableSyncAuthoritySigner,
784
785 input_loader: TransactionInputLoader,
787 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
788
789 epoch_store: ArcSwap<AuthorityPerEpochStore>,
790
791 execution_lock: RwLock<EpochId>,
797
798 pub indexes: Option<Arc<IndexStore>>,
799 pub rest_index: Option<Arc<RestIndexStore>>,
800
801 pub subscription_handler: Arc<SubscriptionHandler>,
802 pub checkpoint_store: Arc<CheckpointStore>,
803
804 committee_store: Arc<CommitteeStore>,
805
806 transaction_manager: Arc<TransactionManager>,
808
809 #[cfg_attr(not(test), expect(unused))]
811 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
812
813 pub metrics: Arc<AuthorityMetrics>,
814 _pruner: AuthorityStorePruner,
815 _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
816
817 db_checkpoint_config: DBCheckpointConfig,
819
820 pub config: NodeConfig,
821
822 pub overload_info: AuthorityOverloadInfo,
824
825 pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
826
827 chain_identifier: ChainIdentifier,
830
831 pub(crate) congestion_tracker: Arc<CongestionTracker>,
832}
833
834impl AuthorityState {
843 pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
844 epoch_store.committee().authority_exists(&self.name)
845 }
846
847 pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
848 epoch_store
849 .active_validators()
850 .iter()
851 .any(|a| AuthorityName::from(a) == self.name)
852 }
853
854 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
855 !self.is_committee_validator(epoch_store)
856 }
857
858 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
859 &self.committee_store
860 }
861
862 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
863 self.committee_store.clone()
864 }
865
866 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
867 &self.config.authority_overload_config
868 }
869
870 pub fn get_epoch_state_commitments(
871 &self,
872 epoch: EpochId,
873 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
874 self.checkpoint_store.get_epoch_state_commitments(epoch)
875 }
876
877 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
881 async fn handle_transaction_impl(
882 &self,
883 transaction: VerifiedTransaction,
884 epoch_store: &Arc<AuthorityPerEpochStore>,
885 ) -> IotaResult<VerifiedSignedTransaction> {
886 let _execution_lock = self.execution_lock_for_signing();
888
889 let protocol_config = epoch_store.protocol_config();
890 let reference_gas_price = epoch_store.reference_gas_price();
891
892 let epoch = epoch_store.epoch();
893
894 let tx_data = transaction.data().transaction_data();
895
896 iota_transaction_checks::deny::check_transaction_for_signing(
900 tx_data,
901 transaction.tx_signatures(),
902 &transaction.input_objects()?,
903 &tx_data.receiving_objects(),
904 &self.config.transaction_deny_config,
905 self.get_backing_package_store().as_ref(),
906 )?;
907
908 let (tx_input_objects, tx_receiving_objects, auth_input_objects, account_object) =
913 self.read_objects_for_signing(&transaction, epoch)?;
914
915 let move_authenticator = transaction.sender_move_authenticator();
919
920 let (
926 gas_status,
927 tx_checked_input_objects,
928 auth_checked_input_objects,
929 authenticator_function_ref,
930 ) = self.check_transaction_inputs_for_signing(
931 protocol_config,
932 reference_gas_price,
933 tx_data,
934 tx_input_objects,
935 &tx_receiving_objects,
936 move_authenticator,
937 auth_input_objects,
938 account_object,
939 )?;
940
941 check_coin_deny_list_v1_during_signing(
942 tx_data.sender(),
943 &tx_checked_input_objects,
944 &tx_receiving_objects,
945 &auth_checked_input_objects,
946 &self.get_object_store(),
947 )?;
948
949 if let Some(move_authenticator) = move_authenticator {
950 let auth_checked_input_objects = auth_checked_input_objects
954 .expect("MoveAuthenticator input objects must be provided");
955 let authenticator_function_ref = authenticator_function_ref
956 .expect("AuthenticatorFunctionRefV1 object must be provided");
957
958 let (kind, signer, _) = tx_data.execution_parts();
959
960 let validation_result = epoch_store.executor().authenticate_transaction(
962 self.get_backing_store().as_ref(),
963 protocol_config,
964 self.metrics.limits_metrics.clone(),
965 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
966 epoch_store
967 .epoch_start_config()
968 .epoch_data()
969 .epoch_start_timestamp(),
970 gas_status,
971 move_authenticator.to_owned(),
972 authenticator_function_ref,
973 auth_checked_input_objects,
974 kind,
975 signer,
976 transaction.digest().to_owned(),
977 &mut None,
978 );
979
980 if let Err(validation_error) = validation_result {
981 return Err(IotaError::MoveAuthenticatorExecutionFailure {
982 error: validation_error.to_string(),
983 });
984 }
985 }
986
987 let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();
988
989 let signed_transaction =
990 VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);
991
992 self.get_cache_writer().try_acquire_transaction_locks(
997 epoch_store,
998 &owned_objects,
999 signed_transaction.clone(),
1000 )?;
1001
1002 Ok(signed_transaction)
1003 }
1004
1005 #[instrument(name = "handle_transaction", level = "trace", skip_all)]
1007 pub async fn handle_transaction(
1008 &self,
1009 epoch_store: &Arc<AuthorityPerEpochStore>,
1010 transaction: VerifiedTransaction,
1011 ) -> IotaResult<HandleTransactionResponse> {
1012 let tx_digest = *transaction.digest();
1013 debug!("handle_transaction");
1014
1015 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1017 return Ok(HandleTransactionResponse { status });
1018 }
1019
1020 let _metrics_guard = self
1021 .metrics
1022 .authority_state_handle_transaction_latency
1023 .start_timer();
1024 self.metrics.tx_orders.inc();
1025
1026 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1027 match signed {
1028 Ok(s) => {
1029 if self.is_committee_validator(epoch_store) {
1030 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1031 let tx = s.clone();
1032 let validator_tx_finalizer = validator_tx_finalizer.clone();
1033 let cache_reader = self.get_transaction_cache_reader().clone();
1034 let epoch_store = epoch_store.clone();
1035 spawn_monitored_task!(epoch_store.within_alive_epoch(
1036 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1037 ));
1038 }
1039 }
1040 Ok(HandleTransactionResponse {
1041 status: TransactionStatus::Signed(s.into_inner().into_sig()),
1042 })
1043 }
1044 Err(err) => Ok(HandleTransactionResponse {
1048 status: self
1049 .get_transaction_status(&tx_digest, epoch_store)?
1050 .ok_or(err)?
1051 .1,
1052 }),
1053 }
1054 }
1055
1056 pub fn check_system_overload_at_signing(&self) -> bool {
1057 self.config
1058 .authority_overload_config
1059 .check_system_overload_at_signing
1060 }
1061
1062 pub fn check_system_overload_at_execution(&self) -> bool {
1063 self.config
1064 .authority_overload_config
1065 .check_system_overload_at_execution
1066 }
1067
1068 pub(crate) fn check_system_overload(
1069 &self,
1070 consensus_adapter: &Arc<ConsensusAdapter>,
1071 tx_data: &SenderSignedData,
1072 do_authority_overload_check: bool,
1073 ) -> IotaResult {
1074 if do_authority_overload_check {
1075 self.check_authority_overload(tx_data).tap_err(|_| {
1076 self.update_overload_metrics("execution_queue");
1077 })?;
1078 }
1079 self.transaction_manager
1080 .check_execution_overload(self.overload_config(), tx_data)
1081 .tap_err(|_| {
1082 self.update_overload_metrics("execution_pending");
1083 })?;
1084 consensus_adapter.check_consensus_overload().tap_err(|_| {
1085 self.update_overload_metrics("consensus");
1086 })?;
1087
1088 let pending_tx_count = self
1089 .get_cache_commit()
1090 .approximate_pending_transaction_count();
1091 if pending_tx_count
1092 > self
1093 .config
1094 .execution_cache_config
1095 .writeback_cache
1096 .backpressure_threshold_for_rpc()
1097 {
1098 return Err(IotaError::ValidatorOverloadedRetryAfter {
1099 retry_after_secs: 10,
1100 });
1101 }
1102
1103 Ok(())
1104 }
1105
1106 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1107 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1108 return Ok(());
1109 }
1110
1111 let load_shedding_percentage = self
1112 .overload_info
1113 .load_shedding_percentage
1114 .load(Ordering::Relaxed);
1115 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1116 }
1117
1118 fn update_overload_metrics(&self, source: &str) {
1119 self.metrics
1120 .transaction_overload_sources
1121 .with_label_values(&[source])
1122 .inc();
1123 }
1124
1125 #[instrument(level = "trace", skip_all)]
1127 pub async fn execute_certificate(
1128 &self,
1129 certificate: &VerifiedCertificate,
1130 epoch_store: &Arc<AuthorityPerEpochStore>,
1131 ) -> IotaResult<TransactionEffects> {
1132 let _metrics_guard = if certificate.contains_shared_object() {
1133 self.metrics
1134 .execute_certificate_latency_shared_object
1135 .start_timer()
1136 } else {
1137 self.metrics
1138 .execute_certificate_latency_single_writer
1139 .start_timer()
1140 };
1141 trace!("execute_certificate");
1142
1143 self.metrics.total_cert_attempts.inc();
1144
1145 if !certificate.contains_shared_object() {
1146 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1151 }
1152
1153 epoch_store
1156 .within_alive_epoch(self.notify_read_effects(certificate))
1157 .await
1158 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1159 .and_then(|r| r)
1160 }
1161
1162 #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1177 pub fn try_execute_immediately(
1178 &self,
1179 certificate: &VerifiedExecutableTransaction,
1180 expected_effects_digest: Option<TransactionEffectsDigest>,
1181 epoch_store: &Arc<AuthorityPerEpochStore>,
1182 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1183 let _scope = monitored_scope("Execution::try_execute_immediately");
1184 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1185
1186 let tx_digest = certificate.digest();
1187
1188 let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1190
1191 if let Some(effects) = self
1194 .get_transaction_cache_reader()
1195 .try_get_executed_effects(tx_digest)?
1196 {
1197 if let Some(expected_effects_digest_inner) = expected_effects_digest {
1198 assert_eq!(
1199 effects.digest(),
1200 expected_effects_digest_inner,
1201 "Unexpected effects digest for transaction {tx_digest:?}"
1202 );
1203 }
1204 tx_guard.release();
1205 return Ok((effects, None));
1206 }
1207
1208 let (tx_input_objects, authenticator_input_objects, account_object) =
1209 self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1210
1211 let expected_effects_digest =
1217 expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1218
1219 self.process_certificate(
1220 tx_guard,
1221 certificate,
1222 tx_input_objects,
1223 account_object,
1224 authenticator_input_objects,
1225 expected_effects_digest,
1226 epoch_store,
1227 )
1228 .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1229 .tap_ok(
1230 |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1231 )
1232 }
1233
1234 pub fn read_objects_for_execution(
1235 &self,
1236 tx_lock: &CertLockGuard,
1237 certificate: &VerifiedExecutableTransaction,
1238 epoch_store: &Arc<AuthorityPerEpochStore>,
1239 ) -> IotaResult<(InputObjects, Option<InputObjects>, Option<ObjectReadResult>)> {
1240 let _scope = monitored_scope("Execution::load_input_objects");
1241 let _metrics_guard = self
1242 .metrics
1243 .execution_load_input_objects_latency
1244 .start_timer();
1245
1246 let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1247
1248 let input_objects = self.input_loader.read_objects_for_execution(
1249 epoch_store,
1250 &certificate.key(),
1251 tx_lock,
1252 &input_objects,
1253 epoch_store.epoch(),
1254 )?;
1255
1256 certificate.split_input_objects_into_groups_for_reading(input_objects)
1257 }
1258
1259 pub fn try_execute_for_test(
1263 &self,
1264 certificate: &VerifiedCertificate,
1265 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1266 let epoch_store = self.epoch_store_for_testing();
1267 let (effects, execution_error_opt) = self.try_execute_immediately(
1268 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1269 None,
1270 &epoch_store,
1271 )?;
1272 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1273 Ok((signed_effects, execution_error_opt))
1274 }
1275
1276 pub fn execute_for_test(
1278 &self,
1279 certificate: &VerifiedCertificate,
1280 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1281 self.try_execute_for_test(certificate)
1282 .expect("try_execute_for_test should not fail")
1283 }
1284
1285 pub async fn notify_read_effects(
1286 &self,
1287 certificate: &VerifiedCertificate,
1288 ) -> IotaResult<TransactionEffects> {
1289 self.get_transaction_cache_reader()
1290 .try_notify_read_executed_effects(&[*certificate.digest()])
1291 .await
1292 .map(|mut r| r.pop().expect("must return correct number of effects"))
1293 }
1294
1295 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1296 self.get_object_cache_reader()
1297 .try_check_owned_objects_are_live(owned_object_refs)
1298 }
1299
1300 pub(crate) fn debug_dump_transaction_state(
1305 &self,
1306 tx_digest: &TransactionDigest,
1307 effects: &TransactionEffects,
1308 expected_effects_digest: TransactionEffectsDigest,
1309 inner_temporary_store: &InnerTemporaryStore,
1310 certificate: &VerifiedExecutableTransaction,
1311 debug_dump_config: &StateDebugDumpConfig,
1312 ) -> IotaResult<PathBuf> {
1313 let dump_dir = debug_dump_config
1314 .dump_file_directory
1315 .as_ref()
1316 .cloned()
1317 .unwrap_or(std::env::temp_dir());
1318 let epoch_store = self.load_epoch_store_one_call_per_task();
1319
1320 NodeStateDump::new(
1321 tx_digest,
1322 effects,
1323 expected_effects_digest,
1324 self.get_object_store().as_ref(),
1325 &epoch_store,
1326 inner_temporary_store,
1327 certificate,
1328 )?
1329 .write_to_file(&dump_dir)
1330 .map_err(|e| IotaError::FileIO(e.to_string()))
1331 }
1332
1333 #[instrument(level = "trace", skip_all)]
1334 pub(crate) fn process_certificate(
1335 &self,
1336 tx_guard: CertTxGuard,
1337 certificate: &VerifiedExecutableTransaction,
1338 tx_input_objects: InputObjects,
1339 account_object: Option<ObjectReadResult>,
1340 authenticator_input_objects: Option<InputObjects>,
1341 expected_effects_digest: Option<TransactionEffectsDigest>,
1342 epoch_store: &Arc<AuthorityPerEpochStore>,
1343 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1344 let process_certificate_start_time = tokio::time::Instant::now();
1345 let digest = *certificate.digest();
1346
1347 fail_point_if!("correlated-crash-process-certificate", || {
1348 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1349 iota_simulator::task::kill_current_node(None);
1350 }
1351 });
1352
1353 let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1354 let execution_guard = match execution_guard {
1360 Ok(execution_guard) => execution_guard,
1361 Err(err) => {
1362 tx_guard.release();
1363 return Err(err);
1364 }
1365 };
1366 if *execution_guard != epoch_store.epoch() {
1370 tx_guard.release();
1371 info!("The epoch of the execution_guard doesn't match the epoch store");
1372 return Err(IotaError::WrongEpoch {
1373 expected_epoch: epoch_store.epoch(),
1374 actual_epoch: *execution_guard,
1375 });
1376 }
1377
1378 let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1384 &execution_guard,
1385 certificate,
1386 tx_input_objects,
1387 account_object,
1388 authenticator_input_objects,
1389 epoch_store,
1390 ) {
1391 Err(e) => {
1392 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1393 tx_guard.release();
1394 return Err(e);
1395 }
1396 Ok(res) => res,
1397 };
1398
1399 if let Some(expected_effects_digest) = expected_effects_digest {
1400 if effects.digest() != expected_effects_digest {
1401 match self.debug_dump_transaction_state(
1403 &digest,
1404 &effects,
1405 expected_effects_digest,
1406 &inner_temporary_store,
1407 certificate,
1408 &self.config.state_debug_dump_config,
1409 ) {
1410 Ok(out_path) => {
1411 info!(
1412 "Dumped node state for transaction {} to {}",
1413 digest,
1414 out_path.as_path().display().to_string()
1415 );
1416 }
1417 Err(e) => {
1418 error!("Error dumping state for transaction {}: {e}", digest);
1419 }
1420 }
1421 error!(
1422 tx_digest = ?digest,
1423 ?expected_effects_digest,
1424 actual_effects = ?effects,
1425 "fork detected!"
1426 );
1427 panic!(
1428 "Transaction {} is expected to have effects digest {}, but got {}!",
1429 digest,
1430 expected_effects_digest,
1431 effects.digest(),
1432 );
1433 }
1434 }
1435
1436 fail_point!("crash");
1437
1438 self.commit_certificate(
1439 certificate,
1440 inner_temporary_store,
1441 &effects,
1442 tx_guard,
1443 execution_guard,
1444 epoch_store,
1445 )?;
1446
1447 if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1448 certificate.data().transaction_data().kind()
1449 {
1450 if let Some(err) = &execution_error_opt {
1451 debug_fatal!("Authenticator state update failed: {:?}", err);
1452 }
1453 epoch_store.update_authenticator_state(auth_state);
1454
1455 if cfg!(debug_assertions) {
1458 let authenticator_state = get_authenticator_state(self.get_object_store())
1459 .expect("Read cannot fail")
1460 .expect("Authenticator state must exist");
1461
1462 let mut sys_jwks: Vec<_> = authenticator_state
1463 .active_jwks
1464 .into_iter()
1465 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1466 .collect();
1467 let mut active_jwks: Vec<_> = epoch_store
1468 .signature_verifier
1469 .get_jwks()
1470 .into_iter()
1471 .collect();
1472 sys_jwks.sort();
1473 active_jwks.sort();
1474
1475 assert_eq!(sys_jwks, active_jwks);
1476 }
1477 }
1478
1479 let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1480 if elapsed > 0.0 {
1481 self.metrics
1482 .execution_gas_latency_ratio
1483 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1484 };
1485 Ok((effects, execution_error_opt))
1486 }
1487
1488 #[instrument(level = "trace", skip_all)]
1489 fn commit_certificate(
1490 &self,
1491 certificate: &VerifiedExecutableTransaction,
1492 inner_temporary_store: InnerTemporaryStore,
1493 effects: &TransactionEffects,
1494 tx_guard: CertTxGuard,
1495 _execution_guard: ExecutionLockReadGuard<'_>,
1496 epoch_store: &Arc<AuthorityPerEpochStore>,
1497 ) -> IotaResult {
1498 let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1499 monitored_scope("Execution::commit_certificate");
1500 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1501
1502 let tx_key = certificate.key();
1503 let tx_digest = certificate.digest();
1504 let input_object_count = inner_temporary_store.input_objects.len();
1505 let shared_object_count = effects.input_shared_objects().len();
1506
1507 let output_keys = inner_temporary_store.get_output_keys(effects);
1508
1509 let _ = self
1511 .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1512 .tap_err(|e| {
1513 self.metrics.post_processing_total_failures.inc();
1514 error!(?tx_digest, "tx post processing failed: {e}");
1515 });
1516
1517 epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1521
1522 fail_point!("crash");
1524
1525 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1526 certificate.clone().into_unsigned(),
1527 effects.clone(),
1528 inner_temporary_store,
1529 );
1530 self.get_cache_writer()
1531 .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1532
1533 if certificate.transaction_data().is_end_of_epoch_tx() {
1534 self.get_object_cache_reader()
1537 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1538 }
1539
1540 tx_guard.commit_tx();
1542
1543 self.transaction_manager
1547 .notify_commit(tx_digest, output_keys, epoch_store);
1548
1549 self.update_metrics(certificate, input_object_count, shared_object_count);
1550
1551 Ok(())
1552 }
1553
1554 fn update_metrics(
1555 &self,
1556 certificate: &VerifiedExecutableTransaction,
1557 input_object_count: usize,
1558 shared_object_count: usize,
1559 ) {
1560 if certificate.has_zklogin_sig() {
1562 self.metrics.zklogin_sig_count.inc();
1563 } else if certificate.has_upgraded_multisig() {
1564 self.metrics.multisig_sig_count.inc();
1565 }
1566
1567 self.metrics.total_effects.inc();
1568 self.metrics.total_certs.inc();
1569
1570 if shared_object_count > 0 {
1571 self.metrics.shared_obj_tx.inc();
1572 }
1573
1574 if certificate.is_sponsored_tx() {
1575 self.metrics.sponsored_tx.inc();
1576 }
1577
1578 self.metrics
1579 .num_input_objs
1580 .observe(input_object_count as f64);
1581 self.metrics
1582 .num_shared_objects
1583 .observe(shared_object_count as f64);
1584 self.metrics.batch_size.observe(
1585 certificate
1586 .data()
1587 .intent_message()
1588 .value
1589 .kind()
1590 .num_commands() as f64,
1591 );
1592 }
1593
1594 #[instrument(level = "trace", skip_all)]
1606 fn prepare_certificate(
1607 &self,
1608 _execution_guard: &ExecutionLockReadGuard<'_>,
1609 certificate: &VerifiedExecutableTransaction,
1610 tx_input_objects: InputObjects,
1611 account_object: Option<ObjectReadResult>,
1612 move_authenticator_input_objects: Option<InputObjects>,
1613 epoch_store: &Arc<AuthorityPerEpochStore>,
1614 ) -> IotaResult<(
1615 InnerTemporaryStore,
1616 TransactionEffects,
1617 Option<ExecutionError>,
1618 )> {
1619 let _scope = monitored_scope("Execution::prepare_certificate");
1620 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1621 let prepare_certificate_start_time = tokio::time::Instant::now();
1622
1623 let protocol_config = epoch_store.protocol_config();
1624
1625 let reference_gas_price = epoch_store.reference_gas_price();
1626
1627 let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
1628 let epoch_start_timestamp = epoch_store
1629 .epoch_start_config()
1630 .epoch_data()
1631 .epoch_start_timestamp();
1632
1633 let backing_store = self.get_backing_store().as_ref();
1634
1635 let tx_digest = *certificate.digest();
1636
1637 let tx_data = certificate.data().transaction_data();
1640 tx_data.validity_check(protocol_config)?;
1641
1642 let (kind, signer, gas) = tx_data.execution_parts();
1643
1644 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1645 let (inner_temp_store, _, mut effects, execution_error_opt) = if let Some(
1646 move_authenticator,
1647 ) =
1648 certificate.sender_move_authenticator()
1649 {
1650 let (
1657 auth_account_object_id,
1658 auth_account_object_seq_number,
1659 auth_account_object_digest,
1660 ) = move_authenticator.object_to_authenticate_components()?;
1661
1662 let account_object = account_object.expect("Account object must be provided");
1665
1666 let authenticator_function_ref_for_execution = self.check_move_account(
1667 auth_account_object_id,
1668 auth_account_object_seq_number,
1669 auth_account_object_digest,
1670 account_object,
1671 &signer,
1672 )?;
1673
1674 let move_authenticator_input_objects = move_authenticator_input_objects.expect(
1675 "In case of a `MoveAuthenticator` signature, the authenticator input objects must be provided",
1676 );
1677
1678 let authenticator_gas_budget = protocol_config.max_auth_gas();
1683 let (
1684 gas_status,
1685 authenticator_checked_input_objects,
1686 authenticator_and_tx_checked_input_objects,
1687 ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
1688 certificate,
1689 tx_input_objects,
1690 move_authenticator_input_objects,
1691 authenticator_gas_budget,
1692 protocol_config,
1693 reference_gas_price,
1694 )?;
1695
1696 let owned_object_refs = authenticator_and_tx_checked_input_objects
1697 .inner()
1698 .filter_owned_objects();
1699 self.check_owned_locks(&owned_object_refs)?;
1700
1701 epoch_store
1702 .executor()
1703 .authenticate_then_execute_transaction_to_effects(
1704 backing_store,
1705 protocol_config,
1706 self.metrics.limits_metrics.clone(),
1707 self.config
1708 .expensive_safety_check_config
1709 .enable_deep_per_tx_iota_conservation_check(),
1710 self.config.certificate_deny_config.certificate_deny_set(),
1711 &epoch_id,
1712 epoch_start_timestamp,
1713 gas_status,
1714 gas,
1715 move_authenticator.to_owned(),
1716 authenticator_function_ref_for_execution,
1717 authenticator_checked_input_objects,
1718 authenticator_and_tx_checked_input_objects,
1719 kind,
1720 signer,
1721 tx_digest,
1722 &mut None,
1723 )
1724 } else {
1725 let (tx_gas_status, tx_checked_input_objects) =
1730 iota_transaction_checks::check_certificate_input(
1731 certificate,
1732 tx_input_objects,
1733 protocol_config,
1734 reference_gas_price,
1735 )?;
1736
1737 let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
1738 self.check_owned_locks(&owned_object_refs)?;
1739 epoch_store.executor().execute_transaction_to_effects(
1740 backing_store,
1741 protocol_config,
1742 self.metrics.limits_metrics.clone(),
1743 self.config
1746 .expensive_safety_check_config
1747 .enable_deep_per_tx_iota_conservation_check(),
1748 self.config.certificate_deny_config.certificate_deny_set(),
1749 &epoch_id,
1750 epoch_start_timestamp,
1751 tx_checked_input_objects,
1752 gas,
1753 tx_gas_status,
1754 kind,
1755 signer,
1756 tx_digest,
1757 &mut None,
1758 )
1759 };
1760
1761 fail_point_if!("cp_execution_nondeterminism", || {
1762 #[cfg(msim)]
1763 self.create_fail_state(certificate, epoch_store, &mut effects);
1764 });
1765
1766 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1767 if elapsed > 0.0 {
1768 self.metrics
1769 .prepare_cert_gas_latency_ratio
1770 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1771 }
1772
1773 Ok((inner_temp_store, effects, execution_error_opt.err()))
1774 }
1775
1776 pub fn prepare_certificate_for_benchmark(
1777 &self,
1778 certificate: &VerifiedExecutableTransaction,
1779 input_objects: InputObjects,
1780 epoch_store: &Arc<AuthorityPerEpochStore>,
1781 ) -> IotaResult<(
1782 InnerTemporaryStore,
1783 TransactionEffects,
1784 Option<ExecutionError>,
1785 )> {
1786 let lock = RwLock::new(epoch_store.epoch());
1787 let execution_guard = lock.try_read().unwrap();
1788
1789 self.prepare_certificate(
1790 &execution_guard,
1791 certificate,
1792 input_objects,
1793 None,
1794 None,
1795 epoch_store,
1796 )
1797 }
1798
1799 #[instrument("dry_exec_tx", level = "trace", skip_all)]
1802 #[allow(clippy::type_complexity)]
1803 pub fn dry_exec_transaction(
1804 &self,
1805 transaction: TransactionData,
1806 transaction_digest: TransactionDigest,
1807 ) -> IotaResult<(
1808 DryRunTransactionBlockResponse,
1809 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1810 TransactionEffects,
1811 Option<ObjectID>,
1812 )> {
1813 let epoch_store = self.load_epoch_store_one_call_per_task();
1814 if !self.is_fullnode(&epoch_store) {
1815 return Err(IotaError::UnsupportedFeature {
1816 error: "dry-exec is only supported on fullnodes".to_string(),
1817 });
1818 }
1819
1820 if transaction.kind().is_system_tx() {
1821 return Err(IotaError::UnsupportedFeature {
1822 error: "dry-exec does not support system transactions".to_string(),
1823 });
1824 }
1825
1826 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1827 }
1828
1829 #[allow(clippy::type_complexity)]
1830 pub fn dry_exec_transaction_for_benchmark(
1831 &self,
1832 transaction: TransactionData,
1833 transaction_digest: TransactionDigest,
1834 ) -> IotaResult<(
1835 DryRunTransactionBlockResponse,
1836 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1837 TransactionEffects,
1838 Option<ObjectID>,
1839 )> {
1840 let epoch_store = self.load_epoch_store_one_call_per_task();
1841 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1842 }
1843
1844 #[instrument(level = "trace", skip_all)]
1845 #[allow(clippy::type_complexity)]
1846 fn dry_exec_transaction_impl(
1847 &self,
1848 epoch_store: &AuthorityPerEpochStore,
1849 transaction: TransactionData,
1850 transaction_digest: TransactionDigest,
1851 ) -> IotaResult<(
1852 DryRunTransactionBlockResponse,
1853 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1854 TransactionEffects,
1855 Option<ObjectID>,
1856 )> {
1857 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1859
1860 let input_object_kinds = transaction.input_objects()?;
1861 let receiving_object_refs = transaction.receiving_objects();
1862
1863 iota_transaction_checks::deny::check_transaction_for_signing(
1864 &transaction,
1865 &[],
1866 &input_object_kinds,
1867 &receiving_object_refs,
1868 &self.config.transaction_deny_config,
1869 self.get_backing_package_store().as_ref(),
1870 )?;
1871
1872 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1873 None,
1875 &input_object_kinds,
1876 &receiving_object_refs,
1877 epoch_store.epoch(),
1878 )?;
1879
1880 let mut transaction = transaction;
1882 let mut gas_object_refs = transaction.gas().to_vec();
1883 let reference_gas_price = epoch_store.reference_gas_price();
1884 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1885 let sender = transaction.gas_owner();
1886 let gas_object_id = ObjectID::random();
1887 let gas_object = Object::new_move(
1888 MoveObject::new_gas_coin(
1889 OBJECT_START_VERSION,
1890 gas_object_id,
1891 SIMULATION_GAS_COIN_VALUE,
1892 ),
1893 Owner::AddressOwner(sender),
1894 TransactionDigest::genesis_marker(),
1895 );
1896 let gas_object_ref = gas_object.compute_object_reference();
1897 gas_object_refs = vec![gas_object_ref];
1898 transaction.gas_data_mut().payment = gas_object_refs.clone();
1900 (
1901 iota_transaction_checks::check_transaction_input_with_given_gas(
1902 epoch_store.protocol_config(),
1903 reference_gas_price,
1904 &transaction,
1905 input_objects,
1906 receiving_objects,
1907 gas_object,
1908 &self.metrics.bytecode_verifier_metrics,
1909 &self.config.verifier_signing_config,
1910 )?,
1911 Some(gas_object_id),
1912 )
1913 } else {
1914 let authenticator_gas_budget = 0;
1917
1918 (
1919 iota_transaction_checks::check_transaction_input(
1920 epoch_store.protocol_config(),
1921 reference_gas_price,
1922 &transaction,
1923 input_objects,
1924 &receiving_objects,
1925 &self.metrics.bytecode_verifier_metrics,
1926 &self.config.verifier_signing_config,
1927 authenticator_gas_budget,
1928 )?,
1929 None,
1930 )
1931 };
1932
1933 let protocol_config = epoch_store.protocol_config();
1934 let (kind, signer, _) = transaction.execution_parts();
1935
1936 let silent = true;
1937 let executor = iota_execution::executor(protocol_config, silent, None)
1938 .expect("Creating an executor should not fail here");
1939
1940 let expensive_checks = false;
1941 let (inner_temp_store, _, effects, execution_error) = executor
1942 .execute_transaction_to_effects(
1943 self.get_backing_store().as_ref(),
1944 protocol_config,
1945 self.metrics.limits_metrics.clone(),
1946 expensive_checks,
1947 self.config.certificate_deny_config.certificate_deny_set(),
1948 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1949 epoch_store
1950 .epoch_start_config()
1951 .epoch_data()
1952 .epoch_start_timestamp(),
1953 checked_input_objects,
1954 gas_object_refs,
1955 gas_status,
1956 kind,
1957 signer,
1958 transaction_digest,
1959 &mut None,
1960 );
1961 let tx_digest = *effects.transaction_digest();
1962
1963 let module_cache =
1964 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1965
1966 let mut layout_resolver =
1967 epoch_store
1968 .executor()
1969 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1970 &inner_temp_store,
1971 self.get_backing_package_store(),
1972 )));
1973 let object_changes = Vec::new();
1975
1976 let balance_changes = Vec::new();
1978
1979 let written_with_kind = effects
1980 .created()
1981 .into_iter()
1982 .map(|(oref, _)| (oref, WriteKind::Create))
1983 .chain(
1984 effects
1985 .unwrapped()
1986 .into_iter()
1987 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1988 )
1989 .chain(
1990 effects
1991 .mutated()
1992 .into_iter()
1993 .map(|(oref, _)| (oref, WriteKind::Mutate)),
1994 )
1995 .map(|(oref, kind)| {
1996 let obj = inner_temp_store.written.get(&oref.0).unwrap();
1997 (oref.0, (oref, obj.clone(), kind))
1999 })
2000 .collect();
2001
2002 let execution_error_source = execution_error
2003 .as_ref()
2004 .err()
2005 .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2006
2007 Ok((
2008 DryRunTransactionBlockResponse {
2009 suggested_gas_price: self
2011 .congestion_tracker
2012 .get_prediction_suggested_gas_price(&transaction),
2013 input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
2014 .map_err(|e| IotaError::TransactionSerialization {
2015 error: format!(
2016 "Failed to convert transaction to IotaTransactionBlockData: {e}",
2017 ),
2018 })?, effects: effects.clone().try_into()?,
2020 events: IotaTransactionBlockEvents::try_from(
2021 inner_temp_store.events.clone(),
2022 tx_digest,
2023 None,
2024 layout_resolver.as_mut(),
2025 )?,
2026 object_changes,
2027 balance_changes,
2028 execution_error_source,
2029 },
2030 written_with_kind,
2031 effects,
2032 mock_gas,
2033 ))
2034 }
2035
2036 pub fn simulate_transaction(
2037 &self,
2038 mut transaction: TransactionData,
2039 checks: VmChecks,
2040 ) -> IotaResult<SimulateTransactionResult> {
2041 if transaction.kind().is_system_tx() {
2042 return Err(IotaError::UnsupportedFeature {
2043 error: "simulate does not support system transactions".to_string(),
2044 });
2045 }
2046
2047 let epoch_store = self.load_epoch_store_one_call_per_task();
2048 if !self.is_fullnode(&epoch_store) {
2049 return Err(IotaError::UnsupportedFeature {
2050 error: "simulate is only supported on fullnodes".to_string(),
2051 });
2052 }
2053
2054 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2058
2059 let input_object_kinds = transaction.input_objects()?;
2060 let receiving_object_refs = transaction.receiving_objects();
2061
2062 iota_transaction_checks::deny::check_transaction_for_signing(
2065 &transaction,
2066 &[],
2067 &input_object_kinds,
2068 &receiving_object_refs,
2069 &self.config.transaction_deny_config,
2070 self.get_backing_package_store().as_ref(),
2071 )?;
2072
2073 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2075 None,
2077 &input_object_kinds,
2078 &receiving_object_refs,
2079 epoch_store.epoch(),
2080 )?;
2081
2082 let mock_gas_id = if transaction.gas().is_empty() {
2084 let mock_gas_object = Object::new_move(
2085 MoveObject::new_gas_coin(
2086 OBJECT_START_VERSION,
2087 ObjectID::MAX,
2088 SIMULATION_GAS_COIN_VALUE,
2089 ),
2090 Owner::AddressOwner(transaction.gas_data().owner),
2091 TransactionDigest::genesis_marker(),
2092 );
2093 let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2094 transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
2095 input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2096 Some(mock_gas_object.id())
2097 } else {
2098 None
2099 };
2100
2101 let protocol_config = epoch_store.protocol_config();
2102
2103 let authenticator_gas_budget = 0;
2106
2107 let (gas_status, checked_input_objects) = if checks.enabled() {
2110 iota_transaction_checks::check_transaction_input(
2111 protocol_config,
2112 epoch_store.reference_gas_price(),
2113 &transaction,
2114 input_objects,
2115 &receiving_objects,
2116 &self.metrics.bytecode_verifier_metrics,
2117 &self.config.verifier_signing_config,
2118 authenticator_gas_budget,
2119 )?
2120 } else {
2121 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2122 protocol_config,
2123 transaction.kind(),
2124 input_objects,
2125 receiving_objects,
2126 )?;
2127 let gas_status = IotaGasStatus::new(
2128 transaction.gas_budget(),
2129 transaction.gas_price(),
2130 epoch_store.reference_gas_price(),
2131 protocol_config,
2132 )?;
2133
2134 (gas_status, checked_input_objects)
2135 };
2136
2137 let executor = iota_execution::executor(
2139 protocol_config,
2140 true, None,
2142 )
2143 .expect("Creating an executor should not fail here");
2144
2145 let (kind, signer, gas_coins) = transaction.execution_parts();
2147 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2148 self.get_backing_store().as_ref(),
2149 protocol_config,
2150 self.metrics.limits_metrics.clone(),
2151 false, self.config.certificate_deny_config.certificate_deny_set(),
2153 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2154 epoch_store
2155 .epoch_start_config()
2156 .epoch_data()
2157 .epoch_start_timestamp(),
2158 checked_input_objects,
2159 gas_coins,
2160 gas_status,
2161 kind,
2162 signer,
2163 transaction.digest(),
2164 checks.disabled(),
2165 );
2166
2167 Ok(SimulateTransactionResult {
2170 input_objects: inner_temp_store.input_objects,
2171 output_objects: inner_temp_store.written,
2172 events: effects.events_digest().map(|_| inner_temp_store.events),
2173 effects,
2174 execution_result,
2175 mock_gas_id,
2176 })
2177 }
2178
2179 #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2184 pub async fn dev_inspect_transaction_block(
2185 &self,
2186 sender: IotaAddress,
2187 transaction_kind: TransactionKind,
2188 gas_price: Option<u64>,
2189 gas_budget: Option<u64>,
2190 gas_sponsor: Option<IotaAddress>,
2191 gas_objects: Option<Vec<ObjectRef>>,
2192 show_raw_txn_data_and_effects: Option<bool>,
2193 skip_checks: Option<bool>,
2194 ) -> IotaResult<DevInspectResults> {
2195 let epoch_store = self.load_epoch_store_one_call_per_task();
2196
2197 if !self.is_fullnode(&epoch_store) {
2198 return Err(IotaError::UnsupportedFeature {
2199 error: "dev-inspect is only supported on fullnodes".to_string(),
2200 });
2201 }
2202
2203 if transaction_kind.is_system_tx() {
2204 return Err(IotaError::UnsupportedFeature {
2205 error: "system transactions are not supported".to_string(),
2206 });
2207 }
2208
2209 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2210 let skip_checks = skip_checks.unwrap_or(true);
2211 let reference_gas_price = epoch_store.reference_gas_price();
2212 let protocol_config = epoch_store.protocol_config();
2213 let max_tx_gas = protocol_config.max_tx_gas();
2214
2215 let price = gas_price.unwrap_or(reference_gas_price);
2216 let budget = gas_budget.unwrap_or(max_tx_gas);
2217 let owner = gas_sponsor.unwrap_or(sender);
2218 let payment = gas_objects.unwrap_or_default();
2221 let transaction = TransactionData::V1(TransactionDataV1 {
2222 kind: transaction_kind.clone(),
2223 sender,
2224 gas_data: GasData {
2225 payment,
2226 owner,
2227 price,
2228 budget,
2229 },
2230 expiration: TransactionExpiration::None,
2231 });
2232
2233 let raw_txn_data = if show_raw_txn_data_and_effects {
2234 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2235 error: "Failed to serialize transaction during dev inspect".to_string(),
2236 })?
2237 } else {
2238 vec![]
2239 };
2240
2241 transaction.validity_check_no_gas_check(protocol_config)?;
2242
2243 let input_object_kinds = transaction.input_objects()?;
2244 let receiving_object_refs = transaction.receiving_objects();
2245
2246 iota_transaction_checks::deny::check_transaction_for_signing(
2247 &transaction,
2248 &[],
2249 &input_object_kinds,
2250 &receiving_object_refs,
2251 &self.config.transaction_deny_config,
2252 self.get_backing_package_store().as_ref(),
2253 )?;
2254
2255 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2256 None,
2258 &input_object_kinds,
2259 &receiving_object_refs,
2260 epoch_store.epoch(),
2261 )?;
2262
2263 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2265 SIMULATION_GAS_COIN_VALUE,
2266 transaction.gas_owner(),
2267 );
2268
2269 let gas_objects = if transaction.gas().is_empty() {
2270 let gas_object_ref = dummy_gas_object.compute_object_reference();
2271 vec![gas_object_ref]
2272 } else {
2273 transaction.gas().to_vec()
2274 };
2275
2276 let (gas_status, checked_input_objects) = if skip_checks {
2277 if transaction.gas().is_empty() {
2282 input_objects.push(ObjectReadResult::new(
2283 InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
2284 dummy_gas_object.into(),
2285 ));
2286 }
2287 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2288 protocol_config,
2289 &transaction_kind,
2290 input_objects,
2291 receiving_objects,
2292 )?;
2293 let gas_status = IotaGasStatus::new(
2294 max_tx_gas,
2295 transaction.gas_price(),
2296 reference_gas_price,
2297 protocol_config,
2298 )?;
2299
2300 (gas_status, checked_input_objects)
2301 } else {
2302 if transaction.gas().is_empty() {
2306 iota_transaction_checks::check_transaction_input_with_given_gas(
2307 epoch_store.protocol_config(),
2308 reference_gas_price,
2309 &transaction,
2310 input_objects,
2311 receiving_objects,
2312 dummy_gas_object,
2313 &self.metrics.bytecode_verifier_metrics,
2314 &self.config.verifier_signing_config,
2315 )?
2316 } else {
2317 let authenticator_gas_budget = 0;
2320
2321 iota_transaction_checks::check_transaction_input(
2322 epoch_store.protocol_config(),
2323 reference_gas_price,
2324 &transaction,
2325 input_objects,
2326 &receiving_objects,
2327 &self.metrics.bytecode_verifier_metrics,
2328 &self.config.verifier_signing_config,
2329 authenticator_gas_budget,
2330 )?
2331 }
2332 };
2333
2334 let executor = iota_execution::executor(protocol_config, true, None)
2335 .expect("Creating an executor should not fail here");
2336 let intent_msg = IntentMessage::new(
2337 Intent {
2338 version: IntentVersion::V0,
2339 scope: IntentScope::TransactionData,
2340 app_id: IntentAppId::Iota,
2341 },
2342 transaction,
2343 );
2344 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2345 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2346 self.get_backing_store().as_ref(),
2347 protocol_config,
2348 self.metrics.limits_metrics.clone(),
2349 false,
2351 self.config.certificate_deny_config.certificate_deny_set(),
2352 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2353 epoch_store
2354 .epoch_start_config()
2355 .epoch_data()
2356 .epoch_start_timestamp(),
2357 checked_input_objects,
2358 gas_objects,
2359 gas_status,
2360 transaction_kind,
2361 sender,
2362 transaction_digest,
2363 skip_checks,
2364 );
2365
2366 let raw_effects = if show_raw_txn_data_and_effects {
2367 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2368 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2369 })?
2370 } else {
2371 vec![]
2372 };
2373
2374 let mut layout_resolver =
2375 epoch_store
2376 .executor()
2377 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2378 &inner_temp_store,
2379 self.get_backing_package_store(),
2380 )));
2381
2382 DevInspectResults::new(
2383 effects,
2384 inner_temp_store.events.clone(),
2385 execution_result,
2386 raw_txn_data,
2387 raw_effects,
2388 layout_resolver.as_mut(),
2389 )
2390 }
2391
2392 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2394 let epoch_store = self.epoch_store_for_testing();
2395 Ok(epoch_store.reference_gas_price())
2396 }
2397
2398 #[instrument(level = "trace", skip_all)]
2399 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2400 self.get_transaction_cache_reader()
2401 .try_is_tx_already_executed(digest)
2402 }
2403
2404 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2406 self.try_is_tx_already_executed(digest)
2407 .expect("storage access failed")
2408 }
2409
2410 #[instrument(level = "debug", skip_all, err)]
2412 fn index_tx(
2413 &self,
2414 indexes: &IndexStore,
2415 digest: &TransactionDigest,
2416 cert: &VerifiedExecutableTransaction,
2418 effects: &TransactionEffects,
2419 events: &TransactionEvents,
2420 timestamp_ms: u64,
2421 tx_coins: Option<TxCoins>,
2422 written: &WrittenObjects,
2423 inner_temporary_store: &InnerTemporaryStore,
2424 ) -> IotaResult<u64> {
2425 let changes = self
2426 .process_object_index(effects, written, inner_temporary_store)
2427 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2428
2429 indexes.index_tx(
2430 cert.data().intent_message().value.sender(),
2431 cert.data()
2432 .intent_message()
2433 .value
2434 .input_objects()?
2435 .iter()
2436 .map(|o| o.object_id()),
2437 effects
2438 .all_changed_objects()
2439 .into_iter()
2440 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2441 cert.data()
2442 .intent_message()
2443 .value
2444 .move_calls()
2445 .into_iter()
2446 .map(|(package, module, function)| {
2447 (*package, module.to_owned(), function.to_owned())
2448 }),
2449 events,
2450 changes,
2451 digest,
2452 timestamp_ms,
2453 tx_coins,
2454 )
2455 }
2456
2457 #[cfg(msim)]
2458 fn create_fail_state(
2459 &self,
2460 certificate: &VerifiedExecutableTransaction,
2461 epoch_store: &Arc<AuthorityPerEpochStore>,
2462 effects: &mut TransactionEffects,
2463 ) {
2464 use std::cell::RefCell;
2465 thread_local! {
2466 static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2467 }
2468 if !certificate.data().intent_message().value.is_system_tx() {
2469 let committee = epoch_store.committee();
2470 let cur_stake = (**committee).weight(&self.name);
2471 if cur_stake > 0 {
2472 FAIL_STATE.with_borrow_mut(|fail_state| {
2473 if fail_state.0 < committee.validity_threshold() {
2475 fail_state.0 += cur_stake;
2476 fail_state.1.insert(self.name);
2477 }
2478
2479 if fail_state.1.contains(&self.name) {
2480 info!("cp_exec failing tx");
2481 effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2482 }
2483 });
2484 }
2485 }
2486 }
2487
2488 fn process_object_index(
2489 &self,
2490 effects: &TransactionEffects,
2491 written: &WrittenObjects,
2492 inner_temporary_store: &InnerTemporaryStore,
2493 ) -> IotaResult<ObjectIndexChanges> {
2494 let epoch_store = self.load_epoch_store_one_call_per_task();
2495 let mut layout_resolver =
2496 epoch_store
2497 .executor()
2498 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2499 inner_temporary_store,
2500 self.get_backing_package_store(),
2501 )));
2502
2503 let modified_at_version = effects
2504 .modified_at_versions()
2505 .into_iter()
2506 .collect::<HashMap<_, _>>();
2507
2508 let tx_digest = effects.transaction_digest();
2509 let mut deleted_owners = vec![];
2510 let mut deleted_dynamic_fields = vec![];
2511 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2512 let old_version = modified_at_version.get(&id).unwrap();
2513 match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2516 |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2517 ) {
2518 Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2519 Owner::ObjectOwner(object_id) => {
2520 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2521 }
2522 _ => {}
2523 }
2524 }
2525
2526 let mut new_owners = vec![];
2527 let mut new_dynamic_fields = vec![];
2528
2529 for (oref, owner, kind) in effects.all_changed_objects() {
2530 let id = &oref.0;
2531 if let WriteKind::Mutate = kind {
2534 let Some(old_version) = modified_at_version.get(id) else {
2535 panic!(
2536 "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2537 );
2538 };
2539 let Some(old_object) = self
2542 .get_object_store()
2543 .try_get_object_by_key(id, *old_version)?
2544 else {
2545 panic!(
2546 "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2547 );
2548 };
2549 if old_object.owner != owner {
2550 match old_object.owner {
2551 Owner::AddressOwner(addr) => {
2552 deleted_owners.push((addr, *id));
2553 }
2554 Owner::ObjectOwner(object_id) => {
2555 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2556 }
2557 _ => {}
2558 }
2559 }
2560 }
2561
2562 match owner {
2563 Owner::AddressOwner(addr) => {
2564 let new_object = written.get(id).unwrap_or_else(
2567 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2568 );
2569 assert_eq!(
2570 new_object.version(),
2571 oref.1,
2572 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2573 tx_digest,
2574 id,
2575 new_object.version(),
2576 oref.1
2577 );
2578
2579 let type_ = new_object
2580 .type_()
2581 .map(|type_| ObjectType::Struct(type_.clone()))
2582 .unwrap_or(ObjectType::Package);
2583
2584 new_owners.push((
2585 (addr, *id),
2586 ObjectInfo {
2587 object_id: *id,
2588 version: oref.1,
2589 digest: oref.2,
2590 type_,
2591 owner,
2592 previous_transaction: *effects.transaction_digest(),
2593 },
2594 ));
2595 }
2596 Owner::ObjectOwner(owner) => {
2597 let new_object = written.get(id).unwrap_or_else(
2598 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2599 );
2600 assert_eq!(
2601 new_object.version(),
2602 oref.1,
2603 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2604 tx_digest,
2605 id,
2606 new_object.version(),
2607 oref.1
2608 );
2609
2610 let Some(df_info) = self
2611 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut(), false)
2612 .unwrap_or_else(|e| {
2613 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2614 None
2615 }
2616 )
2617 else {
2618 continue;
2620 };
2621 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2622 }
2623 _ => {}
2624 }
2625 }
2626
2627 Ok(ObjectIndexChanges {
2628 deleted_owners,
2629 deleted_dynamic_fields,
2630 new_owners,
2631 new_dynamic_fields,
2632 })
2633 }
2634
2635 fn try_create_dynamic_field_info(
2636 &self,
2637 o: &Object,
2638 written: &WrittenObjects,
2639 resolver: &mut dyn LayoutResolver,
2640 get_latest_object_version: bool,
2641 ) -> IotaResult<Option<DynamicFieldInfo>> {
2642 let Some(move_object) = o.data.try_as_move().cloned() else {
2644 return Ok(None);
2645 };
2646
2647 if !move_object.type_().is_dynamic_field() {
2649 return Ok(None);
2650 }
2651
2652 let layout = resolver
2653 .get_annotated_layout(&move_object.type_().clone().into())?
2654 .into_layout();
2655
2656 let field =
2657 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2658 IotaError::ObjectDeserialization {
2659 error: e.to_string(),
2660 }
2661 })?;
2662
2663 let type_ = field.kind;
2664 let name_type: TypeTag = field.name_layout.into();
2665 let bcs_name = field.name_bytes.to_owned();
2666
2667 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2668 .map_err(|e| {
2669 warn!("{e}");
2670 IotaError::ObjectDeserialization {
2671 error: e.to_string(),
2672 }
2673 })?;
2674
2675 let name = DynamicFieldName {
2676 type_: name_type,
2677 value: IotaMoveValue::from(name_value).to_json_value(),
2678 };
2679
2680 let value_metadata = field.value_metadata().map_err(|e| {
2681 warn!("{e}");
2682 IotaError::ObjectDeserialization {
2683 error: e.to_string(),
2684 }
2685 })?;
2686
2687 Ok(Some(match value_metadata {
2688 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2689 name,
2690 bcs_name,
2691 type_,
2692 object_type: object_type.to_canonical_string(true),
2693 object_id: o.id(),
2694 version: o.version(),
2695 digest: o.digest(),
2696 },
2697
2698 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2699 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2704 (
2705 object.version(),
2706 object.digest(),
2707 object.data.type_().unwrap().clone(),
2708 )
2709 } else {
2710 let object = if get_latest_object_version {
2712 self.get_object_store()
2719 .try_get_object(&object_id)?
2720 .ok_or_else(|| UserInputError::ObjectNotFound {
2721 object_id,
2722 version: Some(o.version()),
2723 })?
2724 } else {
2725 self.get_object_store()
2727 .try_get_object_by_key(&object_id, o.version())?
2728 .ok_or_else(|| UserInputError::ObjectNotFound {
2729 object_id,
2730 version: Some(o.version()),
2731 })?
2732 };
2733
2734 (
2735 object.version(),
2736 object.digest(),
2737 object.data.type_().unwrap().clone(),
2738 )
2739 };
2740
2741 DynamicFieldInfo {
2742 name,
2743 bcs_name,
2744 type_,
2745 object_type: object_type.to_string(),
2746 object_id,
2747 version,
2748 digest,
2749 }
2750 }
2751 }))
2752 }
2753
2754 #[instrument(level = "trace", skip_all, err)]
2755 fn post_process_one_tx(
2756 &self,
2757 certificate: &VerifiedExecutableTransaction,
2758 effects: &TransactionEffects,
2759 inner_temporary_store: &InnerTemporaryStore,
2760 epoch_store: &Arc<AuthorityPerEpochStore>,
2761 ) -> IotaResult {
2762 if self.indexes.is_none() {
2763 return Ok(());
2764 }
2765
2766 let tx_digest = certificate.digest();
2767 let timestamp_ms = Self::unixtime_now_ms();
2768 let events = &inner_temporary_store.events;
2769 let written = &inner_temporary_store.written;
2770 let tx_coins =
2771 self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2772
2773 if let Some(indexes) = &self.indexes {
2775 let _ = self
2776 .index_tx(
2777 indexes.as_ref(),
2778 tx_digest,
2779 certificate,
2780 effects,
2781 events,
2782 timestamp_ms,
2783 tx_coins,
2784 written,
2785 inner_temporary_store,
2786 )
2787 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2788 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2789 .expect("Indexing tx should not fail");
2790
2791 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2792 let events = self.make_transaction_block_events(
2793 events.clone(),
2794 *tx_digest,
2795 timestamp_ms,
2796 epoch_store,
2797 inner_temporary_store,
2798 )?;
2799 self.subscription_handler
2801 .process_tx(certificate.data().transaction_data(), &effects, &events)
2802 .tap_ok(|_| {
2803 self.metrics
2804 .post_processing_total_tx_had_event_processed
2805 .inc()
2806 })
2807 .tap_err(|e| {
2808 warn!(
2809 ?tx_digest,
2810 "Post processing - Couldn't process events for tx: {}", e
2811 )
2812 })?;
2813
2814 self.metrics
2815 .post_processing_total_events_emitted
2816 .inc_by(events.data.len() as u64);
2817 };
2818 Ok(())
2819 }
2820
2821 fn make_transaction_block_events(
2822 &self,
2823 transaction_events: TransactionEvents,
2824 digest: TransactionDigest,
2825 timestamp_ms: u64,
2826 epoch_store: &Arc<AuthorityPerEpochStore>,
2827 inner_temporary_store: &InnerTemporaryStore,
2828 ) -> IotaResult<IotaTransactionBlockEvents> {
2829 let mut layout_resolver =
2830 epoch_store
2831 .executor()
2832 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2833 inner_temporary_store,
2834 self.get_backing_package_store(),
2835 )));
2836 IotaTransactionBlockEvents::try_from(
2837 transaction_events,
2838 digest,
2839 Some(timestamp_ms),
2840 layout_resolver.as_mut(),
2841 )
2842 }
2843
2844 pub fn unixtime_now_ms() -> u64 {
2845 let now = SystemTime::now()
2846 .duration_since(UNIX_EPOCH)
2847 .expect("Time went backwards")
2848 .as_millis();
2849 u64::try_from(now).expect("Travelling in time machine")
2850 }
2851
2852 #[instrument(level = "trace", skip_all)]
2853 pub async fn handle_transaction_info_request(
2854 &self,
2855 request: TransactionInfoRequest,
2856 ) -> IotaResult<TransactionInfoResponse> {
2857 let epoch_store = self.load_epoch_store_one_call_per_task();
2858 let (transaction, status) = self
2859 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2860 .ok_or(IotaError::TransactionNotFound {
2861 digest: request.transaction_digest,
2862 })?;
2863 Ok(TransactionInfoResponse {
2864 transaction,
2865 status,
2866 })
2867 }
2868
2869 #[instrument(level = "trace", skip_all)]
2870 pub async fn handle_object_info_request(
2871 &self,
2872 request: ObjectInfoRequest,
2873 ) -> IotaResult<ObjectInfoResponse> {
2874 let epoch_store = self.load_epoch_store_one_call_per_task();
2875
2876 let requested_object_seq = match request.request_kind {
2877 ObjectInfoRequestKind::LatestObjectInfo => {
2878 let (_, seq, _) = self
2879 .try_get_object_or_tombstone(request.object_id)
2880 .await?
2881 .ok_or_else(|| {
2882 IotaError::from(UserInputError::ObjectNotFound {
2883 object_id: request.object_id,
2884 version: None,
2885 })
2886 })?;
2887 seq
2888 }
2889 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2890 };
2891
2892 let object = self
2893 .get_object_store()
2894 .try_get_object_by_key(&request.object_id, requested_object_seq)?
2895 .ok_or_else(|| {
2896 IotaError::from(UserInputError::ObjectNotFound {
2897 object_id: request.object_id,
2898 version: Some(requested_object_seq),
2899 })
2900 })?;
2901
2902 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2903 (request.generate_layout, object.data.try_as_move())
2904 {
2905 Some(into_struct_layout(
2906 epoch_store
2907 .executor()
2908 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2909 .get_annotated_layout(&move_obj.type_().clone().into())?,
2910 )?)
2911 } else {
2912 None
2913 };
2914
2915 let lock = if !object.is_address_owned() {
2916 None
2918 } else {
2919 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2920 .await?
2921 .map(|s| s.into_inner())
2922 };
2923
2924 Ok(ObjectInfoResponse {
2925 object,
2926 layout,
2927 lock_for_debugging: lock,
2928 })
2929 }
2930
2931 #[instrument(level = "trace", skip_all)]
2932 pub fn handle_checkpoint_request(
2933 &self,
2934 request: &CheckpointRequest,
2935 ) -> IotaResult<CheckpointResponse> {
2936 let summary = if request.certified {
2937 let summary = match request.sequence_number {
2938 Some(seq) => self
2939 .checkpoint_store
2940 .get_checkpoint_by_sequence_number(seq)?,
2941 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2942 }
2943 .map(|v| v.into_inner());
2944 summary.map(CheckpointSummaryResponse::Certified)
2945 } else {
2946 let summary = match request.sequence_number {
2947 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2948 None => self
2949 .checkpoint_store
2950 .get_latest_locally_computed_checkpoint()?,
2951 };
2952 summary.map(CheckpointSummaryResponse::Pending)
2953 };
2954 let contents = match &summary {
2955 Some(s) => self
2956 .checkpoint_store
2957 .get_checkpoint_contents(&s.content_digest())?,
2958 None => None,
2959 };
2960 Ok(CheckpointResponse {
2961 checkpoint: summary,
2962 contents,
2963 })
2964 }
2965
2966 fn check_protocol_version(
2967 supported_protocol_versions: SupportedProtocolVersions,
2968 current_version: ProtocolVersion,
2969 ) {
2970 info!("current protocol version is now {:?}", current_version);
2971 info!("supported versions are: {:?}", supported_protocol_versions);
2972 if !supported_protocol_versions.is_version_supported(current_version) {
2973 let msg = format!(
2974 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2975 );
2976
2977 error!("{}", msg);
2978 eprintln!("{msg}");
2979
2980 #[cfg(not(msim))]
2981 std::process::exit(1);
2982
2983 #[cfg(msim)]
2984 iota_simulator::task::shutdown_current_node();
2985 }
2986 }
2987
2988 #[expect(clippy::disallowed_methods)] pub async fn new(
2990 name: AuthorityName,
2991 secret: StableSyncAuthoritySigner,
2992 supported_protocol_versions: SupportedProtocolVersions,
2993 store: Arc<AuthorityStore>,
2994 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
2995 epoch_store: Arc<AuthorityPerEpochStore>,
2996 committee_store: Arc<CommitteeStore>,
2997 indexes: Option<Arc<IndexStore>>,
2998 rest_index: Option<Arc<RestIndexStore>>,
2999 checkpoint_store: Arc<CheckpointStore>,
3000 prometheus_registry: &Registry,
3001 genesis_objects: &[Object],
3002 db_checkpoint_config: &DBCheckpointConfig,
3003 config: NodeConfig,
3004 archive_readers: ArchiveReaderBalancer,
3005 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
3006 chain_identifier: ChainIdentifier,
3007 pruner_db: Option<Arc<AuthorityPrunerTables>>,
3008 ) -> Arc<Self> {
3009 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3010
3011 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3012 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3013 let transaction_manager = Arc::new(TransactionManager::new(
3014 execution_cache_trait_pointers.object_cache_reader.clone(),
3015 execution_cache_trait_pointers
3016 .transaction_cache_reader
3017 .clone(),
3018 &epoch_store,
3019 tx_ready_certificates,
3020 metrics.clone(),
3021 ));
3022 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3023
3024 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3025 epoch_store.get_parent_path(),
3026 &config.authority_store_pruning_config,
3027 );
3028 let _pruner = AuthorityStorePruner::new(
3029 store.perpetual_tables.clone(),
3030 checkpoint_store.clone(),
3031 rest_index.clone(),
3032 indexes.clone(),
3033 config.authority_store_pruning_config.clone(),
3034 epoch_store.committee().authority_exists(&name),
3035 epoch_store.epoch_start_state().epoch_duration_ms(),
3036 prometheus_registry,
3037 archive_readers,
3038 pruner_db,
3039 );
3040 let input_loader =
3041 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3042 let epoch = epoch_store.epoch();
3043 let rgp = epoch_store.reference_gas_price();
3044 let state = Arc::new(AuthorityState {
3045 name,
3046 secret,
3047 execution_lock: RwLock::new(epoch),
3048 epoch_store: ArcSwap::new(epoch_store.clone()),
3049 input_loader,
3050 execution_cache_trait_pointers,
3051 indexes,
3052 rest_index,
3053 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3054 checkpoint_store,
3055 committee_store,
3056 transaction_manager,
3057 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3058 metrics,
3059 _pruner,
3060 _authority_per_epoch_pruner,
3061 db_checkpoint_config: db_checkpoint_config.clone(),
3062 config,
3063 overload_info: AuthorityOverloadInfo::default(),
3064 validator_tx_finalizer,
3065 chain_identifier,
3066 congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
3067 });
3068
3069 let authority_state = Arc::downgrade(&state);
3071 spawn_monitored_task!(execution_process(
3072 authority_state,
3073 rx_ready_certificates,
3074 rx_execution_shutdown,
3075 ));
3076
3077 state
3079 .create_owner_index_if_empty(genesis_objects, &epoch_store)
3080 .expect("Error indexing genesis objects.");
3081
3082 state
3083 }
3084
3085 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3087 &self.execution_cache_trait_pointers.object_cache_reader
3088 }
3089
3090 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3091 &self.execution_cache_trait_pointers.transaction_cache_reader
3092 }
3093
3094 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3095 &self.execution_cache_trait_pointers.cache_writer
3096 }
3097
3098 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3099 &self.execution_cache_trait_pointers.backing_store
3100 }
3101
3102 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3103 &self.execution_cache_trait_pointers.backing_package_store
3104 }
3105
3106 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3107 &self.execution_cache_trait_pointers.object_store
3108 }
3109
3110 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3111 &self.execution_cache_trait_pointers.reconfig_api
3112 }
3113
3114 pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
3115 &self.execution_cache_trait_pointers.accumulator_store
3116 }
3117
3118 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3119 &self.execution_cache_trait_pointers.checkpoint_cache
3120 }
3121
3122 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3123 &self.execution_cache_trait_pointers.state_sync_store
3124 }
3125
3126 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3127 &self.execution_cache_trait_pointers.cache_commit
3128 }
3129
3130 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3131 self.execution_cache_trait_pointers
3132 .testing_api
3133 .database_for_testing()
3134 }
3135
3136 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3137 &self,
3138 config: NodeConfig,
3139 metrics: Arc<AuthorityStorePruningMetrics>,
3140 ) -> anyhow::Result<()> {
3141 let archive_readers =
3142 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3143 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3144 &self.database_for_testing().perpetual_tables,
3145 &self.checkpoint_store,
3146 self.rest_index.as_deref(),
3147 None,
3148 config.authority_store_pruning_config,
3149 metrics,
3150 archive_readers,
3151 EPOCH_DURATION_MS_FOR_TESTING,
3152 )
3153 .await
3154 }
3155
3156 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3157 &self.transaction_manager
3158 }
3159
3160 pub fn enqueue_transactions_for_execution(
3163 &self,
3164 txns: Vec<VerifiedExecutableTransaction>,
3165 epoch_store: &Arc<AuthorityPerEpochStore>,
3166 ) {
3167 self.transaction_manager.enqueue(txns, epoch_store)
3168 }
3169
3170 pub fn enqueue_certificates_for_execution(
3172 &self,
3173 certs: Vec<VerifiedCertificate>,
3174 epoch_store: &Arc<AuthorityPerEpochStore>,
3175 ) {
3176 self.transaction_manager
3177 .enqueue_certificates(certs, epoch_store)
3178 }
3179
3180 pub fn enqueue_with_expected_effects_digest(
3181 &self,
3182 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3183 epoch_store: &AuthorityPerEpochStore,
3184 ) {
3185 self.transaction_manager
3186 .enqueue_with_expected_effects_digest(certs, epoch_store)
3187 }
3188
3189 fn create_owner_index_if_empty(
3190 &self,
3191 genesis_objects: &[Object],
3192 epoch_store: &Arc<AuthorityPerEpochStore>,
3193 ) -> IotaResult {
3194 let Some(index_store) = &self.indexes else {
3195 return Ok(());
3196 };
3197 if !index_store.is_empty() {
3198 return Ok(());
3199 }
3200
3201 let mut new_owners = vec![];
3202 let mut new_dynamic_fields = vec![];
3203 let mut layout_resolver = epoch_store
3204 .executor()
3205 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3206 for o in genesis_objects.iter() {
3207 match o.owner {
3208 Owner::AddressOwner(addr) => new_owners.push((
3209 (addr, o.id()),
3210 ObjectInfo::new(&o.compute_object_reference(), o),
3211 )),
3212 Owner::ObjectOwner(object_id) => {
3213 let id = o.id();
3214 let Some(info) = self.try_create_dynamic_field_info(
3215 o,
3216 &BTreeMap::new(),
3217 layout_resolver.as_mut(),
3218 true,
3219 )?
3220 else {
3221 continue;
3222 };
3223 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3224 }
3225 _ => {}
3226 }
3227 }
3228
3229 index_store.insert_genesis_objects(ObjectIndexChanges {
3230 deleted_owners: vec![],
3231 deleted_dynamic_fields: vec![],
3232 new_owners,
3233 new_dynamic_fields,
3234 })
3235 }
3236
3237 pub fn execution_lock_for_executable_transaction(
3241 &self,
3242 transaction: &VerifiedExecutableTransaction,
3243 ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3244 let lock = self
3245 .execution_lock
3246 .try_read()
3247 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3248 if *lock == transaction.auth_sig().epoch() {
3249 Ok(lock)
3250 } else {
3251 Err(IotaError::WrongEpoch {
3252 expected_epoch: *lock,
3253 actual_epoch: transaction.auth_sig().epoch(),
3254 })
3255 }
3256 }
3257
3258 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3264 self.execution_lock
3265 .try_read()
3266 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3267 }
3268
3269 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3270 self.execution_lock.write().await
3271 }
3272
3273 #[instrument(level = "error", skip_all)]
3274 pub async fn reconfigure(
3275 &self,
3276 cur_epoch_store: &AuthorityPerEpochStore,
3277 supported_protocol_versions: SupportedProtocolVersions,
3278 new_committee: Committee,
3279 epoch_start_configuration: EpochStartConfiguration,
3280 accumulator: Arc<StateAccumulator>,
3281 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3282 epoch_supply_change: i64,
3283 epoch_last_checkpoint: CheckpointSequenceNumber,
3284 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3285 Self::check_protocol_version(
3286 supported_protocol_versions,
3287 epoch_start_configuration
3288 .epoch_start_state()
3289 .protocol_version(),
3290 );
3291 self.metrics.reset_on_reconfigure();
3292 self.committee_store.insert_new_committee(&new_committee)?;
3293
3294 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3296
3297 cur_epoch_store.epoch_terminated().await;
3299
3300 let highest_locally_built_checkpoint_seq = self
3301 .checkpoint_store
3302 .get_latest_locally_computed_checkpoint()?
3303 .map(|c| *c.sequence_number())
3304 .unwrap_or(0);
3305
3306 assert!(
3307 epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3308 "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3309 );
3310 if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
3311 let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3315 if num_shared_version_assignments > 1 {
3318 debug_fatal!(
3320 "all shared_version_assignments should have been removed \
3321 (num_shared_version_assignments: {num_shared_version_assignments})"
3322 );
3323 }
3324 }
3325
3326 self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3332 .await?;
3333 self.get_reconfig_api()
3334 .clear_state_end_of_epoch(&execution_lock);
3335 self.check_system_consistency(
3336 cur_epoch_store,
3337 accumulator,
3338 expensive_safety_check_config,
3339 epoch_supply_change,
3340 )?;
3341 self.get_reconfig_api()
3342 .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3343 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3344 if self
3345 .db_checkpoint_config
3346 .perform_db_checkpoints_at_epoch_end
3347 {
3348 let checkpoint_indexes = self
3349 .db_checkpoint_config
3350 .perform_index_db_checkpoints_at_epoch_end
3351 .unwrap_or(false);
3352 let current_epoch = cur_epoch_store.epoch();
3353 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3354 self.checkpoint_all_dbs(
3355 &epoch_checkpoint_path,
3356 cur_epoch_store,
3357 checkpoint_indexes,
3358 )?;
3359 }
3360 }
3361
3362 self.get_reconfig_api()
3363 .reconfigure_cache(&epoch_start_configuration)
3364 .await;
3365
3366 let new_epoch = new_committee.epoch;
3367 let new_epoch_store = self
3368 .reopen_epoch_db(
3369 cur_epoch_store,
3370 new_committee,
3371 epoch_start_configuration,
3372 expensive_safety_check_config,
3373 epoch_last_checkpoint,
3374 )
3375 .await?;
3376 assert_eq!(new_epoch_store.epoch(), new_epoch);
3377 self.transaction_manager.reconfigure(new_epoch);
3378 *execution_lock = new_epoch;
3379 Ok(new_epoch_store)
3383 }
3384
3385 pub async fn reconfigure_for_testing(&self) {
3390 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3391 let epoch_store = self.epoch_store_for_testing().clone();
3392 let protocol_config = epoch_store.protocol_config().clone();
3393 let _guard =
3401 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3402 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3403 self.get_backing_package_store().clone(),
3404 self.get_object_store().clone(),
3405 &self.config.expensive_safety_check_config,
3406 self.checkpoint_store
3407 .get_epoch_last_checkpoint(epoch_store.epoch())
3408 .unwrap()
3409 .map(|c| *c.sequence_number())
3410 .unwrap_or_default(),
3411 );
3412 let new_epoch = new_epoch_store.epoch();
3413 self.transaction_manager.reconfigure(new_epoch);
3414 self.epoch_store.store(new_epoch_store);
3415 epoch_store.epoch_terminated().await;
3416 *execution_lock = new_epoch;
3417 }
3418
3419 #[instrument(level = "error", skip_all)]
3420 fn check_system_consistency(
3421 &self,
3422 cur_epoch_store: &AuthorityPerEpochStore,
3423 accumulator: Arc<StateAccumulator>,
3424 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3425 epoch_supply_change: i64,
3426 ) -> IotaResult<()> {
3427 info!(
3428 "Performing iota conservation consistency check for epoch {}",
3429 cur_epoch_store.epoch()
3430 );
3431
3432 if cfg!(debug_assertions) {
3433 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3434 }
3435
3436 self.get_reconfig_api()
3437 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3438
3439 if expensive_safety_check_config.enable_state_consistency_check() {
3441 info!(
3442 "Performing state consistency check for epoch {}",
3443 cur_epoch_store.epoch()
3444 );
3445 self.expensive_check_is_consistent_state(
3446 accumulator,
3447 cur_epoch_store,
3448 cfg!(debug_assertions), );
3450 }
3451
3452 if expensive_safety_check_config.enable_secondary_index_checks() {
3453 if let Some(indexes) = self.indexes.clone() {
3454 verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3455 .expect("secondary indexes are inconsistent");
3456 }
3457 }
3458
3459 Ok(())
3460 }
3461
3462 fn expensive_check_is_consistent_state(
3463 &self,
3464 accumulator: Arc<StateAccumulator>,
3465 cur_epoch_store: &AuthorityPerEpochStore,
3466 panic: bool,
3467 ) {
3468 let live_object_set_hash = accumulator.digest_live_object_set();
3469
3470 let root_state_hash: ECMHLiveObjectSetDigest = self
3471 .get_accumulator_store()
3472 .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3473 .expect("Retrieving root state hash cannot fail")
3474 .expect("Root state hash for epoch must exist")
3475 .1
3476 .digest()
3477 .into();
3478
3479 let is_inconsistent = root_state_hash != live_object_set_hash;
3480 if is_inconsistent {
3481 if panic {
3482 panic!(
3483 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3484 );
3485 } else {
3486 error!(
3487 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3488 root_state_hash, live_object_set_hash
3489 );
3490 }
3491 } else {
3492 info!("State consistency check passed");
3493 }
3494
3495 if !panic {
3496 accumulator.set_inconsistent_state(is_inconsistent);
3497 }
3498 }
3499
3500 pub fn current_epoch_for_testing(&self) -> EpochId {
3501 self.epoch_store_for_testing().epoch()
3502 }
3503
3504 #[instrument(level = "error", skip_all)]
3505 pub fn checkpoint_all_dbs(
3506 &self,
3507 checkpoint_path: &Path,
3508 cur_epoch_store: &AuthorityPerEpochStore,
3509 checkpoint_indexes: bool,
3510 ) -> IotaResult {
3511 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3512 let current_epoch = cur_epoch_store.epoch();
3513
3514 if checkpoint_path.exists() {
3515 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3516 return Ok(());
3517 }
3518
3519 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3520 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3521
3522 if checkpoint_path_tmp.exists() {
3523 fs::remove_dir_all(&checkpoint_path_tmp)
3524 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3525 }
3526
3527 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3528 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3529
3530 self.checkpoint_store
3533 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3534
3535 self.get_reconfig_api()
3536 .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3537
3538 self.committee_store
3539 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3540
3541 if checkpoint_indexes {
3542 if let Some(indexes) = self.indexes.as_ref() {
3543 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3544 }
3545 }
3546
3547 fs::rename(checkpoint_path_tmp, checkpoint_path)
3548 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3549 Ok(())
3550 }
3551
3552 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3559 self.epoch_store.load()
3560 }
3561
3562 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3564 self.load_epoch_store_one_call_per_task()
3565 }
3566
3567 pub fn clone_committee_for_testing(&self) -> Committee {
3568 Committee::clone(self.epoch_store_for_testing().committee())
3569 }
3570
3571 #[instrument(level = "trace", skip_all)]
3572 pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3573 self.get_object_store()
3574 .try_get_object(object_id)
3575 .map_err(Into::into)
3576 }
3577
3578 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3580 self.try_get_object(object_id)
3581 .await
3582 .expect("storage access failed")
3583 }
3584
3585 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3586 Ok(self
3587 .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3588 .await?
3589 .expect("framework object should always exist")
3590 .compute_object_reference())
3591 }
3592
3593 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3595 self.get_object_cache_reader()
3596 .try_get_iota_system_state_object_unsafe()
3597 }
3598
3599 #[instrument(level = "trace", skip_all)]
3600 pub fn get_checkpoint_by_sequence_number(
3601 &self,
3602 sequence_number: CheckpointSequenceNumber,
3603 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3604 Ok(self
3605 .checkpoint_store
3606 .get_checkpoint_by_sequence_number(sequence_number)?)
3607 }
3608
3609 #[instrument(level = "trace", skip_all)]
3610 pub fn get_transaction_checkpoint_for_tests(
3611 &self,
3612 digest: &TransactionDigest,
3613 epoch_store: &AuthorityPerEpochStore,
3614 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3615 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3616 let Some(checkpoint) = checkpoint else {
3617 return Ok(None);
3618 };
3619 let checkpoint = self
3620 .checkpoint_store
3621 .get_checkpoint_by_sequence_number(checkpoint)?;
3622 Ok(checkpoint)
3623 }
3624
3625 #[instrument(level = "trace", skip_all)]
3626 pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3627 Ok(
3628 match self
3629 .get_object_cache_reader()
3630 .try_get_latest_object_or_tombstone(*object_id)?
3631 {
3632 Some((_, ObjectOrTombstone::Object(object))) => {
3633 let layout = self.get_object_layout(&object)?;
3634 ObjectRead::Exists(object.compute_object_reference(), object, layout)
3635 }
3636 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3637 None => ObjectRead::NotExists(*object_id),
3638 },
3639 )
3640 }
3641
3642 pub fn get_chain_identifier(&self) -> ChainIdentifier {
3644 self.chain_identifier
3645 }
3646
3647 #[instrument(level = "trace", skip_all)]
3648 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3649 where
3650 T: DeserializeOwned,
3651 {
3652 let o = self.get_object_read(object_id)?.into_object()?;
3653 if let Some(move_object) = o.data.try_as_move() {
3654 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3655 IotaError::ObjectDeserialization {
3656 error: format!("{e}"),
3657 }
3658 })?)
3659 } else {
3660 Err(IotaError::ObjectDeserialization {
3661 error: format!("Provided object : [{object_id}] is not a Move object."),
3662 })
3663 }
3664 }
3665
3666 #[instrument(level = "trace", skip_all)]
3672 pub fn get_past_object_read(
3673 &self,
3674 object_id: &ObjectID,
3675 version: SequenceNumber,
3676 ) -> IotaResult<PastObjectRead> {
3677 let Some(obj_ref) = self
3679 .get_object_cache_reader()
3680 .try_get_latest_object_ref_or_tombstone(*object_id)?
3681 else {
3682 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3683 };
3684
3685 if version > obj_ref.1 {
3686 return Ok(PastObjectRead::VersionTooHigh {
3687 object_id: *object_id,
3688 asked_version: version,
3689 latest_version: obj_ref.1,
3690 });
3691 }
3692
3693 if version < obj_ref.1 {
3694 return Ok(match self.read_object_at_version(object_id, version)? {
3696 Some((object, layout)) => {
3697 let obj_ref = object.compute_object_reference();
3698 PastObjectRead::VersionFound(obj_ref, object, layout)
3699 }
3700
3701 None => PastObjectRead::VersionNotFound(*object_id, version),
3702 });
3703 }
3704
3705 if !obj_ref.2.is_alive() {
3706 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3707 }
3708
3709 match self.read_object_at_version(object_id, obj_ref.1)? {
3710 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3711 None => {
3712 error!(
3713 "Object with in parent_entry is missing from object store, datastore is \
3714 inconsistent",
3715 );
3716 Err(UserInputError::ObjectNotFound {
3717 object_id: *object_id,
3718 version: Some(obj_ref.1),
3719 }
3720 .into())
3721 }
3722 }
3723 }
3724
3725 #[instrument(level = "trace", skip_all)]
3726 fn read_object_at_version(
3727 &self,
3728 object_id: &ObjectID,
3729 version: SequenceNumber,
3730 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3731 let Some(object) = self
3732 .get_object_cache_reader()
3733 .try_get_object_by_key(object_id, version)?
3734 else {
3735 return Ok(None);
3736 };
3737
3738 let layout = self.get_object_layout(&object)?;
3739 Ok(Some((object, layout)))
3740 }
3741
3742 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3743 let layout = object
3744 .data
3745 .try_as_move()
3746 .map(|object| {
3747 into_struct_layout(
3748 self.load_epoch_store_one_call_per_task()
3749 .executor()
3750 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3752 .get_annotated_layout(&object.type_().clone().into())?,
3753 )
3754 })
3755 .transpose()?;
3756 Ok(layout)
3757 }
3758
3759 fn get_owner_at_version(
3760 &self,
3761 object_id: &ObjectID,
3762 version: SequenceNumber,
3763 ) -> IotaResult<Owner> {
3764 self.get_object_store()
3765 .try_get_object_by_key(object_id, version)?
3766 .ok_or_else(|| {
3767 IotaError::from(UserInputError::ObjectNotFound {
3768 object_id: *object_id,
3769 version: Some(version),
3770 })
3771 })
3772 .map(|o| o.owner)
3773 }
3774
3775 #[instrument(level = "trace", skip_all)]
3776 pub fn get_owner_objects(
3777 &self,
3778 owner: IotaAddress,
3779 cursor: Option<ObjectID>,
3781 limit: usize,
3782 filter: Option<IotaObjectDataFilter>,
3783 ) -> IotaResult<Vec<ObjectInfo>> {
3784 if let Some(indexes) = &self.indexes {
3785 indexes.get_owner_objects(owner, cursor, limit, filter)
3786 } else {
3787 Err(IotaError::IndexStoreNotAvailable)
3788 }
3789 }
3790
3791 #[instrument(level = "trace", skip_all)]
3792 pub fn get_owned_coins_iterator_with_cursor(
3793 &self,
3794 owner: IotaAddress,
3795 cursor: (String, ObjectID),
3797 limit: usize,
3798 one_coin_type_only: bool,
3799 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3800 if let Some(indexes) = &self.indexes {
3801 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3802 } else {
3803 Err(IotaError::IndexStoreNotAvailable)
3804 }
3805 }
3806
3807 #[instrument(level = "trace", skip_all)]
3808 pub fn get_owner_objects_iterator(
3809 &self,
3810 owner: IotaAddress,
3811 cursor: Option<ObjectID>,
3813 filter: Option<IotaObjectDataFilter>,
3814 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3815 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3816 if let Some(indexes) = &self.indexes {
3817 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3818 } else {
3819 Err(IotaError::IndexStoreNotAvailable)
3820 }
3821 }
3822
3823 #[instrument(level = "trace", skip_all)]
3824 pub async fn get_move_objects<T>(
3825 &self,
3826 owner: IotaAddress,
3827 type_: MoveObjectType,
3828 ) -> IotaResult<Vec<T>>
3829 where
3830 T: DeserializeOwned,
3831 {
3832 let object_ids = self
3833 .get_owner_objects_iterator(owner, None, None)?
3834 .filter(|o| match &o.type_ {
3835 ObjectType::Struct(s) => &type_ == s,
3836 ObjectType::Package => false,
3837 })
3838 .map(|info| ObjectKey(info.object_id, info.version))
3839 .collect::<Vec<_>>();
3840 let mut move_objects = vec![];
3841
3842 let objects = self
3843 .get_object_store()
3844 .try_multi_get_objects_by_key(&object_ids)?;
3845
3846 for (o, id) in objects.into_iter().zip(object_ids) {
3847 let object = o.ok_or_else(|| {
3848 IotaError::from(UserInputError::ObjectNotFound {
3849 object_id: id.0,
3850 version: Some(id.1),
3851 })
3852 })?;
3853 let move_object = object.data.try_as_move().ok_or_else(|| {
3854 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3855 })?;
3856 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3857 IotaError::ObjectDeserialization {
3858 error: format!("{e}"),
3859 }
3860 })?);
3861 }
3862 Ok(move_objects)
3863 }
3864
3865 #[instrument(level = "trace", skip_all)]
3866 pub fn get_dynamic_fields(
3867 &self,
3868 owner: ObjectID,
3869 cursor: Option<ObjectID>,
3871 limit: usize,
3872 ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3873 Ok(self
3874 .get_dynamic_fields_iterator(owner, cursor)?
3875 .take(limit)
3876 .collect::<Result<Vec<_>, _>>()?)
3877 }
3878
3879 fn get_dynamic_fields_iterator(
3880 &self,
3881 owner: ObjectID,
3882 cursor: Option<ObjectID>,
3884 ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3885 {
3886 if let Some(indexes) = &self.indexes {
3887 indexes.get_dynamic_fields_iterator(owner, cursor)
3888 } else {
3889 Err(IotaError::IndexStoreNotAvailable)
3890 }
3891 }
3892
3893 #[instrument(level = "trace", skip_all)]
3894 pub fn get_dynamic_field_object_id(
3895 &self,
3896 owner: ObjectID,
3897 name_type: TypeTag,
3898 name_bcs_bytes: &[u8],
3899 ) -> IotaResult<Option<ObjectID>> {
3900 if let Some(indexes) = &self.indexes {
3901 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3902 } else {
3903 Err(IotaError::IndexStoreNotAvailable)
3904 }
3905 }
3906
3907 #[instrument(level = "trace", skip_all)]
3908 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3909 Ok(self.get_indexes()?.next_sequence_number())
3910 }
3911
3912 #[instrument(level = "trace", skip_all)]
3913 pub async fn get_executed_transaction_and_effects(
3914 &self,
3915 digest: TransactionDigest,
3916 kv_store: Arc<TransactionKeyValueStore>,
3917 ) -> IotaResult<(Transaction, TransactionEffects)> {
3918 let transaction = kv_store.get_tx(digest).await?;
3919 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3920 Ok((transaction, effects))
3921 }
3922
3923 #[instrument(level = "trace", skip_all)]
3924 pub fn multi_get_checkpoint_by_sequence_number(
3925 &self,
3926 sequence_numbers: &[CheckpointSequenceNumber],
3927 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3928 Ok(self
3929 .checkpoint_store
3930 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3931 }
3932
3933 #[instrument(level = "trace", skip_all)]
3934 pub fn get_transaction_events(
3935 &self,
3936 digest: &TransactionEventsDigest,
3937 ) -> IotaResult<TransactionEvents> {
3938 self.get_transaction_cache_reader()
3939 .try_get_events(digest)?
3940 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3941 }
3942
3943 pub fn get_transaction_input_objects(
3944 &self,
3945 effects: &TransactionEffects,
3946 ) -> anyhow::Result<Vec<Object>> {
3947 iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
3948 .map_err(Into::into)
3949 }
3950
3951 pub fn get_transaction_output_objects(
3952 &self,
3953 effects: &TransactionEffects,
3954 ) -> anyhow::Result<Vec<Object>> {
3955 iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
3956 .map_err(Into::into)
3957 }
3958
3959 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3960 match &self.indexes {
3961 Some(i) => Ok(i.clone()),
3962 None => Err(IotaError::UnsupportedFeature {
3963 error: "extended object indexing is not enabled on this server".into(),
3964 }),
3965 }
3966 }
3967
3968 pub async fn get_transactions_for_tests(
3969 self: &Arc<Self>,
3970 filter: Option<TransactionFilter>,
3971 cursor: Option<TransactionDigest>,
3972 limit: Option<usize>,
3973 reverse: bool,
3974 ) -> IotaResult<Vec<TransactionDigest>> {
3975 let metrics = KeyValueStoreMetrics::new_for_tests();
3976 let kv_store = Arc::new(TransactionKeyValueStore::new(
3977 "rocksdb",
3978 metrics,
3979 self.clone(),
3980 ));
3981 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3982 .await
3983 }
3984
3985 #[instrument(level = "trace", skip_all)]
3986 pub async fn get_transactions(
3987 &self,
3988 kv_store: &Arc<TransactionKeyValueStore>,
3989 filter: Option<TransactionFilter>,
3990 cursor: Option<TransactionDigest>,
3992 limit: Option<usize>,
3993 reverse: bool,
3994 ) -> IotaResult<Vec<TransactionDigest>> {
3995 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3996 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3997 let iter = checkpoint_contents.iter().map(|c| c.transaction);
3998 if reverse {
3999 let iter = iter
4000 .rev()
4001 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4002 .skip(usize::from(cursor.is_some()));
4003 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4004 } else {
4005 let iter = iter
4006 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4007 .skip(usize::from(cursor.is_some()));
4008 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4009 }
4010 }
4011 self.get_indexes()?
4012 .get_transactions(filter, cursor, limit, reverse)
4013 }
4014
4015 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4016 &self.checkpoint_store
4017 }
4018
4019 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4020 self.get_checkpoint_store()
4021 .get_highest_executed_checkpoint_seq_number()?
4022 .ok_or(IotaError::UserInput {
4023 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4024 })
4025 }
4026
4027 #[cfg(msim)]
4028 pub fn get_highest_pruned_checkpoint_for_testing(
4029 &self,
4030 ) -> IotaResult<CheckpointSequenceNumber> {
4031 self.database_for_testing()
4032 .perpetual_tables
4033 .get_highest_pruned_checkpoint()
4034 .map(|c| c.unwrap_or(0))
4035 .map_err(Into::into)
4036 }
4037
4038 #[instrument(level = "trace", skip_all)]
4039 pub fn get_checkpoint_summary_by_sequence_number(
4040 &self,
4041 sequence_number: CheckpointSequenceNumber,
4042 ) -> IotaResult<CheckpointSummary> {
4043 let verified_checkpoint = self
4044 .get_checkpoint_store()
4045 .get_checkpoint_by_sequence_number(sequence_number)?;
4046 match verified_checkpoint {
4047 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4048 None => Err(IotaError::UserInput {
4049 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4050 }),
4051 }
4052 }
4053
4054 #[instrument(level = "trace", skip_all)]
4055 pub fn get_checkpoint_summary_by_digest(
4056 &self,
4057 digest: CheckpointDigest,
4058 ) -> IotaResult<CheckpointSummary> {
4059 let verified_checkpoint = self
4060 .get_checkpoint_store()
4061 .get_checkpoint_by_digest(&digest)?;
4062 match verified_checkpoint {
4063 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4064 None => Err(IotaError::UserInput {
4065 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4066 }),
4067 }
4068 }
4069
4070 #[instrument(level = "trace", skip_all)]
4071 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
4072 if is_system_package(package_id) {
4073 return self.find_genesis_txn_digest();
4074 }
4075 Ok(self
4076 .get_object_read(&package_id)?
4077 .into_object()?
4078 .previous_transaction)
4079 }
4080
4081 #[instrument(level = "trace", skip_all)]
4082 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4083 let summary = self
4084 .get_verified_checkpoint_by_sequence_number(0)?
4085 .into_message();
4086 let content = self.get_checkpoint_contents(summary.content_digest)?;
4087 let genesis_transaction = content.enumerate_transactions(&summary).next();
4088 Ok(genesis_transaction
4089 .ok_or(IotaError::UserInput {
4090 error: UserInputError::GenesisTransactionNotFound,
4091 })?
4092 .1
4093 .transaction)
4094 }
4095
4096 #[instrument(level = "trace", skip_all)]
4097 pub fn get_verified_checkpoint_by_sequence_number(
4098 &self,
4099 sequence_number: CheckpointSequenceNumber,
4100 ) -> IotaResult<VerifiedCheckpoint> {
4101 let verified_checkpoint = self
4102 .get_checkpoint_store()
4103 .get_checkpoint_by_sequence_number(sequence_number)?;
4104 match verified_checkpoint {
4105 Some(verified_checkpoint) => Ok(verified_checkpoint),
4106 None => Err(IotaError::UserInput {
4107 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4108 }),
4109 }
4110 }
4111
4112 #[instrument(level = "trace", skip_all)]
4113 pub fn get_verified_checkpoint_summary_by_digest(
4114 &self,
4115 digest: CheckpointDigest,
4116 ) -> IotaResult<VerifiedCheckpoint> {
4117 let verified_checkpoint = self
4118 .get_checkpoint_store()
4119 .get_checkpoint_by_digest(&digest)?;
4120 match verified_checkpoint {
4121 Some(verified_checkpoint) => Ok(verified_checkpoint),
4122 None => Err(IotaError::UserInput {
4123 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4124 }),
4125 }
4126 }
4127
4128 #[instrument(level = "trace", skip_all)]
4129 pub fn get_checkpoint_contents(
4130 &self,
4131 digest: CheckpointContentsDigest,
4132 ) -> IotaResult<CheckpointContents> {
4133 self.get_checkpoint_store()
4134 .get_checkpoint_contents(&digest)?
4135 .ok_or(IotaError::UserInput {
4136 error: UserInputError::CheckpointContentsNotFound(digest),
4137 })
4138 }
4139
4140 #[instrument(level = "trace", skip_all)]
4141 pub fn get_checkpoint_contents_by_sequence_number(
4142 &self,
4143 sequence_number: CheckpointSequenceNumber,
4144 ) -> IotaResult<CheckpointContents> {
4145 let verified_checkpoint = self
4146 .get_checkpoint_store()
4147 .get_checkpoint_by_sequence_number(sequence_number)?;
4148 match verified_checkpoint {
4149 Some(verified_checkpoint) => {
4150 let content_digest = verified_checkpoint.into_inner().content_digest;
4151 self.get_checkpoint_contents(content_digest)
4152 }
4153 None => Err(IotaError::UserInput {
4154 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4155 }),
4156 }
4157 }
4158
4159 #[instrument(level = "trace", skip_all)]
4160 pub async fn query_events(
4161 &self,
4162 kv_store: &Arc<TransactionKeyValueStore>,
4163 query: EventFilter,
4164 cursor: Option<EventID>,
4166 limit: usize,
4167 descending: bool,
4168 ) -> IotaResult<Vec<IotaEvent>> {
4169 let index_store = self.get_indexes()?;
4170
4171 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4173 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4174 IotaError::TransactionNotFound {
4175 digest: cursor.tx_digest,
4176 },
4177 )?;
4178 (tx_seq, cursor.event_seq as usize)
4179 } else if descending {
4180 (u64::MAX, usize::MAX)
4181 } else {
4182 (0, 0)
4183 };
4184
4185 let limit = limit + 1;
4186 let mut event_keys = match query {
4187 EventFilter::All(filters) => {
4188 if filters.is_empty() {
4189 index_store.all_events(tx_num, event_num, limit, descending)?
4190 } else {
4191 return Err(IotaError::UserInput {
4192 error: UserInputError::Unsupported(
4193 "This query type does not currently support filter combinations"
4194 .to_string(),
4195 ),
4196 });
4197 }
4198 }
4199 EventFilter::Transaction(digest) => {
4200 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4201 }
4202 EventFilter::MoveModule { package, module } => {
4203 let module_id = ModuleId::new(package.into(), module);
4204 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4205 }
4206 EventFilter::MoveEventType(struct_name) => index_store
4207 .events_by_move_event_struct_name(
4208 &struct_name,
4209 tx_num,
4210 event_num,
4211 limit,
4212 descending,
4213 )?,
4214 EventFilter::Sender(sender) => {
4215 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4216 }
4217 EventFilter::TimeRange {
4218 start_time,
4219 end_time,
4220 } => index_store
4221 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4222 EventFilter::MoveEventModule { package, module } => index_store
4223 .events_by_move_event_module(
4224 &ModuleId::new(package.into(), module),
4225 tx_num,
4226 event_num,
4227 limit,
4228 descending,
4229 )?,
4230 EventFilter::Package(_)
4232 | EventFilter::MoveEventField { .. }
4233 | EventFilter::Any(_)
4234 | EventFilter::And(_, _)
4235 | EventFilter::Or(_, _) => {
4236 return Err(IotaError::UserInput {
4237 error: UserInputError::Unsupported(
4238 "This query type is not supported by the full node.".to_string(),
4239 ),
4240 });
4241 }
4242 };
4243
4244 if cursor.is_some() {
4247 if !event_keys.is_empty() {
4248 event_keys.remove(0);
4249 }
4250 } else {
4251 event_keys.truncate(limit - 1);
4252 }
4253
4254 let transaction_digests = event_keys
4256 .iter()
4257 .map(|(_, digest, _, _)| *digest)
4258 .collect::<HashSet<_>>()
4259 .into_iter()
4260 .collect::<Vec<_>>();
4261
4262 let events = kv_store
4263 .multi_get_events_by_tx_digests(&transaction_digests)
4264 .await?;
4265
4266 let events_map: HashMap<_, _> =
4267 transaction_digests.iter().zip(events.into_iter()).collect();
4268
4269 let stored_events = event_keys
4270 .into_iter()
4271 .map(|k| {
4272 (
4273 k,
4274 events_map
4275 .get(&k.1)
4276 .expect("fetched digest is missing")
4277 .clone()
4278 .and_then(|e| e.data.get(k.2).cloned()),
4279 )
4280 })
4281 .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4282 event
4283 .map(|e| (e, tx_digest, event_seq, timestamp))
4284 .ok_or(IotaError::TransactionEventsNotFound { digest })
4285 })
4286 .collect::<Result<Vec<_>, _>>()?;
4287
4288 let epoch_store = self.load_epoch_store_one_call_per_task();
4289 let backing_store = self.get_backing_package_store().as_ref();
4290 let mut layout_resolver = epoch_store
4291 .executor()
4292 .type_layout_resolver(Box::new(backing_store));
4293 let mut events = vec![];
4294 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4295 events.push(IotaEvent::try_from(
4296 e.clone(),
4297 tx_digest,
4298 event_seq as u64,
4299 Some(timestamp),
4300 layout_resolver.get_annotated_layout(&e.type_)?,
4301 )?)
4302 }
4303 Ok(events)
4304 }
4305
4306 pub async fn insert_genesis_object(&self, object: Object) {
4307 self.get_reconfig_api()
4308 .try_insert_genesis_object(object)
4309 .expect("Cannot insert genesis object")
4310 }
4311
4312 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4313 futures::future::join_all(
4314 objects
4315 .iter()
4316 .map(|o| self.insert_genesis_object(o.clone())),
4317 )
4318 .await;
4319 }
4320
4321 #[instrument(level = "trace", skip_all)]
4323 pub fn get_transaction_status(
4324 &self,
4325 transaction_digest: &TransactionDigest,
4326 epoch_store: &Arc<AuthorityPerEpochStore>,
4327 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4328 if let Some(effects) =
4330 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4331 {
4332 if let Some(transaction) = self
4333 .get_transaction_cache_reader()
4334 .try_get_transaction_block(transaction_digest)?
4335 {
4336 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4337 let events = if let Some(digest) = effects.events_digest() {
4338 self.get_transaction_events(digest)?
4339 } else {
4340 TransactionEvents::default()
4341 };
4342 return Ok(Some((
4343 (*transaction).clone().into_message(),
4344 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4345 )));
4346 } else {
4347 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4352 }
4353 }
4354 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4355 self.metrics.tx_already_processed.inc();
4356 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4357 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4358 } else {
4359 Ok(None)
4360 }
4361 }
4362
4363 #[instrument(level = "trace", skip_all)]
4367 pub fn get_signed_effects_and_maybe_resign(
4368 &self,
4369 transaction_digest: &TransactionDigest,
4370 epoch_store: &Arc<AuthorityPerEpochStore>,
4371 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4372 let effects = self
4373 .get_transaction_cache_reader()
4374 .try_get_executed_effects(transaction_digest)?;
4375 match effects {
4376 Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4377 None => Ok(None),
4378 }
4379 }
4380
4381 #[instrument(level = "trace", skip_all)]
4382 pub(crate) fn sign_effects(
4383 &self,
4384 effects: TransactionEffects,
4385 epoch_store: &Arc<AuthorityPerEpochStore>,
4386 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4387 let tx_digest = *effects.transaction_digest();
4388 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4389 Some(sig) if sig.epoch == epoch_store.epoch() => {
4390 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4391 }
4392 _ => {
4393 debug!(
4416 ?tx_digest,
4417 epoch=?epoch_store.epoch(),
4418 "Re-signing the effects with the current epoch"
4419 );
4420
4421 let sig = AuthoritySignInfo::new(
4422 epoch_store.epoch(),
4423 &effects,
4424 Intent::iota_app(IntentScope::TransactionEffects),
4425 self.name,
4426 &*self.secret,
4427 );
4428
4429 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4430
4431 epoch_store.insert_effects_digest_and_signature(
4432 &tx_digest,
4433 effects.digest(),
4434 &sig,
4435 )?;
4436
4437 effects
4438 }
4439 };
4440
4441 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4442 signed_effects,
4443 ))
4444 }
4445
4446 #[instrument(level = "trace", skip_all)]
4448 fn fullnode_only_get_tx_coins_for_indexing(
4449 &self,
4450 inner_temporary_store: &InnerTemporaryStore,
4451 epoch_store: &Arc<AuthorityPerEpochStore>,
4452 ) -> Option<TxCoins> {
4453 if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4454 return None;
4455 }
4456 let written_coin_objects = inner_temporary_store
4457 .written
4458 .iter()
4459 .filter_map(|(k, v)| {
4460 if v.is_coin() {
4461 Some((*k, v.clone()))
4462 } else {
4463 None
4464 }
4465 })
4466 .collect();
4467 let input_coin_objects = inner_temporary_store
4468 .input_objects
4469 .iter()
4470 .filter_map(|(k, v)| {
4471 if v.is_coin() {
4472 Some((*k, v.clone()))
4473 } else {
4474 None
4475 }
4476 })
4477 .collect::<ObjectMap>();
4478 Some((input_coin_objects, written_coin_objects))
4479 }
4480
4481 #[instrument(level = "trace", skip_all)]
4493 pub async fn get_transaction_lock(
4494 &self,
4495 object_ref: &ObjectRef,
4496 epoch_store: &AuthorityPerEpochStore,
4497 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4498 let lock_info = self
4499 .get_object_cache_reader()
4500 .try_get_lock(*object_ref, epoch_store)?;
4501 let lock_info = match lock_info {
4502 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4503 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4504 provided_obj_ref: *object_ref,
4505 current_version: locked_ref.1,
4506 }
4507 .into());
4508 }
4509 ObjectLockStatus::Initialized => {
4510 return Ok(None);
4511 }
4512 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4513 };
4514
4515 epoch_store.get_signed_transaction(&lock_info)
4516 }
4517
4518 pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4519 self.get_object_cache_reader().try_get_objects(objects)
4520 }
4521
4522 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4524 self.try_get_objects(objects)
4525 .await
4526 .expect("storage access failed")
4527 }
4528
4529 pub async fn try_get_object_or_tombstone(
4530 &self,
4531 object_id: ObjectID,
4532 ) -> IotaResult<Option<ObjectRef>> {
4533 self.get_object_cache_reader()
4534 .try_get_latest_object_ref_or_tombstone(object_id)
4535 }
4536
4537 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4539 self.try_get_object_or_tombstone(object_id)
4540 .await
4541 .expect("storage access failed")
4542 }
4543
4544 pub fn set_override_protocol_upgrade_buffer_stake(
4554 &self,
4555 expected_epoch: EpochId,
4556 buffer_stake_bps: u64,
4557 ) -> IotaResult {
4558 let epoch_store = self.load_epoch_store_one_call_per_task();
4559 let actual_epoch = epoch_store.epoch();
4560 if actual_epoch != expected_epoch {
4561 return Err(IotaError::WrongEpoch {
4562 expected_epoch,
4563 actual_epoch,
4564 });
4565 }
4566
4567 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4568 }
4569
4570 pub fn clear_override_protocol_upgrade_buffer_stake(
4571 &self,
4572 expected_epoch: EpochId,
4573 ) -> IotaResult {
4574 let epoch_store = self.load_epoch_store_one_call_per_task();
4575 let actual_epoch = epoch_store.epoch();
4576 if actual_epoch != expected_epoch {
4577 return Err(IotaError::WrongEpoch {
4578 expected_epoch,
4579 actual_epoch,
4580 });
4581 }
4582
4583 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4584 }
4585
4586 pub async fn get_available_system_packages(
4590 &self,
4591 binary_config: &BinaryConfig,
4592 ) -> Vec<ObjectRef> {
4593 let mut results = vec![];
4594
4595 let system_packages = BuiltInFramework::iter_system_packages();
4596
4597 #[cfg(msim)]
4599 let extra_packages = framework_injection::get_extra_packages(self.name);
4600 #[cfg(msim)]
4601 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4602
4603 for system_package in system_packages {
4604 let modules = system_package.modules().to_vec();
4605 #[cfg(msim)]
4607 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4608 .unwrap_or(modules);
4609
4610 let Some(obj_ref) = iota_framework::compare_system_package(
4611 &self.get_object_store(),
4612 &system_package.id,
4613 &modules,
4614 system_package.dependencies.to_vec(),
4615 binary_config,
4616 )
4617 .await
4618 else {
4619 return vec![];
4620 };
4621 results.push(obj_ref);
4622 }
4623
4624 results
4625 }
4626
4627 async fn get_system_package_bytes(
4644 &self,
4645 system_packages: Vec<ObjectRef>,
4646 binary_config: &BinaryConfig,
4647 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4648 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4649 let objects = self.get_objects(&ids).await;
4650
4651 let mut res = Vec::with_capacity(system_packages.len());
4652 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4653 let prev_transaction = match object {
4654 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4655 info!("Framework {} does not need updating", system_package_ref.0);
4657 continue;
4658 }
4659
4660 Some(cur_object) => cur_object.previous_transaction,
4661 None => TransactionDigest::genesis_marker(),
4662 };
4663
4664 #[cfg(msim)]
4665 let SystemPackage {
4666 id: _,
4667 bytes,
4668 dependencies,
4669 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4670 .unwrap_or_else(|| {
4671 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4672 });
4673
4674 #[cfg(not(msim))]
4675 let SystemPackage {
4676 id: _,
4677 bytes,
4678 dependencies,
4679 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4680
4681 let modules: Vec<_> = bytes
4682 .iter()
4683 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4684 .collect();
4685
4686 let new_object = Object::new_system_package(
4687 &modules,
4688 system_package_ref.1,
4689 dependencies.clone(),
4690 prev_transaction,
4691 );
4692
4693 let new_ref = new_object.compute_object_reference();
4694 if new_ref != system_package_ref {
4695 error!(
4696 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
4697 );
4698 return None;
4699 }
4700
4701 res.push((system_package_ref.1, bytes, dependencies));
4702 }
4703
4704 Some(res)
4705 }
4706
4707 fn is_protocol_version_supported_v1(
4711 proposed_protocol_version: ProtocolVersion,
4712 committee: &Committee,
4713 capabilities: Vec<AuthorityCapabilitiesV1>,
4714 mut buffer_stake_bps: u64,
4715 ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4716 if buffer_stake_bps > 10000 {
4717 warn!("clamping buffer_stake_bps to 10000");
4718 buffer_stake_bps = 10000;
4719 }
4720
4721 let mut desired_upgrades: Vec<_> = capabilities
4724 .into_iter()
4725 .filter_map(|mut cap| {
4726 if cap.available_system_packages.is_empty() {
4728 return None;
4729 }
4730
4731 cap.available_system_packages.sort();
4732
4733 info!(
4734 "validator {:?} supports {:?} with system packages: {:?}",
4735 cap.authority.concise(),
4736 cap.supported_protocol_versions,
4737 cap.available_system_packages,
4738 );
4739
4740 cap.supported_protocol_versions
4744 .get_version_digest(proposed_protocol_version)
4745 .map(|digest| (digest, cap.available_system_packages, cap.authority))
4746 })
4747 .collect();
4748
4749 desired_upgrades.sort();
4752 desired_upgrades
4753 .into_iter()
4754 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4755 .into_iter()
4756 .find_map(|((digest, packages), group)| {
4757 assert!(!packages.is_empty());
4759
4760 let mut stake_aggregator: StakeAggregator<(), true> =
4761 StakeAggregator::new(Arc::new(committee.clone()));
4762
4763 for (_, _, authority) in group {
4764 stake_aggregator.insert_generic(authority, ());
4765 }
4766
4767 let total_votes = stake_aggregator.total_votes();
4768 let quorum_threshold = committee.quorum_threshold();
4769 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4770
4771 info!(
4772 protocol_config_digest = ?digest,
4773 ?total_votes,
4774 ?quorum_threshold,
4775 ?buffer_stake_bps,
4776 ?effective_threshold,
4777 ?proposed_protocol_version,
4778 ?packages,
4779 "support for upgrade"
4780 );
4781
4782 let has_support = total_votes >= effective_threshold;
4783 has_support.then_some((proposed_protocol_version, digest, packages))
4784 })
4785 }
4786
4787 fn choose_protocol_version_and_system_packages_v1(
4791 current_protocol_version: ProtocolVersion,
4792 current_protocol_digest: Digest,
4793 committee: &Committee,
4794 capabilities: Vec<AuthorityCapabilitiesV1>,
4795 buffer_stake_bps: u64,
4796 ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4797 let mut next_protocol_version = current_protocol_version;
4798 let mut system_packages = vec![];
4799 let mut protocol_version_digest = current_protocol_digest;
4800
4801 while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4805 next_protocol_version + 1,
4806 committee,
4807 capabilities.clone(),
4808 buffer_stake_bps,
4809 ) {
4810 next_protocol_version = version;
4811 protocol_version_digest = digest;
4812 system_packages = packages;
4813 }
4814
4815 (
4816 next_protocol_version,
4817 protocol_version_digest,
4818 system_packages,
4819 )
4820 }
4821
4822 fn get_validators_supporting_protocol_version(
4827 target_protocol_version: ProtocolVersion,
4828 target_digest: Digest,
4829 active_validators: &[AuthorityPublicKey],
4830 capabilities: &[AuthorityCapabilitiesV1],
4831 ) -> Vec<u64> {
4832 let mut eligible_validators = Vec::new();
4833
4834 for capability in capabilities {
4835 if let Some(digest) = capability
4837 .supported_protocol_versions
4838 .get_version_digest(target_protocol_version)
4839 {
4840 if digest == target_digest {
4841 if let Some(index) = active_validators
4843 .iter()
4844 .position(|name| AuthorityName::from(name) == capability.authority)
4845 {
4846 eligible_validators.push(index as u64);
4847 }
4848 }
4849 }
4850 }
4851
4852 eligible_validators.sort();
4854 eligible_validators
4855 }
4856
4857 fn calculate_eligible_validators_weight(
4862 eligible_validator_indices: &[u64],
4863 active_validators: &[AuthorityPublicKey],
4864 committee: &Committee,
4865 ) -> u64 {
4866 let mut total_weight = 0u64;
4867
4868 for &index in eligible_validator_indices {
4869 let authority_pubkey = &active_validators[index as usize];
4870 if let Some((_, weight)) = committee
4872 .members()
4873 .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4874 {
4875 total_weight += weight;
4876 }
4877 }
4878
4879 total_weight
4880 }
4881
4882 #[instrument(level = "debug", skip_all)]
4883 fn create_authenticator_state_tx(
4884 &self,
4885 epoch_store: &Arc<AuthorityPerEpochStore>,
4886 ) -> Option<EndOfEpochTransactionKind> {
4887 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4888 info!("authenticator state transactions not enabled");
4889 return None;
4890 }
4891
4892 let authenticator_state_exists = epoch_store.authenticator_state_exists();
4893 let tx = if authenticator_state_exists {
4894 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4895 let min_epoch =
4896 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4897 let authenticator_obj_initial_shared_version = epoch_store
4898 .epoch_start_config()
4899 .authenticator_obj_initial_shared_version()
4900 .expect("initial version must exist");
4901
4902 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4903 min_epoch,
4904 authenticator_obj_initial_shared_version,
4905 );
4906
4907 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4908
4909 tx
4910 } else {
4911 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4912 info!("Creating AuthenticatorStateCreate tx");
4913 tx
4914 };
4915 Some(tx)
4916 }
4917
4918 #[instrument(level = "error", skip_all)]
4931 pub async fn create_and_execute_advance_epoch_tx(
4932 &self,
4933 epoch_store: &Arc<AuthorityPerEpochStore>,
4934 gas_cost_summary: &GasCostSummary,
4935 checkpoint: CheckpointSequenceNumber,
4936 epoch_start_timestamp_ms: CheckpointTimestamp,
4937 scores: Vec<u64>,
4938 ) -> anyhow::Result<(
4939 IotaSystemState,
4940 Option<SystemEpochInfoEvent>,
4941 TransactionEffects,
4942 )> {
4943 let mut txns = Vec::new();
4944
4945 if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
4946 txns.push(tx);
4947 }
4948
4949 let next_epoch = epoch_store.epoch() + 1;
4950
4951 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
4952 let authority_capabilities = epoch_store
4953 .get_capabilities_v1()
4954 .expect("read capabilities from db cannot fail");
4955 let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
4956 Self::choose_protocol_version_and_system_packages_v1(
4957 epoch_store.protocol_version(),
4958 SupportedProtocolVersionsWithHashes::protocol_config_digest(
4959 epoch_store.protocol_config(),
4960 ),
4961 epoch_store.committee(),
4962 authority_capabilities.clone(),
4963 buffer_stake_bps,
4964 );
4965
4966 let config = epoch_store.protocol_config();
4970 let binary_config = to_binary_config(config);
4971 let Some(next_epoch_system_package_bytes) = self
4972 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
4973 .await
4974 else {
4975 error!(
4976 "upgraded system packages {:?} are not locally available, cannot create \
4977 ChangeEpochTx. validator binary must be upgraded to the correct version!",
4978 next_epoch_system_packages
4979 );
4980 bail!("missing system packages: cannot form ChangeEpochTx");
4990 };
4991
4992 if config.select_committee_from_eligible_validators() {
4995 let active_validators = epoch_store.epoch_start_state().get_active_validators();
4997
4998 let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
4999
5000 if config.select_committee_supporting_next_epoch_version() {
5004 eligible_active_validators = Self::get_validators_supporting_protocol_version(
5005 next_epoch_protocol_version,
5006 next_epoch_protocol_digest,
5007 &active_validators,
5008 &authority_capabilities,
5009 );
5010
5011 let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5013 &eligible_active_validators,
5014 &active_validators,
5015 epoch_store.committee(),
5016 );
5017
5018 let committee = epoch_store.committee();
5022 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5023
5024 if eligible_validators_weight < effective_threshold {
5025 error!(
5026 "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5027 This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5028 );
5029 eligible_active_validators = (0..active_validators.len() as u64).collect();
5032 }
5033 }
5034
5035 if config.pass_validator_scores_to_advance_epoch() {
5038 txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5039 next_epoch,
5040 next_epoch_protocol_version,
5041 gas_cost_summary.storage_cost,
5042 gas_cost_summary.computation_cost,
5043 gas_cost_summary.computation_cost_burned,
5044 gas_cost_summary.storage_rebate,
5045 gas_cost_summary.non_refundable_storage_fee,
5046 epoch_start_timestamp_ms,
5047 next_epoch_system_package_bytes,
5048 eligible_active_validators,
5049 scores,
5050 config.adjust_rewards_by_score(),
5051 ));
5052 } else {
5053 txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5054 next_epoch,
5055 next_epoch_protocol_version,
5056 gas_cost_summary.storage_cost,
5057 gas_cost_summary.computation_cost,
5058 gas_cost_summary.computation_cost_burned,
5059 gas_cost_summary.storage_rebate,
5060 gas_cost_summary.non_refundable_storage_fee,
5061 epoch_start_timestamp_ms,
5062 next_epoch_system_package_bytes,
5063 eligible_active_validators,
5064 ));
5065 }
5066 } else if config.protocol_defined_base_fee()
5067 && config.max_committee_members_count_as_option().is_some()
5068 {
5069 txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5070 next_epoch,
5071 next_epoch_protocol_version,
5072 gas_cost_summary.storage_cost,
5073 gas_cost_summary.computation_cost,
5074 gas_cost_summary.computation_cost_burned,
5075 gas_cost_summary.storage_rebate,
5076 gas_cost_summary.non_refundable_storage_fee,
5077 epoch_start_timestamp_ms,
5078 next_epoch_system_package_bytes,
5079 ));
5080 } else {
5081 txns.push(EndOfEpochTransactionKind::new_change_epoch(
5082 next_epoch,
5083 next_epoch_protocol_version,
5084 gas_cost_summary.storage_cost,
5085 gas_cost_summary.computation_cost,
5086 gas_cost_summary.storage_rebate,
5087 gas_cost_summary.non_refundable_storage_fee,
5088 epoch_start_timestamp_ms,
5089 next_epoch_system_package_bytes,
5090 ));
5091 }
5092
5093 let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5094
5095 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5096 tx.clone(),
5097 epoch_store.epoch(),
5098 checkpoint,
5099 );
5100
5101 let tx_digest = executable_tx.digest();
5102
5103 info!(
5104 ?next_epoch,
5105 ?next_epoch_protocol_version,
5106 ?next_epoch_system_packages,
5107 computation_cost=?gas_cost_summary.computation_cost,
5108 computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5109 storage_cost=?gas_cost_summary.storage_cost,
5110 storage_rebate=?gas_cost_summary.storage_rebate,
5111 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5112 ?tx_digest,
5113 "Creating advance epoch transaction"
5114 );
5115
5116 fail_point_async!("change_epoch_tx_delay");
5117 let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5118
5119 if self
5123 .get_transaction_cache_reader()
5124 .try_is_tx_already_executed(tx_digest)?
5125 {
5126 warn!("change epoch tx has already been executed via state sync");
5127 bail!("change epoch tx has already been executed via state sync",);
5128 }
5129
5130 let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5131
5132 epoch_store.assign_shared_object_versions_idempotent(
5136 self.get_object_cache_reader().as_ref(),
5137 std::slice::from_ref(&executable_tx),
5138 )?;
5139
5140 let (input_objects, _, _) =
5141 self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5142
5143 let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
5144 &execution_guard,
5145 &executable_tx,
5146 input_objects,
5147 None,
5148 None,
5149 epoch_store,
5150 )?;
5151 let system_obj = get_iota_system_state(&temporary_store.written)
5152 .expect("change epoch tx must write to system object");
5153 let system_epoch_info_event = temporary_store
5155 .events
5156 .data
5157 .into_iter()
5158 .find(|event| event.is_system_epoch_info_event())
5159 .map(SystemEpochInfoEvent::from);
5160 assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5163
5164 self.get_state_sync_store()
5168 .try_insert_transaction_and_effects(&tx, &effects)
5169 .map_err(|err| {
5170 let err: anyhow::Error = err.into();
5171 err
5172 })?;
5173
5174 info!(
5175 "Effects summary of the change epoch transaction: {:?}",
5176 effects.summary_for_debug()
5177 );
5178 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5179 assert!(effects.status().is_ok());
5181 Ok((system_obj, system_epoch_info_event, effects))
5182 }
5183
5184 #[instrument(level = "error", skip_all)]
5188 async fn revert_uncommitted_epoch_transactions(
5189 &self,
5190 epoch_store: &AuthorityPerEpochStore,
5191 ) -> IotaResult {
5192 {
5193 let state = epoch_store.get_reconfig_state_write_lock_guard();
5194 if state.should_accept_user_certs() {
5195 epoch_store.close_user_certs(state);
5204 }
5205 }
5207 let pending_certificates = epoch_store.pending_consensus_certificates();
5208 info!(
5209 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5210 pending_certificates.len(),
5211 pending_certificates,
5212 );
5213 for digest in pending_certificates {
5214 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5215 info!(
5216 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5217 digest
5218 );
5219 continue;
5220 }
5221 info!("Reverting {:?} at the end of epoch", digest);
5222 epoch_store.revert_executed_transaction(&digest)?;
5223 self.get_reconfig_api().try_revert_state_update(&digest)?;
5224 }
5225 info!("All uncommitted local transactions reverted");
5226 Ok(())
5227 }
5228
5229 #[instrument(level = "error", skip_all)]
5230 async fn reopen_epoch_db(
5231 &self,
5232 cur_epoch_store: &AuthorityPerEpochStore,
5233 new_committee: Committee,
5234 epoch_start_configuration: EpochStartConfiguration,
5235 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5236 epoch_last_checkpoint: CheckpointSequenceNumber,
5237 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5238 let new_epoch = new_committee.epoch;
5239 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5240 assert_eq!(
5241 epoch_start_configuration.epoch_start_state().epoch(),
5242 new_committee.epoch
5243 );
5244 fail_point!("before-open-new-epoch-store");
5245 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5246 self.name,
5247 new_committee,
5248 epoch_start_configuration,
5249 self.get_backing_package_store().clone(),
5250 self.get_object_store().clone(),
5251 expensive_safety_check_config,
5252 epoch_last_checkpoint,
5253 )?;
5254 self.epoch_store.store(new_epoch_store.clone());
5255 Ok(new_epoch_store)
5256 }
5257
5258 fn check_move_account(
5261 &self,
5262 auth_account_object_id: ObjectID,
5263 auth_account_object_seq_number: Option<SequenceNumber>,
5264 auth_account_object_digest: Option<ObjectDigest>,
5265 account_object: ObjectReadResult,
5266 signer: &IotaAddress,
5267 ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5268 let account_object = match account_object.object {
5269 ObjectReadResultKind::Object(object) => Ok(object),
5270 ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5271 Err(UserInputError::AccountObjectDeleted {
5272 account_id: account_object.id(),
5273 account_version: version,
5274 transaction_digest: digest,
5275 })
5276 }
5277 ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5280 Err(UserInputError::AccountObjectInCanceledTransaction {
5281 account_id: account_object.id(),
5282 account_version: version,
5283 })
5284 }
5285 }?;
5286
5287 let account_object_addr = IotaAddress::from(auth_account_object_id);
5288
5289 fp_ensure!(
5290 signer == &account_object_addr,
5291 UserInputError::IncorrectUserSignature {
5292 error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5293 }
5294 .into()
5295 );
5296
5297 fp_ensure!(
5298 account_object.is_shared() || account_object.is_immutable(),
5299 UserInputError::AccountObjectNotSupported {
5300 object_id: auth_account_object_id
5301 }
5302 .into()
5303 );
5304
5305 let auth_account_object_seq_number =
5306 if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5307 let account_object_version = account_object.version();
5308
5309 fp_ensure!(
5310 account_object_version == auth_account_object_seq_number,
5311 UserInputError::AccountObjectVersionMismatch {
5312 object_id: auth_account_object_id,
5313 expected_version: auth_account_object_seq_number,
5314 actual_version: account_object_version,
5315 }
5316 .into()
5317 );
5318
5319 auth_account_object_seq_number
5320 } else {
5321 account_object.version()
5322 };
5323
5324 if let Some(auth_account_object_digest) = auth_account_object_digest {
5325 let expected_digest = account_object.digest();
5326 fp_ensure!(
5327 expected_digest == auth_account_object_digest,
5328 UserInputError::InvalidAccountObjectDigest {
5329 object_id: auth_account_object_id,
5330 expected_digest,
5331 actual_digest: auth_account_object_digest,
5332 }
5333 .into()
5334 );
5335 }
5336
5337 let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5338 auth_account_object_id,
5339 &AuthenticatorFunctionRefV1Key::tag().into(),
5340 &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5341 )
5342 .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5343 account_object_id: auth_account_object_id,
5344 })?;
5345
5346 let authenticator_function_ref_field = self
5347 .get_object_cache_reader()
5348 .try_find_object_lt_or_eq_version(
5349 authenticator_function_ref_field_id,
5350 auth_account_object_seq_number,
5351 )?;
5352
5353 if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5354 let field_move_object = authenticator_function_ref_field_obj
5355 .data
5356 .try_as_move()
5357 .expect("dynamic field should never be a package object");
5358
5359 let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5360 field_move_object.to_rust().ok_or(
5361 UserInputError::InvalidAuthenticatorFunctionRefField {
5362 account_object_id: auth_account_object_id,
5363 },
5364 )?;
5365
5366 Ok(AuthenticatorFunctionRefForExecution::new_v1(
5367 field.value,
5368 authenticator_function_ref_field_obj.compute_object_reference(),
5369 authenticator_function_ref_field_obj.owner,
5370 authenticator_function_ref_field_obj.storage_rebate,
5371 authenticator_function_ref_field_obj.previous_transaction,
5372 ))
5373 } else {
5374 Err(UserInputError::MoveAuthenticatorNotFound {
5375 authenticator_function_ref_id: authenticator_function_ref_field_id,
5376 account_object_id: auth_account_object_id,
5377 account_object_version: auth_account_object_seq_number,
5378 }
5379 .into())
5380 }
5381 }
5382
5383 fn read_objects_for_signing(
5384 &self,
5385 transaction: &VerifiedTransaction,
5386 epoch: u64,
5387 ) -> IotaResult<(
5388 InputObjects,
5389 ReceivingObjects,
5390 Option<InputObjects>,
5391 Option<ObjectReadResult>,
5392 )> {
5393 let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5394 Some(transaction.digest()),
5395 &transaction.collect_all_input_object_kind_for_reading()?,
5396 &transaction.data().transaction_data().receiving_objects(),
5397 epoch,
5398 )?;
5399
5400 transaction
5401 .split_input_objects_into_groups_for_reading(input_objects)
5402 .map(|(tx_input_objects, auth_input_objects, account_object)| {
5403 (
5404 tx_input_objects,
5405 tx_receiving_objects,
5406 auth_input_objects,
5407 account_object,
5408 )
5409 })
5410 }
5411
5412 fn check_transaction_inputs_for_signing(
5413 &self,
5414 protocol_config: &ProtocolConfig,
5415 reference_gas_price: u64,
5416 tx_data: &TransactionData,
5417 tx_input_objects: InputObjects,
5418 tx_receiving_objects: &ReceivingObjects,
5419 move_authenticator: Option<&MoveAuthenticator>,
5420 auth_input_objects: Option<InputObjects>,
5421 account_object: Option<ObjectReadResult>,
5422 ) -> IotaResult<(
5423 IotaGasStatus,
5424 CheckedInputObjects,
5425 Option<CheckedInputObjects>,
5426 Option<AuthenticatorFunctionRef>,
5427 )> {
5428 let (
5429 auth_checked_input_objects_union,
5430 authenticator_function_ref,
5431 authenticator_gas_budget,
5432 ) = if let Some(move_authenticator) = move_authenticator {
5433 let auth_input_objects =
5434 auth_input_objects.expect("MoveAuthenticator input objects must be provided");
5435 let account_object = account_object.expect("Move account object must be provided");
5436
5437 let (
5439 auth_account_object_id,
5440 auth_account_object_seq_number,
5441 auth_account_object_digest,
5442 ) = move_authenticator.object_to_authenticate_components()?;
5443
5444 let AuthenticatorFunctionRefForExecution {
5446 authenticator_function_ref,
5447 ..
5448 } = self.check_move_account(
5449 auth_account_object_id,
5450 auth_account_object_seq_number,
5451 auth_account_object_digest,
5452 account_object,
5453 &tx_data.sender(),
5454 )?;
5455
5456 let auth_checked_input_objects =
5458 iota_transaction_checks::check_move_authenticator_input_for_signing(
5459 auth_input_objects,
5460 )?;
5461
5462 let authenticator_gas_budget = protocol_config.max_auth_gas();
5465
5466 (
5467 Some(auth_checked_input_objects),
5468 Some(authenticator_function_ref),
5469 authenticator_gas_budget,
5470 )
5471 } else {
5472 (None, None, 0)
5473 };
5474
5475 let (gas_status, tx_checked_input_objects) =
5477 iota_transaction_checks::check_transaction_input(
5478 protocol_config,
5479 reference_gas_price,
5480 tx_data,
5481 tx_input_objects,
5482 tx_receiving_objects,
5483 &self.metrics.bytecode_verifier_metrics,
5484 &self.config.verifier_signing_config,
5485 authenticator_gas_budget,
5486 )?;
5487
5488 Ok((
5489 gas_status,
5490 tx_checked_input_objects,
5491 auth_checked_input_objects_union,
5492 authenticator_function_ref,
5493 ))
5494 }
5495
5496 #[cfg(test)]
5497 pub(crate) fn iter_live_object_set_for_testing(
5498 &self,
5499 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5500 self.get_accumulator_store()
5501 .iter_cached_live_object_set_for_testing()
5502 }
5503
5504 #[cfg(test)]
5505 pub(crate) fn shutdown_execution_for_test(&self) {
5506 self.tx_execution_shutdown
5507 .lock()
5508 .take()
5509 .unwrap()
5510 .send(())
5511 .unwrap();
5512 }
5513
5514 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5517 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5518 self.get_object_cache_reader()
5519 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5520 self.get_reconfig_api()
5521 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5522 }
5523}
5524
5525pub struct RandomnessRoundReceiver {
5526 authority_state: Arc<AuthorityState>,
5527 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5528}
5529
5530impl RandomnessRoundReceiver {
5531 pub fn spawn(
5532 authority_state: Arc<AuthorityState>,
5533 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5534 ) -> JoinHandle<()> {
5535 let rrr = RandomnessRoundReceiver {
5536 authority_state,
5537 randomness_rx,
5538 };
5539 spawn_monitored_task!(rrr.run())
5540 }
5541
5542 async fn run(mut self) {
5543 info!("RandomnessRoundReceiver event loop started");
5544
5545 loop {
5546 tokio::select! {
5547 maybe_recv = self.randomness_rx.recv() => {
5548 if let Some((epoch, round, bytes)) = maybe_recv {
5549 self.handle_new_randomness(epoch, round, bytes);
5550 } else {
5551 break;
5552 }
5553 },
5554 }
5555 }
5556
5557 info!("RandomnessRoundReceiver event loop ended");
5558 }
5559
5560 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5561 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5562 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5563 if epoch_store.epoch() != epoch {
5564 warn!(
5565 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5566 epoch_store.epoch()
5567 );
5568 return;
5569 }
5570 let transaction = VerifiedTransaction::new_randomness_state_update(
5571 epoch,
5572 round,
5573 bytes,
5574 epoch_store
5575 .epoch_start_config()
5576 .randomness_obj_initial_shared_version(),
5577 );
5578 debug!(
5579 "created randomness state update transaction with digest: {:?}",
5580 transaction.digest()
5581 );
5582 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5583 let digest = *transaction.digest();
5584
5585 self.authority_state
5590 .get_cache_commit()
5591 .persist_transaction(&transaction);
5592
5593 self.authority_state
5595 .transaction_manager()
5596 .enqueue(vec![transaction], &epoch_store);
5597
5598 let authority_state = self.authority_state.clone();
5599 spawn_monitored_task!(async move {
5600 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5609 let result = tokio::time::timeout(
5610 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5611 authority_state
5612 .get_transaction_cache_reader()
5613 .try_notify_read_executed_effects(&[digest]),
5614 )
5615 .await;
5616 let result = match result {
5617 Ok(result) => result,
5618 Err(_) => {
5619 if cfg!(debug_assertions) {
5620 panic!(
5622 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5623 );
5624 }
5625 warn!(
5626 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5627 );
5628 authority_state
5630 .get_transaction_cache_reader()
5631 .try_notify_read_executed_effects(&[digest])
5632 .await
5633 }
5634 };
5635
5636 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5637 let effects = effects.pop().expect("should return effects");
5638 if *effects.status() != ExecutionStatus::Success {
5639 fatal!(
5640 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5641 );
5642 }
5643 debug!(
5644 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5645 );
5646 });
5647 }
5648}
5649
5650#[async_trait]
5651impl TransactionKeyValueStoreTrait for AuthorityState {
5652 async fn multi_get(
5653 &self,
5654 transaction_keys: &[TransactionDigest],
5655 effects_keys: &[TransactionDigest],
5656 ) -> IotaResult<KVStoreTransactionData> {
5657 let txns = if !transaction_keys.is_empty() {
5658 self.get_transaction_cache_reader()
5659 .try_multi_get_transaction_blocks(transaction_keys)?
5660 .into_iter()
5661 .map(|t| t.map(|t| (*t).clone().into_inner()))
5662 .collect()
5663 } else {
5664 vec![]
5665 };
5666
5667 let fx = if !effects_keys.is_empty() {
5668 self.get_transaction_cache_reader()
5669 .try_multi_get_executed_effects(effects_keys)?
5670 } else {
5671 vec![]
5672 };
5673
5674 Ok((txns, fx))
5675 }
5676
5677 async fn multi_get_checkpoints(
5678 &self,
5679 checkpoint_summaries: &[CheckpointSequenceNumber],
5680 checkpoint_contents: &[CheckpointSequenceNumber],
5681 checkpoint_summaries_by_digest: &[CheckpointDigest],
5682 ) -> IotaResult<(
5683 Vec<Option<CertifiedCheckpointSummary>>,
5684 Vec<Option<CheckpointContents>>,
5685 Vec<Option<CertifiedCheckpointSummary>>,
5686 )> {
5687 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5689 let store = self.get_checkpoint_store();
5690 for seq in checkpoint_summaries {
5691 let checkpoint = store
5692 .get_checkpoint_by_sequence_number(*seq)?
5693 .map(|c| c.into_inner());
5694
5695 summaries.push(checkpoint);
5696 }
5697
5698 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5699 for seq in checkpoint_contents {
5700 let checkpoint = store
5701 .get_checkpoint_by_sequence_number(*seq)?
5702 .and_then(|summary| {
5703 store
5704 .get_checkpoint_contents(&summary.content_digest)
5705 .expect("db read cannot fail")
5706 });
5707 contents.push(checkpoint);
5708 }
5709
5710 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5711 for digest in checkpoint_summaries_by_digest {
5712 let checkpoint = store
5713 .get_checkpoint_by_digest(digest)?
5714 .map(|c| c.into_inner());
5715 summaries_by_digest.push(checkpoint);
5716 }
5717
5718 Ok((summaries, contents, summaries_by_digest))
5719 }
5720
5721 async fn get_transaction_perpetual_checkpoint(
5722 &self,
5723 digest: TransactionDigest,
5724 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5725 self.get_checkpoint_cache()
5726 .try_get_transaction_perpetual_checkpoint(&digest)
5727 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5728 }
5729
5730 async fn get_object(
5731 &self,
5732 object_id: ObjectID,
5733 version: VersionNumber,
5734 ) -> IotaResult<Option<Object>> {
5735 self.get_object_cache_reader()
5736 .try_get_object_by_key(&object_id, version)
5737 }
5738
5739 async fn multi_get_transactions_perpetual_checkpoints(
5740 &self,
5741 digests: &[TransactionDigest],
5742 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5743 let res = self
5744 .get_checkpoint_cache()
5745 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5746
5747 Ok(res
5748 .into_iter()
5749 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5750 .collect())
5751 }
5752
5753 #[instrument(skip(self))]
5754 async fn multi_get_events_by_tx_digests(
5755 &self,
5756 digests: &[TransactionDigest],
5757 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5758 if digests.is_empty() {
5759 return Ok(vec![]);
5760 }
5761 let events_digests: Vec<_> = self
5762 .get_transaction_cache_reader()
5763 .try_multi_get_executed_effects(digests)?
5764 .into_iter()
5765 .map(|t| t.and_then(|t| t.events_digest().cloned()))
5766 .collect();
5767 let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5768 let mut events = self
5769 .get_transaction_cache_reader()
5770 .try_multi_get_events(&non_empty_events)?
5771 .into_iter();
5772 Ok(events_digests
5773 .into_iter()
5774 .map(|ev| ev.and_then(|_| events.next()?))
5775 .collect())
5776 }
5777}
5778
5779#[cfg(msim)]
5780pub mod framework_injection {
5781 use std::{
5782 cell::RefCell,
5783 collections::{BTreeMap, BTreeSet},
5784 };
5785
5786 use iota_framework::{BuiltInFramework, SystemPackage};
5787 use iota_types::{
5788 base_types::{AuthorityName, ObjectID},
5789 is_system_package,
5790 };
5791 use move_binary_format::CompiledModule;
5792
5793 type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
5794
5795 thread_local! {
5797 static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
5798 }
5799
5800 type Framework = Vec<CompiledModule>;
5801
5802 pub type PackageUpgradeCallback =
5803 Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
5804
5805 enum PackageOverrideConfig {
5806 Global(Framework),
5807 PerValidator(PackageUpgradeCallback),
5808 }
5809
5810 fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
5811 modules
5812 .iter()
5813 .map(|m| {
5814 let mut buf = Vec::new();
5815 m.serialize_with_version(m.version, &mut buf).unwrap();
5816 buf
5817 })
5818 .collect()
5819 }
5820
5821 pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
5822 OVERRIDE.with(|bs| {
5823 bs.borrow_mut()
5824 .insert(package_id, PackageOverrideConfig::Global(modules))
5825 });
5826 }
5827
5828 pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
5829 OVERRIDE.with(|bs| {
5830 bs.borrow_mut()
5831 .insert(package_id, PackageOverrideConfig::PerValidator(func))
5832 });
5833 }
5834
5835 pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
5836 OVERRIDE.with(|cfg| {
5837 cfg.borrow().get(package_id).and_then(|entry| match entry {
5838 PackageOverrideConfig::Global(framework) => {
5839 Some(compiled_modules_to_bytes(framework))
5840 }
5841 PackageOverrideConfig::PerValidator(func) => {
5842 func(name).map(|fw| compiled_modules_to_bytes(&fw))
5843 }
5844 })
5845 })
5846 }
5847
5848 pub fn get_override_modules(
5849 package_id: &ObjectID,
5850 name: AuthorityName,
5851 ) -> Option<Vec<CompiledModule>> {
5852 OVERRIDE.with(|cfg| {
5853 cfg.borrow().get(package_id).and_then(|entry| match entry {
5854 PackageOverrideConfig::Global(framework) => Some(framework.clone()),
5855 PackageOverrideConfig::PerValidator(func) => func(name),
5856 })
5857 })
5858 }
5859
5860 pub fn get_override_system_package(
5861 package_id: &ObjectID,
5862 name: AuthorityName,
5863 ) -> Option<SystemPackage> {
5864 let bytes = get_override_bytes(package_id, name)?;
5865 let dependencies = if is_system_package(*package_id) {
5866 BuiltInFramework::get_package_by_id(package_id)
5867 .dependencies
5868 .to_vec()
5869 } else {
5870 BuiltInFramework::all_package_ids()
5873 };
5874 Some(SystemPackage {
5875 id: *package_id,
5876 bytes,
5877 dependencies,
5878 })
5879 }
5880
5881 pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
5882 let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
5883 let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
5884 cfg.borrow()
5885 .keys()
5886 .filter_map(|package| (!built_in.contains(package)).then_some(*package))
5887 .collect()
5888 });
5889
5890 extra
5891 .into_iter()
5892 .map(|package| SystemPackage {
5893 id: package,
5894 bytes: get_override_bytes(&package, name).unwrap(),
5895 dependencies: BuiltInFramework::all_package_ids(),
5896 })
5897 .collect()
5898 }
5899}
5900
5901#[derive(Debug, Serialize, Deserialize, Clone)]
5902pub struct ObjDumpFormat {
5903 pub id: ObjectID,
5904 pub version: VersionNumber,
5905 pub digest: ObjectDigest,
5906 pub object: Object,
5907}
5908
5909impl ObjDumpFormat {
5910 fn new(object: Object) -> Self {
5911 let oref = object.compute_object_reference();
5912 Self {
5913 id: oref.0,
5914 version: oref.1,
5915 digest: oref.2,
5916 object,
5917 }
5918 }
5919}
5920
5921#[derive(Debug, Serialize, Deserialize, Clone)]
5922pub struct NodeStateDump {
5923 pub tx_digest: TransactionDigest,
5924 pub sender_signed_data: SenderSignedData,
5925 pub executed_epoch: u64,
5926 pub reference_gas_price: u64,
5927 pub protocol_version: u64,
5928 pub epoch_start_timestamp_ms: u64,
5929 pub computed_effects: TransactionEffects,
5930 pub expected_effects_digest: TransactionEffectsDigest,
5931 pub relevant_system_packages: Vec<ObjDumpFormat>,
5932 pub shared_objects: Vec<ObjDumpFormat>,
5933 pub loaded_child_objects: Vec<ObjDumpFormat>,
5934 pub modified_at_versions: Vec<ObjDumpFormat>,
5935 pub runtime_reads: Vec<ObjDumpFormat>,
5936 pub input_objects: Vec<ObjDumpFormat>,
5937}
5938
5939impl NodeStateDump {
5940 pub fn new(
5941 tx_digest: &TransactionDigest,
5942 effects: &TransactionEffects,
5943 expected_effects_digest: TransactionEffectsDigest,
5944 object_store: &dyn ObjectStore,
5945 epoch_store: &Arc<AuthorityPerEpochStore>,
5946 inner_temporary_store: &InnerTemporaryStore,
5947 certificate: &VerifiedExecutableTransaction,
5948 ) -> IotaResult<Self> {
5949 let executed_epoch = epoch_store.epoch();
5951 let reference_gas_price = epoch_store.reference_gas_price();
5952 let epoch_start_config = epoch_store.epoch_start_config();
5953 let protocol_version = epoch_store.protocol_version().as_u64();
5954 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5955
5956 let mut relevant_system_packages = Vec::new();
5958 for sys_package_id in BuiltInFramework::all_package_ids() {
5959 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5960 relevant_system_packages.push(ObjDumpFormat::new(w))
5961 }
5962 }
5963
5964 let mut shared_objects = Vec::new();
5966 for kind in effects.input_shared_objects() {
5967 match kind {
5968 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5969 if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5970 shared_objects.push(ObjDumpFormat::new(w))
5971 }
5972 }
5973 InputSharedObject::ReadDeleted(..)
5974 | InputSharedObject::MutateDeleted(..)
5975 | InputSharedObject::Cancelled(..) => (), }
5978 }
5979
5980 let mut loaded_child_objects = Vec::new();
5983 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5984 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5985 loaded_child_objects.push(ObjDumpFormat::new(w))
5986 }
5987 }
5988
5989 let mut modified_at_versions = Vec::new();
5991 for (id, ver) in effects.modified_at_versions() {
5992 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5993 modified_at_versions.push(ObjDumpFormat::new(w))
5994 }
5995 }
5996
5997 let mut runtime_reads = Vec::new();
6001 for obj in inner_temporary_store
6002 .runtime_packages_loaded_from_db
6003 .values()
6004 {
6005 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6006 }
6007
6008 Ok(Self {
6011 tx_digest: *tx_digest,
6012 executed_epoch,
6013 reference_gas_price,
6014 epoch_start_timestamp_ms,
6015 protocol_version,
6016 relevant_system_packages,
6017 shared_objects,
6018 loaded_child_objects,
6019 modified_at_versions,
6020 runtime_reads,
6021 sender_signed_data: certificate.clone().into_message(),
6022 input_objects: inner_temporary_store
6023 .input_objects
6024 .values()
6025 .map(|o| ObjDumpFormat::new(o.clone()))
6026 .collect(),
6027 computed_effects: effects.clone(),
6028 expected_effects_digest,
6029 })
6030 }
6031
6032 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6033 let mut objects = Vec::new();
6034 objects.extend(self.relevant_system_packages.clone());
6035 objects.extend(self.shared_objects.clone());
6036 objects.extend(self.loaded_child_objects.clone());
6037 objects.extend(self.modified_at_versions.clone());
6038 objects.extend(self.runtime_reads.clone());
6039 objects.extend(self.input_objects.clone());
6040 objects
6041 }
6042
6043 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6044 let file_name = format!(
6045 "{}_{}_NODE_DUMP.json",
6046 self.tx_digest,
6047 AuthorityState::unixtime_now_ms()
6048 );
6049 let mut path = path.to_path_buf();
6050 path.push(&file_name);
6051 let mut file = File::create(path.clone())?;
6052 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6053 Ok(path)
6054 }
6055
6056 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6057 let file = File::open(path)?;
6058 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6059 }
6060}