1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49 key_value_store::{
50 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51 },
52 key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57 IOTA_SYSTEM_ADDRESS, TypeTag,
58 account_abstraction::{
59 account::AuthenticatorFunctionRefV1Key,
60 authenticator_function::{
61 AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
62 AuthenticatorFunctionRefV1,
63 },
64 },
65 authenticator_state::get_authenticator_state,
66 base_types::*,
67 committee::{Committee, EpochId, ProtocolVersion},
68 crypto::{
69 AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
70 default_hash,
71 },
72 deny_list_v1::check_coin_deny_list_v1_during_signing,
73 digests::{ChainIdentifier, Digest, TransactionEventsDigest},
74 dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
75 effects::{
76 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
77 TransactionEvents, VerifiedSignedTransactionEffects,
78 },
79 error::{ExecutionError, IotaError, IotaResult, UserInputError},
80 event::{Event, EventID, SystemEpochInfoEvent},
81 executable_transaction::VerifiedExecutableTransaction,
82 execution_config_utils::to_binary_config,
83 execution_status::ExecutionStatus,
84 fp_ensure,
85 gas::{GasCostSummary, IotaGasStatus},
86 gas_coin::NANOS_PER_IOTA,
87 inner_temporary_store::{
88 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
89 WrittenObjects,
90 },
91 iota_system_state::{
92 IotaSystemState, IotaSystemStateTrait,
93 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
94 },
95 is_system_package,
96 layout_resolver::{LayoutResolver, into_struct_layout},
97 message_envelope::Message,
98 messages_checkpoint::{
99 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
100 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
101 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
102 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
103 },
104 messages_consensus::AuthorityCapabilitiesV1,
105 messages_grpc::{
106 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
107 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
108 TransactionStatus,
109 },
110 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
111 move_authenticator::MoveAuthenticator,
112 object::{
113 MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
114 bounded_visitor::BoundedVisitor,
115 },
116 storage::{
117 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
118 },
119 supported_protocol_versions::{
120 ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
121 },
122 transaction::*,
123 transaction_executor::{SimulateTransactionResult, VmChecks},
124};
125use itertools::Itertools;
126use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
127use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
128use parking_lot::Mutex;
129use prometheus::{
130 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
131 register_histogram_vec_with_registry, register_histogram_with_registry,
132 register_int_counter_vec_with_registry, register_int_counter_with_registry,
133 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
134};
135use serde::{Deserialize, Serialize, de::DeserializeOwned};
136use tap::TapFallible;
137use tokio::{
138 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
139 task::JoinHandle,
140};
141use tracing::{debug, error, info, instrument, trace, warn};
142use typed_store::TypedStoreError;
143
144use self::{
145 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
146};
147#[cfg(msim)]
148pub use crate::checkpoints::checkpoint_executor::utils::{
149 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
150};
151use crate::{
152 authority::{
153 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
154 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
155 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
156 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
157 authority_store_tables::AuthorityPrunerTables,
158 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
159 },
160 authority_client::NetworkAuthorityClient,
161 checkpoints::CheckpointStore,
162 congestion_tracker::CongestionTracker,
163 consensus_adapter::ConsensusAdapter,
164 epoch::committee_store::CommitteeStore,
165 execution_cache::{
166 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
167 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
168 TransactionCacheRead,
169 },
170 execution_driver::execution_process,
171 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
172 metrics::{LatencyObserver, RateTracker},
173 module_cache_metrics::ResolverMetrics,
174 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
175 rest_index::RestIndexStore,
176 stake_aggregator::StakeAggregator,
177 state_accumulator::{AccumulatorStore, StateAccumulator},
178 subscription_handler::SubscriptionHandler,
179 transaction_input_loader::TransactionInputLoader,
180 transaction_manager::TransactionManager,
181 transaction_outputs::TransactionOutputs,
182 validator_tx_finalizer::ValidatorTxFinalizer,
183 verify_indexes::verify_indexes,
184};
185
186#[cfg(test)]
187#[path = "unit_tests/authority_tests.rs"]
188pub mod authority_tests;
189
190#[cfg(test)]
191#[path = "unit_tests/transaction_tests.rs"]
192pub mod transaction_tests;
193
194#[cfg(test)]
195#[path = "unit_tests/batch_transaction_tests.rs"]
196mod batch_transaction_tests;
197
198#[cfg(test)]
199#[path = "unit_tests/move_integration_tests.rs"]
200pub mod move_integration_tests;
201
202#[cfg(test)]
203#[path = "unit_tests/gas_tests.rs"]
204mod gas_tests;
205
206#[cfg(test)]
207#[path = "unit_tests/batch_verification_tests.rs"]
208mod batch_verification_tests;
209
210#[cfg(test)]
211#[path = "unit_tests/coin_deny_list_tests.rs"]
212mod coin_deny_list_tests;
213
214#[cfg(test)]
215#[path = "unit_tests/auth_unit_test_utils.rs"]
216pub mod auth_unit_test_utils;
217
218pub mod authority_test_utils;
219
220pub mod authority_per_epoch_store;
221pub mod authority_per_epoch_store_pruner;
222
223pub mod authority_store_pruner;
224pub mod authority_store_tables;
225pub mod authority_store_types;
226pub mod epoch_start_configuration;
227pub mod shared_object_congestion_tracker;
228pub mod shared_object_version_manager;
229pub mod suggested_gas_price_calculator;
230pub mod test_authority_builder;
231pub mod transaction_deferral;
232
233pub(crate) mod authority_store;
234pub mod backpressure;
235
236pub struct AuthorityMetrics {
238 tx_orders: IntCounter,
239 total_certs: IntCounter,
240 total_cert_attempts: IntCounter,
241 total_effects: IntCounter,
242 pub shared_obj_tx: IntCounter,
243 sponsored_tx: IntCounter,
244 tx_already_processed: IntCounter,
245 num_input_objs: Histogram,
246 num_shared_objects: Histogram,
247 batch_size: Histogram,
248
249 authority_state_handle_transaction_latency: Histogram,
250
251 execute_certificate_latency_single_writer: Histogram,
252 execute_certificate_latency_shared_object: Histogram,
253
254 internal_execution_latency: Histogram,
255 execution_load_input_objects_latency: Histogram,
256 prepare_certificate_latency: Histogram,
257 commit_certificate_latency: Histogram,
258 db_checkpoint_latency: Histogram,
259
260 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
261 pub(crate) transaction_manager_num_missing_objects: IntGauge,
262 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
263 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
264 pub(crate) transaction_manager_num_ready: IntGauge,
265 pub(crate) transaction_manager_object_cache_size: IntGauge,
266 pub(crate) transaction_manager_object_cache_hits: IntCounter,
267 pub(crate) transaction_manager_object_cache_misses: IntCounter,
268 pub(crate) transaction_manager_object_cache_evictions: IntCounter,
269 pub(crate) transaction_manager_package_cache_size: IntGauge,
270 pub(crate) transaction_manager_package_cache_hits: IntCounter,
271 pub(crate) transaction_manager_package_cache_misses: IntCounter,
272 pub(crate) transaction_manager_package_cache_evictions: IntCounter,
273 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
274
275 pub(crate) execution_driver_executed_transactions: IntCounter,
276 pub(crate) execution_driver_dispatch_queue: IntGauge,
277 pub(crate) execution_queueing_delay_s: Histogram,
278 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
279 pub(crate) execution_gas_latency_ratio: Histogram,
280
281 pub(crate) skipped_consensus_txns: IntCounter,
282 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
283
284 pub(crate) authority_overload_status: IntGauge,
285 pub(crate) authority_load_shedding_percentage: IntGauge,
286
287 pub(crate) transaction_overload_sources: IntCounterVec,
288
289 post_processing_total_events_emitted: IntCounter,
291 post_processing_total_tx_indexed: IntCounter,
292 post_processing_total_tx_had_event_processed: IntCounter,
293 post_processing_total_failures: IntCounter,
294
295 pub consensus_handler_processed: IntCounterVec,
297 pub consensus_handler_transaction_sizes: HistogramVec,
298 pub consensus_handler_num_low_scoring_authorities: IntGauge,
299 pub consensus_handler_scores: IntGaugeVec,
300 pub consensus_handler_deferred_transactions: IntCounter,
301 pub consensus_handler_congested_transactions: IntCounter,
302 pub consensus_handler_cancelled_transactions: IntCounter,
303 pub consensus_handler_max_object_costs: IntGaugeVec,
304 pub consensus_committed_subdags: IntCounterVec,
305 pub consensus_committed_messages: IntGaugeVec,
306 pub consensus_committed_user_transactions: IntGaugeVec,
307 pub consensus_handler_leader_round: IntGauge,
308 pub consensus_calculated_throughput: IntGauge,
309 pub consensus_calculated_throughput_profile: IntGauge,
310
311 pub limits_metrics: Arc<LimitsMetrics>,
312
313 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
315
316 pub zklogin_sig_count: IntCounter,
318 pub multisig_sig_count: IntCounter,
320
321 pub execution_queueing_latency: LatencyObserver,
324
325 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
332
333 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
336}
337
338const POSITIVE_INT_BUCKETS: &[f64] = &[
340 1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
341 20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
342 7000000., 10000000.,
343];
344
/// Histogram buckets, in seconds, for general latency metrics: from 0.5 ms
/// up to 90 s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    // Sub-second range.
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
    // One-second steps.
    1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
    // Coarse tail.
    10.0, 20.0, 30.0, 60.0, 90.0,
];
349
/// Histogram buckets, in seconds, for operations expected to be fast:
/// a 1-2-5 progression per decade from 10 microseconds up to 100 s.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005,
    0.0001, 0.0002, 0.0005,
    0.001, 0.002, 0.005,
    0.01, 0.02, 0.05,
    0.1, 0.2, 0.5,
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0,
];
355
/// Histogram buckets for the gas-to-latency ratio metrics, ranging from 10
/// up to one million.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0,
    50.0,
    // Steps of one hundred.
    100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0,
    // Steps of one thousand.
    1_000.0, 2_000.0, 3_000.0, 4_000.0, 5_000.0, 6_000.0, 7_000.0, 8_000.0, 9_000.0,
    // Coarse tail.
    10_000.0, 50_000.0, 100_000.0, 1_000_000.0,
];
360
361pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
365 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
366 let execute_certificate_latency = register_histogram_vec_with_registry!(
367 "authority_state_execute_certificate_latency",
368 "Latency of executing certificates, including waiting for inputs",
369 &["tx_type"],
370 LATENCY_SEC_BUCKETS.to_vec(),
371 registry,
372 )
373 .unwrap();
374
375 let execute_certificate_latency_single_writer =
376 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
377 let execute_certificate_latency_shared_object =
378 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
379
380 Self {
381 tx_orders: register_int_counter_with_registry!(
382 "total_transaction_orders",
383 "Total number of transaction orders",
384 registry,
385 )
386 .unwrap(),
387 total_certs: register_int_counter_with_registry!(
388 "total_transaction_certificates",
389 "Total number of transaction certificates handled",
390 registry,
391 )
392 .unwrap(),
393 total_cert_attempts: register_int_counter_with_registry!(
394 "total_handle_certificate_attempts",
395 "Number of calls to handle_certificate",
396 registry,
397 )
398 .unwrap(),
399 total_effects: register_int_counter_with_registry!(
401 "total_transaction_effects",
402 "Total number of transaction effects produced",
403 registry,
404 )
405 .unwrap(),
406
407 shared_obj_tx: register_int_counter_with_registry!(
408 "num_shared_obj_tx",
409 "Number of transactions involving shared objects",
410 registry,
411 )
412 .unwrap(),
413
414 sponsored_tx: register_int_counter_with_registry!(
415 "num_sponsored_tx",
416 "Number of sponsored transactions",
417 registry,
418 )
419 .unwrap(),
420
421 tx_already_processed: register_int_counter_with_registry!(
422 "num_tx_already_processed",
423 "Number of transaction orders already processed previously",
424 registry,
425 )
426 .unwrap(),
427 num_input_objs: register_histogram_with_registry!(
428 "num_input_objects",
429 "Distribution of number of input TX objects per TX",
430 POSITIVE_INT_BUCKETS.to_vec(),
431 registry,
432 )
433 .unwrap(),
434 num_shared_objects: register_histogram_with_registry!(
435 "num_shared_objects",
436 "Number of shared input objects per TX",
437 POSITIVE_INT_BUCKETS.to_vec(),
438 registry,
439 )
440 .unwrap(),
441 batch_size: register_histogram_with_registry!(
442 "batch_size",
443 "Distribution of size of transaction batch",
444 POSITIVE_INT_BUCKETS.to_vec(),
445 registry,
446 )
447 .unwrap(),
448 authority_state_handle_transaction_latency: register_histogram_with_registry!(
449 "authority_state_handle_transaction_latency",
450 "Latency of handling transactions",
451 LATENCY_SEC_BUCKETS.to_vec(),
452 registry,
453 )
454 .unwrap(),
455 execute_certificate_latency_single_writer,
456 execute_certificate_latency_shared_object,
457 internal_execution_latency: register_histogram_with_registry!(
458 "authority_state_internal_execution_latency",
459 "Latency of actual certificate executions",
460 LATENCY_SEC_BUCKETS.to_vec(),
461 registry,
462 )
463 .unwrap(),
464 execution_load_input_objects_latency: register_histogram_with_registry!(
465 "authority_state_execution_load_input_objects_latency",
466 "Latency of loading input objects for execution",
467 LOW_LATENCY_SEC_BUCKETS.to_vec(),
468 registry,
469 )
470 .unwrap(),
471 prepare_certificate_latency: register_histogram_with_registry!(
472 "authority_state_prepare_certificate_latency",
473 "Latency of executing certificates, before committing the results",
474 LATENCY_SEC_BUCKETS.to_vec(),
475 registry,
476 )
477 .unwrap(),
478 commit_certificate_latency: register_histogram_with_registry!(
479 "authority_state_commit_certificate_latency",
480 "Latency of committing certificate execution results",
481 LATENCY_SEC_BUCKETS.to_vec(),
482 registry,
483 )
484 .unwrap(),
485 db_checkpoint_latency: register_histogram_with_registry!(
486 "db_checkpoint_latency",
487 "Latency of checkpointing dbs",
488 LATENCY_SEC_BUCKETS.to_vec(),
489 registry,
490 ).unwrap(),
491 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
492 "transaction_manager_num_enqueued_certificates",
493 "Current number of certificates enqueued to TransactionManager",
494 &["result"],
495 registry,
496 )
497 .unwrap(),
498 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
499 "transaction_manager_num_missing_objects",
500 "Current number of missing objects in TransactionManager",
501 registry,
502 )
503 .unwrap(),
504 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
505 "transaction_manager_num_pending_certificates",
506 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
507 registry,
508 )
509 .unwrap(),
510 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
511 "transaction_manager_num_executing_certificates",
512 "Number of executing certificates, including queued and actually running certificates",
513 registry,
514 )
515 .unwrap(),
516 transaction_manager_num_ready: register_int_gauge_with_registry!(
517 "transaction_manager_num_ready",
518 "Number of ready transactions in TransactionManager",
519 registry,
520 )
521 .unwrap(),
522 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
523 "transaction_manager_object_cache_size",
524 "Current size of object-availability cache in TransactionManager",
525 registry,
526 )
527 .unwrap(),
528 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
529 "transaction_manager_object_cache_hits",
530 "Number of object-availability cache hits in TransactionManager",
531 registry,
532 )
533 .unwrap(),
534 authority_overload_status: register_int_gauge_with_registry!(
535 "authority_overload_status",
536 "Whether authority is current experiencing overload and enters load shedding mode.",
537 registry)
538 .unwrap(),
539 authority_load_shedding_percentage: register_int_gauge_with_registry!(
540 "authority_load_shedding_percentage",
541 "The percentage of transactions is shed when the authority is in load shedding mode.",
542 registry)
543 .unwrap(),
544 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
545 "transaction_manager_object_cache_misses",
546 "Number of object-availability cache misses in TransactionManager",
547 registry,
548 )
549 .unwrap(),
550 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
551 "transaction_manager_object_cache_evictions",
552 "Number of object-availability cache evictions in TransactionManager",
553 registry,
554 )
555 .unwrap(),
556 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
557 "transaction_manager_package_cache_size",
558 "Current size of package-availability cache in TransactionManager",
559 registry,
560 )
561 .unwrap(),
562 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
563 "transaction_manager_package_cache_hits",
564 "Number of package-availability cache hits in TransactionManager",
565 registry,
566 )
567 .unwrap(),
568 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
569 "transaction_manager_package_cache_misses",
570 "Number of package-availability cache misses in TransactionManager",
571 registry,
572 )
573 .unwrap(),
574 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
575 "transaction_manager_package_cache_evictions",
576 "Number of package-availability cache evictions in TransactionManager",
577 registry,
578 )
579 .unwrap(),
580 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
581 "transaction_manager_transaction_queue_age_s",
582 "Time spent in waiting for transaction in the queue",
583 LATENCY_SEC_BUCKETS.to_vec(),
584 registry,
585 )
586 .unwrap(),
587 transaction_overload_sources: register_int_counter_vec_with_registry!(
588 "transaction_overload_sources",
589 "Number of times each source indicates transaction overload.",
590 &["source"],
591 registry)
592 .unwrap(),
593 execution_driver_executed_transactions: register_int_counter_with_registry!(
594 "execution_driver_executed_transactions",
595 "Cumulative number of transaction executed by execution driver",
596 registry,
597 )
598 .unwrap(),
599 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
600 "execution_driver_dispatch_queue",
601 "Number of transaction pending in execution driver dispatch queue",
602 registry,
603 )
604 .unwrap(),
605 execution_queueing_delay_s: register_histogram_with_registry!(
606 "execution_queueing_delay_s",
607 "Queueing delay between a transaction is ready for execution until it starts executing.",
608 LATENCY_SEC_BUCKETS.to_vec(),
609 registry
610 )
611 .unwrap(),
612 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
613 "prepare_cert_gas_latency_ratio",
614 "The ratio of computation gas divided by VM execution latency.",
615 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
616 registry
617 )
618 .unwrap(),
619 execution_gas_latency_ratio: register_histogram_with_registry!(
620 "execution_gas_latency_ratio",
621 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
622 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
623 registry
624 )
625 .unwrap(),
626 skipped_consensus_txns: register_int_counter_with_registry!(
627 "skipped_consensus_txns",
628 "Total number of consensus transactions skipped",
629 registry,
630 )
631 .unwrap(),
632 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
633 "skipped_consensus_txns_cache_hit",
634 "Total number of consensus transactions skipped because of local cache hit",
635 registry,
636 )
637 .unwrap(),
638 post_processing_total_events_emitted: register_int_counter_with_registry!(
639 "post_processing_total_events_emitted",
640 "Total number of events emitted in post processing",
641 registry,
642 )
643 .unwrap(),
644 post_processing_total_tx_indexed: register_int_counter_with_registry!(
645 "post_processing_total_tx_indexed",
646 "Total number of txes indexed in post processing",
647 registry,
648 )
649 .unwrap(),
650 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
651 "post_processing_total_tx_had_event_processed",
652 "Total number of txes finished event processing in post processing",
653 registry,
654 )
655 .unwrap(),
656 post_processing_total_failures: register_int_counter_with_registry!(
657 "post_processing_total_failures",
658 "Total number of failure in post processing",
659 registry,
660 )
661 .unwrap(),
662 consensus_handler_processed: register_int_counter_vec_with_registry!(
663 "consensus_handler_processed",
664 "Number of transactions processed by consensus handler",
665 &["class"],
666 registry
667 ).unwrap(),
668 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
669 "consensus_handler_transaction_sizes",
670 "Sizes of each type of transactions processed by consensus handler",
671 &["class"],
672 POSITIVE_INT_BUCKETS.to_vec(),
673 registry
674 ).unwrap(),
675 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
676 "consensus_handler_num_low_scoring_authorities",
677 "Number of low scoring authorities based on reputation scores from consensus",
678 registry
679 ).unwrap(),
680 consensus_handler_scores: register_int_gauge_vec_with_registry!(
681 "consensus_handler_scores",
682 "scores from consensus for each authority",
683 &["authority"],
684 registry,
685 ).unwrap(),
686 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
687 "consensus_handler_deferred_transactions",
688 "Number of transactions deferred by consensus handler",
689 registry,
690 ).unwrap(),
691 consensus_handler_congested_transactions: register_int_counter_with_registry!(
692 "consensus_handler_congested_transactions",
693 "Number of transactions deferred by consensus handler due to congestion",
694 registry,
695 ).unwrap(),
696 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
697 "consensus_handler_cancelled_transactions",
698 "Number of transactions cancelled by consensus handler",
699 registry,
700 ).unwrap(),
701 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
702 "consensus_handler_max_congestion_control_object_costs",
703 "Max object costs for congestion control in the current consensus commit",
704 &["commit_type"],
705 registry,
706 ).unwrap(),
707 consensus_committed_subdags: register_int_counter_vec_with_registry!(
708 "consensus_committed_subdags",
709 "Number of committed subdags, sliced by leader",
710 &["authority"],
711 registry,
712 ).unwrap(),
713 consensus_committed_messages: register_int_gauge_vec_with_registry!(
714 "consensus_committed_messages",
715 "Total number of committed consensus messages, sliced by author",
716 &["authority"],
717 registry,
718 ).unwrap(),
719 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
720 "consensus_committed_user_transactions",
721 "Number of committed user transactions, sliced by submitter",
722 &["authority"],
723 registry,
724 ).unwrap(),
725 consensus_handler_leader_round: register_int_gauge_with_registry!(
726 "consensus_handler_leader_round",
727 "The leader round of the current consensus output being processed in the consensus handler",
728 registry,
729 ).unwrap(),
730 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
731 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
732 zklogin_sig_count: register_int_counter_with_registry!(
733 "zklogin_sig_count",
734 "Count of zkLogin signatures",
735 registry,
736 )
737 .unwrap(),
738 multisig_sig_count: register_int_counter_with_registry!(
739 "multisig_sig_count",
740 "Count of zkLogin signatures",
741 registry,
742 )
743 .unwrap(),
744 consensus_calculated_throughput: register_int_gauge_with_registry!(
745 "consensus_calculated_throughput",
746 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
747 registry,
748 ).unwrap(),
749 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
750 "consensus_calculated_throughput_profile",
751 "The current active calculated throughput profile",
752 registry
753 ).unwrap(),
754 execution_queueing_latency: LatencyObserver::new(),
755 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
756 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
757 }
758 }
759
760 pub fn reset_on_reconfigure(&self) {
764 self.consensus_committed_messages.reset();
765 self.consensus_handler_scores.reset();
766 self.consensus_committed_user_transactions.reset();
767 }
768}
769
770pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
777
778pub struct AuthorityState {
779 pub name: AuthorityName,
782 pub secret: StableSyncAuthoritySigner,
784
785 input_loader: TransactionInputLoader,
787 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
788
789 epoch_store: ArcSwap<AuthorityPerEpochStore>,
790
791 execution_lock: RwLock<EpochId>,
797
798 pub indexes: Option<Arc<IndexStore>>,
799 pub rest_index: Option<Arc<RestIndexStore>>,
800
801 pub subscription_handler: Arc<SubscriptionHandler>,
802 pub checkpoint_store: Arc<CheckpointStore>,
803
804 committee_store: Arc<CommitteeStore>,
805
806 transaction_manager: Arc<TransactionManager>,
808
809 #[cfg_attr(not(test), expect(unused))]
811 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
812
813 pub metrics: Arc<AuthorityMetrics>,
814 _pruner: AuthorityStorePruner,
815 _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
816
817 db_checkpoint_config: DBCheckpointConfig,
819
820 pub config: NodeConfig,
821
822 pub overload_info: AuthorityOverloadInfo,
824
825 pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
826
827 chain_identifier: ChainIdentifier,
830
831 pub(crate) congestion_tracker: Arc<CongestionTracker>,
832}
833
834impl AuthorityState {
843 pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
844 epoch_store.committee().authority_exists(&self.name)
845 }
846
847 pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
848 epoch_store
849 .active_validators()
850 .iter()
851 .any(|a| AuthorityName::from(a) == self.name)
852 }
853
854 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
855 !self.is_committee_validator(epoch_store)
856 }
857
858 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
859 &self.committee_store
860 }
861
862 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
863 self.committee_store.clone()
864 }
865
866 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
867 &self.config.authority_overload_config
868 }
869
870 pub fn get_epoch_state_commitments(
871 &self,
872 epoch: EpochId,
873 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
874 self.checkpoint_store.get_epoch_state_commitments(epoch)
875 }
876
/// Runs every signing-time check on `transaction` and, if all of them pass,
/// signs it and acquires the locks on its owned input objects.
///
/// The steps run in this order: transaction deny-list check, loading of the
/// input/receiving objects, input and gas checks, coin deny-list check, Move
/// authenticator execution (only when the sender uses one), signing, and
/// finally lock acquisition.
///
/// NOTE(review): nothing in this function rejects system transactions;
/// presumably callers are expected to do that -- confirm.
///
/// # Errors
///
/// Every failing step propagates its error with `?`. A failing Move
/// authenticator run is reported as
/// `IotaError::MoveAuthenticatorExecutionFailure` carrying the stringified
/// underlying error.
///
/// # Panics
///
/// Panics if the sender uses a Move authenticator but the input checks did
/// not return the authenticator's checked input objects or its function
/// reference.
#[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
async fn handle_transaction_impl(
    &self,
    transaction: VerifiedTransaction,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> IotaResult<VerifiedSignedTransaction> {
    // Kept alive until this function returns.
    // NOTE(review): the value is bound without `?` or `.await`. If
    // `execution_lock_for_signing` returns a `Result`, a failed acquisition
    // would be silently stored in this binding and signing would continue
    // without the lock -- confirm the method's return type.
    let _execution_lock = self.execution_lock_for_signing();

    let protocol_config = epoch_store.protocol_config();
    let reference_gas_price = epoch_store.reference_gas_price();

    let epoch = epoch_store.epoch();

    let tx_data = transaction.data().transaction_data();

    // Reject the transaction early if the node's deny configuration forbids
    // it, before any object is loaded.
    iota_transaction_checks::deny::check_transaction_for_signing(
        tx_data,
        transaction.tx_signatures(),
        &transaction.input_objects()?,
        &tx_data.receiving_objects(),
        &self.config.transaction_deny_config,
        self.get_backing_package_store().as_ref(),
    )?;

    // Load the transaction's own inputs together with the objects needed by
    // the Move authenticator (the last two values are optional).
    let (tx_input_objects, tx_receiving_objects, auth_input_objects, account_object) =
        self.read_objects_for_signing(&transaction, epoch)?;

    // `Some` only when the sender authenticates through a Move authenticator.
    let move_authenticator = transaction.sender_move_authenticator();

    let (
        gas_status,
        tx_checked_input_objects,
        auth_checked_input_objects,
        authenticator_function_ref,
    ) = self.check_transaction_inputs_for_signing(
        protocol_config,
        reference_gas_price,
        tx_data,
        tx_input_objects,
        &tx_receiving_objects,
        move_authenticator,
        auth_input_objects,
        account_object,
    )?;

    check_coin_deny_list_v1_during_signing(
        tx_data.sender(),
        &tx_checked_input_objects,
        &tx_receiving_objects,
        &auth_checked_input_objects,
        &self.get_object_store(),
    )?;

    if let Some(move_authenticator) = move_authenticator {
        // Both values are expected to be present whenever a Move
        // authenticator is in use; their absence is treated as a bug.
        let auth_checked_input_objects = auth_checked_input_objects
            .expect("MoveAuthenticator input objects must be provided");
        let authenticator_function_ref = authenticator_function_ref
            .expect("AuthenticatorFunctionRefV1 object must be provided");

        // The gas part of the transaction is not needed for authentication.
        let (kind, signer, _) = tx_data.execution_parts();

        let validation_result = epoch_store.executor().authenticate_transaction(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            gas_status,
            move_authenticator.to_owned(),
            authenticator_function_ref,
            auth_checked_input_objects,
            kind,
            signer,
            transaction.digest().to_owned(),
            &mut None,
        );

        // Only the error text is preserved; the original error value is
        // dropped.
        if let Err(validation_error) = validation_result {
            return Err(IotaError::MoveAuthenticatorExecutionFailure {
                error: validation_error.to_string(),
            });
        }
    }

    let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();

    let signed_transaction =
        VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);

    // The signature is produced before the locks are taken; if locking fails
    // the signed transaction is discarded and the error is returned instead.
    self.get_cache_writer().try_acquire_transaction_locks(
        epoch_store,
        &owned_objects,
        signed_transaction.clone(),
    )?;

    Ok(signed_transaction)
}
1004
1005 #[instrument(name = "handle_transaction", level = "trace", skip_all)]
1007 pub async fn handle_transaction(
1008 &self,
1009 epoch_store: &Arc<AuthorityPerEpochStore>,
1010 transaction: VerifiedTransaction,
1011 ) -> IotaResult<HandleTransactionResponse> {
1012 let tx_digest = *transaction.digest();
1013 debug!("handle_transaction");
1014
1015 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1017 return Ok(HandleTransactionResponse { status });
1018 }
1019
1020 let _metrics_guard = self
1021 .metrics
1022 .authority_state_handle_transaction_latency
1023 .start_timer();
1024 self.metrics.tx_orders.inc();
1025
1026 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1027 match signed {
1028 Ok(s) => {
1029 if self.is_committee_validator(epoch_store) {
1030 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1031 let tx = s.clone();
1032 let validator_tx_finalizer = validator_tx_finalizer.clone();
1033 let cache_reader = self.get_transaction_cache_reader().clone();
1034 let epoch_store = epoch_store.clone();
1035 spawn_monitored_task!(epoch_store.within_alive_epoch(
1036 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1037 ));
1038 }
1039 }
1040 Ok(HandleTransactionResponse {
1041 status: TransactionStatus::Signed(s.into_inner().into_sig()),
1042 })
1043 }
1044 Err(err) => Ok(HandleTransactionResponse {
1048 status: self
1049 .get_transaction_status(&tx_digest, epoch_store)?
1050 .ok_or(err)?
1051 .1,
1052 }),
1053 }
1054 }
1055
1056 pub fn check_system_overload_at_signing(&self) -> bool {
1057 self.config
1058 .authority_overload_config
1059 .check_system_overload_at_signing
1060 }
1061
1062 pub fn check_system_overload_at_execution(&self) -> bool {
1063 self.config
1064 .authority_overload_config
1065 .check_system_overload_at_execution
1066 }
1067
1068 pub(crate) fn check_system_overload(
1069 &self,
1070 consensus_adapter: &Arc<ConsensusAdapter>,
1071 tx_data: &SenderSignedData,
1072 do_authority_overload_check: bool,
1073 ) -> IotaResult {
1074 if do_authority_overload_check {
1075 self.check_authority_overload(tx_data).tap_err(|_| {
1076 self.update_overload_metrics("execution_queue");
1077 })?;
1078 }
1079 self.transaction_manager
1080 .check_execution_overload(self.overload_config(), tx_data)
1081 .tap_err(|_| {
1082 self.update_overload_metrics("execution_pending");
1083 })?;
1084 consensus_adapter.check_consensus_overload().tap_err(|_| {
1085 self.update_overload_metrics("consensus");
1086 })?;
1087
1088 let pending_tx_count = self
1089 .get_cache_commit()
1090 .approximate_pending_transaction_count();
1091 if pending_tx_count
1092 > self
1093 .config
1094 .execution_cache_config
1095 .writeback_cache
1096 .backpressure_threshold_for_rpc()
1097 {
1098 return Err(IotaError::ValidatorOverloadedRetryAfter {
1099 retry_after_secs: 10,
1100 });
1101 }
1102
1103 Ok(())
1104 }
1105
1106 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1107 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1108 return Ok(());
1109 }
1110
1111 let load_shedding_percentage = self
1112 .overload_info
1113 .load_shedding_percentage
1114 .load(Ordering::Relaxed);
1115 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1116 }
1117
1118 fn update_overload_metrics(&self, source: &str) {
1119 self.metrics
1120 .transaction_overload_sources
1121 .with_label_values(&[source])
1122 .inc();
1123 }
1124
1125 #[instrument(level = "trace", skip_all)]
1127 pub async fn execute_certificate(
1128 &self,
1129 certificate: &VerifiedCertificate,
1130 epoch_store: &Arc<AuthorityPerEpochStore>,
1131 ) -> IotaResult<TransactionEffects> {
1132 let _metrics_guard = if certificate.contains_shared_object() {
1133 self.metrics
1134 .execute_certificate_latency_shared_object
1135 .start_timer()
1136 } else {
1137 self.metrics
1138 .execute_certificate_latency_single_writer
1139 .start_timer()
1140 };
1141 trace!("execute_certificate");
1142
1143 self.metrics.total_cert_attempts.inc();
1144
1145 if !certificate.contains_shared_object() {
1146 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1151 }
1152
1153 epoch_store
1156 .within_alive_epoch(self.notify_read_effects(certificate))
1157 .await
1158 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1159 .and_then(|r| r)
1160 }
1161
1162 #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1177 pub fn try_execute_immediately(
1178 &self,
1179 certificate: &VerifiedExecutableTransaction,
1180 expected_effects_digest: Option<TransactionEffectsDigest>,
1181 epoch_store: &Arc<AuthorityPerEpochStore>,
1182 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1183 let _scope = monitored_scope("Execution::try_execute_immediately");
1184 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1185
1186 let tx_digest = certificate.digest();
1187
1188 let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1190
1191 if let Some(effects) = self
1194 .get_transaction_cache_reader()
1195 .try_get_executed_effects(tx_digest)?
1196 {
1197 if let Some(expected_effects_digest_inner) = expected_effects_digest {
1198 assert_eq!(
1199 effects.digest(),
1200 expected_effects_digest_inner,
1201 "Unexpected effects digest for transaction {tx_digest:?}"
1202 );
1203 }
1204 tx_guard.release();
1205 return Ok((effects, None));
1206 }
1207
1208 let (tx_input_objects, authenticator_input_objects, account_object) =
1209 self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1210
1211 let expected_effects_digest =
1217 expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1218
1219 self.process_certificate(
1220 tx_guard,
1221 certificate,
1222 tx_input_objects,
1223 account_object,
1224 authenticator_input_objects,
1225 expected_effects_digest,
1226 epoch_store,
1227 )
1228 .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1229 .tap_ok(
1230 |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1231 )
1232 }
1233
1234 pub fn read_objects_for_execution(
1235 &self,
1236 tx_lock: &CertLockGuard,
1237 certificate: &VerifiedExecutableTransaction,
1238 epoch_store: &Arc<AuthorityPerEpochStore>,
1239 ) -> IotaResult<(InputObjects, Option<InputObjects>, Option<ObjectReadResult>)> {
1240 let _scope = monitored_scope("Execution::load_input_objects");
1241 let _metrics_guard = self
1242 .metrics
1243 .execution_load_input_objects_latency
1244 .start_timer();
1245
1246 let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1247
1248 let input_objects = self.input_loader.read_objects_for_execution(
1249 epoch_store,
1250 &certificate.key(),
1251 tx_lock,
1252 &input_objects,
1253 epoch_store.epoch(),
1254 )?;
1255
1256 certificate.split_input_objects_into_groups_for_reading(input_objects)
1257 }
1258
1259 pub fn try_execute_for_test(
1263 &self,
1264 certificate: &VerifiedCertificate,
1265 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1266 let epoch_store = self.epoch_store_for_testing();
1267 let (effects, execution_error_opt) = self.try_execute_immediately(
1268 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1269 None,
1270 &epoch_store,
1271 )?;
1272 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1273 Ok((signed_effects, execution_error_opt))
1274 }
1275
1276 pub fn execute_for_test(
1278 &self,
1279 certificate: &VerifiedCertificate,
1280 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1281 self.try_execute_for_test(certificate)
1282 .expect("try_execute_for_test should not fail")
1283 }
1284
1285 pub async fn notify_read_effects(
1286 &self,
1287 certificate: &VerifiedCertificate,
1288 ) -> IotaResult<TransactionEffects> {
1289 self.get_transaction_cache_reader()
1290 .try_notify_read_executed_effects(&[*certificate.digest()])
1291 .await
1292 .map(|mut r| r.pop().expect("must return correct number of effects"))
1293 }
1294
1295 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1296 self.get_object_cache_reader()
1297 .try_check_owned_objects_are_live(owned_object_refs)
1298 }
1299
1300 pub(crate) fn debug_dump_transaction_state(
1305 &self,
1306 tx_digest: &TransactionDigest,
1307 effects: &TransactionEffects,
1308 expected_effects_digest: TransactionEffectsDigest,
1309 inner_temporary_store: &InnerTemporaryStore,
1310 certificate: &VerifiedExecutableTransaction,
1311 debug_dump_config: &StateDebugDumpConfig,
1312 ) -> IotaResult<PathBuf> {
1313 let dump_dir = debug_dump_config
1314 .dump_file_directory
1315 .as_ref()
1316 .cloned()
1317 .unwrap_or(std::env::temp_dir());
1318 let epoch_store = self.load_epoch_store_one_call_per_task();
1319
1320 NodeStateDump::new(
1321 tx_digest,
1322 effects,
1323 expected_effects_digest,
1324 self.get_object_store().as_ref(),
1325 &epoch_store,
1326 inner_temporary_store,
1327 certificate,
1328 )?
1329 .write_to_file(&dump_dir)
1330 .map_err(|e| IotaError::FileIO(e.to_string()))
1331 }
1332
/// Executes `certificate` on the supplied input objects, checks the
/// resulting effects against `expected_effects_digest` (when given) and
/// commits the outcome.
///
/// Returns the effects and the execution error, if the execution itself
/// failed.
///
/// `tx_guard` is explicitly released on every early-error path visible here
/// (execution lock failure, epoch mismatch, preparation failure). On the
/// success path it is handed over to `commit_certificate`.
///
/// # Errors
///
/// * the error from acquiring the execution lock;
/// * `IotaError::WrongEpoch` when the epoch held by the execution lock
///   differs from `epoch_store.epoch()`;
/// * errors from `prepare_certificate` and `commit_certificate`.
///
/// # Panics
///
/// Panics ("fork detected") when the computed effects digest differs from
/// `expected_effects_digest`; a state dump is attempted first. In builds
/// with debug assertions it also panics when, after an authenticator state
/// update, the JWKs in the on-chain authenticator state differ from those
/// held by the signature verifier.
#[instrument(level = "trace", skip_all)]
pub(crate) fn process_certificate(
    &self,
    tx_guard: CertTxGuard,
    certificate: &VerifiedExecutableTransaction,
    tx_input_objects: InputObjects,
    account_object: Option<ObjectReadResult>,
    authenticator_input_objects: Option<InputObjects>,
    expected_effects_digest: Option<TransactionEffectsDigest>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
    let process_certificate_start_time = tokio::time::Instant::now();
    let digest = *certificate.digest();

    // Fault injection hook: when the fail point is active, the current node
    // is killed with a probability of 0.01 derived deterministically from
    // the digest.
    fail_point_if!("correlated-crash-process-certificate", || {
        if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
            iota_simulator::task::kill_current_node(None);
        }
    });

    let execution_guard = self.execution_lock_for_executable_transaction(certificate);
    // The guard must be released before bailing out, hence the explicit
    // match instead of `?`.
    let execution_guard = match execution_guard {
        Ok(execution_guard) => execution_guard,
        Err(err) => {
            tx_guard.release();
            return Err(err);
        }
    };
    // The execution lock holds an epoch id; it has to agree with the epoch
    // store the caller passed in.
    if *execution_guard != epoch_store.epoch() {
        tx_guard.release();
        info!("The epoch of the execution_guard doesn't match the epoch store");
        return Err(IotaError::WrongEpoch {
            expected_epoch: epoch_store.epoch(),
            actual_epoch: *execution_guard,
        });
    }

    // Execute the transaction without committing anything yet.
    let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
        &execution_guard,
        certificate,
        tx_input_objects,
        account_object,
        authenticator_input_objects,
        epoch_store,
    ) {
        Err(e) => {
            info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
            tx_guard.release();
            return Err(e);
        }
        Ok(res) => res,
    };

    if let Some(expected_effects_digest) = expected_effects_digest {
        if effects.digest() != expected_effects_digest {
            // Best-effort state dump for post-mortem analysis; a failure to
            // dump is only logged because the node panics right after.
            match self.debug_dump_transaction_state(
                &digest,
                &effects,
                expected_effects_digest,
                &inner_temporary_store,
                certificate,
                &self.config.state_debug_dump_config,
            ) {
                Ok(out_path) => {
                    info!(
                        "Dumped node state for transaction {} to {}",
                        digest,
                        out_path.as_path().display().to_string()
                    );
                }
                Err(e) => {
                    error!("Error dumping state for transaction {}: {e}", digest);
                }
            }
            error!(
                tx_digest = ?digest,
                ?expected_effects_digest,
                actual_effects = ?effects,
                "fork detected!"
            );
            panic!(
                "Transaction {} is expected to have effects digest {}, but got {}!",
                digest,
                expected_effects_digest,
                effects.digest(),
            );
        }
    }

    // Fail point placed between execution and commit.
    fail_point!("crash");

    // Ownership of both guards moves into the commit step.
    self.commit_certificate(
        certificate,
        inner_temporary_store,
        &effects,
        tx_guard,
        execution_guard,
        epoch_store,
    )?;

    if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
        certificate.data().transaction_data().kind()
    {
        // A failed authenticator state update is reported through
        // `debug_fatal!`, but the epoch store is updated regardless.
        if let Some(err) = &execution_error_opt {
            debug_fatal!("Authenticator state update failed: {:?}", err);
        }
        epoch_store.update_authenticator_state(auth_state);

        // Consistency check in debug builds only: the JWKs stored in the
        // authenticator state object must equal the ones known to the
        // signature verifier (compared after sorting both lists).
        if cfg!(debug_assertions) {
            let authenticator_state = get_authenticator_state(self.get_object_store())
                .expect("Read cannot fail")
                .expect("Authenticator state must exist");

            let mut sys_jwks: Vec<_> = authenticator_state
                .active_jwks
                .into_iter()
                .map(|jwk| (jwk.jwk_id, jwk.jwk))
                .collect();
            let mut active_jwks: Vec<_> = epoch_store
                .signature_verifier
                .get_jwks()
                .into_iter()
                .collect();
            sys_jwks.sort();
            active_jwks.sort();

            assert_eq!(sys_jwks, active_jwks);
        }
    }

    // Record computation cost per elapsed microsecond; skipped when no time
    // was measured to avoid dividing by zero.
    let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
    if elapsed > 0.0 {
        self.metrics
            .execution_gas_latency_ratio
            .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
    };
    Ok((effects, execution_error_opt))
}
1487
/// Persists the outcome of an executed certificate and notifies the
/// transaction manager.
///
/// Steps, in order: post-processing of the transaction (failures are logged
/// and counted but do not abort the commit), recording the transaction
/// key/digest pair in the epoch store, writing the transaction outputs
/// through the cache writer, reloading system packages for end-of-epoch
/// transactions, marking the transaction guard as committed, notifying the
/// transaction manager, and updating metrics.
///
/// `_execution_guard` is not used in the body; taking it by value keeps it
/// alive until this function returns.
///
/// # Errors
///
/// Propagates errors from `insert_tx_key_and_digest` and
/// `try_write_transaction_outputs`.
#[instrument(level = "trace", skip_all)]
fn commit_certificate(
    &self,
    certificate: &VerifiedExecutableTransaction,
    inner_temporary_store: InnerTemporaryStore,
    effects: &TransactionEffects,
    tx_guard: CertTxGuard,
    _execution_guard: ExecutionLockReadGuard<'_>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> IotaResult {
    let _scope: Option<iota_metrics::MonitoredScopeGuard> =
        monitored_scope("Execution::commit_certificate");
    let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

    // Everything that is needed after `inner_temporary_store` has been moved
    // into the transaction outputs is captured up front.
    let tx_key = certificate.key();
    let tx_digest = certificate.digest();
    let input_object_count = inner_temporary_store.input_objects.len();
    let shared_object_count = effects.input_shared_objects().len();

    let output_keys = inner_temporary_store.get_output_keys(effects);

    // Post-processing is best effort: an error is logged and counted, then
    // deliberately discarded.
    let _ = self
        .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
        .tap_err(|e| {
            self.metrics.post_processing_total_failures.inc();
            error!(?tx_digest, "tx post processing failed: {e}");
        });

    // The epoch store entry is written before the transaction outputs.
    epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;

    // Fail point placed between the two writes.
    fail_point!("crash");

    let transaction_outputs = TransactionOutputs::build_transaction_outputs(
        certificate.clone().into_unsigned(),
        effects.clone(),
        inner_temporary_store,
    );
    self.get_cache_writer()
        .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;

    // End-of-epoch transactions trigger a forced reload of all built-in
    // framework packages in the object cache.
    if certificate.transaction_data().is_end_of_epoch_tx() {
        self.get_object_cache_reader()
            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
    }

    // Consumes the guard, marking the transaction as committed.
    tx_guard.commit_tx();

    // Hand the digest and output keys of the committed transaction to the
    // transaction manager.
    self.transaction_manager
        .notify_commit(tx_digest, output_keys, epoch_store);

    self.update_metrics(certificate, input_object_count, shared_object_count);

    Ok(())
}
1553
1554 fn update_metrics(
1555 &self,
1556 certificate: &VerifiedExecutableTransaction,
1557 input_object_count: usize,
1558 shared_object_count: usize,
1559 ) {
1560 if certificate.has_zklogin_sig() {
1562 self.metrics.zklogin_sig_count.inc();
1563 } else if certificate.has_upgraded_multisig() {
1564 self.metrics.multisig_sig_count.inc();
1565 }
1566
1567 self.metrics.total_effects.inc();
1568 self.metrics.total_certs.inc();
1569
1570 if shared_object_count > 0 {
1571 self.metrics.shared_obj_tx.inc();
1572 }
1573
1574 if certificate.is_sponsored_tx() {
1575 self.metrics.sponsored_tx.inc();
1576 }
1577
1578 self.metrics
1579 .num_input_objs
1580 .observe(input_object_count as f64);
1581 self.metrics
1582 .num_shared_objects
1583 .observe(shared_object_count as f64);
1584 self.metrics.batch_size.observe(
1585 certificate
1586 .data()
1587 .intent_message()
1588 .value
1589 .kind()
1590 .num_commands() as f64,
1591 );
1592 }
1593
/// Executes `certificate` and returns the temporary store, the effects and
/// the execution error (if any) without committing anything.
///
/// Two paths exist:
/// * when the sender uses a Move authenticator, the account object is
///   validated, authenticator and transaction inputs are checked together,
///   and the executor authenticates and then executes the transaction;
/// * otherwise the transaction inputs are checked and the transaction is
///   executed directly.
///
/// In both paths the owned input objects are verified to be live before
/// execution.
///
/// `_execution_guard` is not read; requiring it as a parameter forces
/// callers to hold the execution lock while calling this function.
///
/// # Errors
///
/// Propagates errors from the transaction validity check, the Move account
/// check, the input checks and the owned-object liveness check.
///
/// # Panics
///
/// Panics if the sender uses a Move authenticator but `account_object` or
/// `move_authenticator_input_objects` is `None`.
#[instrument(level = "trace", skip_all)]
fn prepare_certificate(
    &self,
    _execution_guard: &ExecutionLockReadGuard<'_>,
    certificate: &VerifiedExecutableTransaction,
    tx_input_objects: InputObjects,
    account_object: Option<ObjectReadResult>,
    move_authenticator_input_objects: Option<InputObjects>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> IotaResult<(
    InnerTemporaryStore,
    TransactionEffects,
    Option<ExecutionError>,
)> {
    let _scope = monitored_scope("Execution::prepare_certificate");
    let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
    let prepare_certificate_start_time = tokio::time::Instant::now();

    let protocol_config = epoch_store.protocol_config();

    let reference_gas_price = epoch_store.reference_gas_price();

    let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
    let epoch_start_timestamp = epoch_store
        .epoch_start_config()
        .epoch_data()
        .epoch_start_timestamp();

    let backing_store = self.get_backing_store().as_ref();

    let tx_digest = *certificate.digest();

    let tx_data = certificate.data().transaction_data();
    // The transaction data is validated against the current protocol config
    // before anything else is done with it.
    tx_data.validity_check(protocol_config)?;

    let (kind, signer, gas) = tx_data.execution_parts();

    // `effects` is only mutated inside the fail point further down, so the
    // `mut` is unused unless `msim` or `fail_points` is enabled.
    #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
    let (inner_temp_store, _, mut effects, execution_error_opt) = if let Some(
        move_authenticator,
    ) =
        certificate.sender_move_authenticator()
    {
        let (
            auth_account_object_id,
            auth_account_object_seq_number,
            auth_account_object_digest,
        ) = move_authenticator.object_to_authenticate_components()?;

        let account_object = account_object.expect("Account object must be provided");

        // Validates the loaded account object against the reference carried
        // by the authenticator and yields the authenticator function to run.
        let authenticator_function_ref_for_execution = self.check_move_account(
            auth_account_object_id,
            auth_account_object_seq_number,
            auth_account_object_digest,
            account_object,
            &signer,
        )?;

        let move_authenticator_input_objects = move_authenticator_input_objects.expect(
            "In case of a `MoveAuthenticator` signature, the authenticator input objects must be provided",
        );

        // The gas budget available to authentication comes from the protocol
        // config.
        let authenticator_gas_budget = protocol_config.max_auth_gas();
        let (
            gas_status,
            authenticator_checked_input_objects,
            authenticator_and_tx_checked_input_objects,
        ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
            certificate,
            tx_input_objects,
            move_authenticator_input_objects,
            authenticator_gas_budget,
            protocol_config,
            reference_gas_price,
        )?;

        // Liveness is checked on the owned objects of the combined
        // (authenticator + transaction) input set.
        let owned_object_refs = authenticator_and_tx_checked_input_objects
            .inner()
            .filter_owned_objects();
        self.check_owned_locks(&owned_object_refs)?;

        epoch_store
            .executor()
            .authenticate_then_execute_transaction_to_effects(
                backing_store,
                protocol_config,
                self.metrics.limits_metrics.clone(),
                self.config
                    .expensive_safety_check_config
                    .enable_deep_per_tx_iota_conservation_check(),
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_id,
                epoch_start_timestamp,
                gas_status,
                gas,
                move_authenticator.to_owned(),
                authenticator_function_ref_for_execution,
                authenticator_checked_input_objects,
                authenticator_and_tx_checked_input_objects,
                kind,
                signer,
                tx_digest,
                &mut None,
            )
    } else {
        let (tx_gas_status, tx_checked_input_objects) =
            iota_transaction_checks::check_certificate_input(
                certificate,
                tx_input_objects,
                protocol_config,
                reference_gas_price,
            )?;

        let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
        self.check_owned_locks(&owned_object_refs)?;
        epoch_store.executor().execute_transaction_to_effects(
            backing_store,
            protocol_config,
            self.metrics.limits_metrics.clone(),
            self.config
                .expensive_safety_check_config
                .enable_deep_per_tx_iota_conservation_check(),
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_id,
            epoch_start_timestamp,
            tx_checked_input_objects,
            gas,
            tx_gas_status,
            kind,
            signer,
            tx_digest,
            &mut None,
        )
    };

    // Simulator-only hook that may tamper with the effects when the fail
    // point is active.
    fail_point_if!("cp_execution_nondeterminism", || {
        #[cfg(msim)]
        self.create_fail_state(certificate, epoch_store, &mut effects);
    });

    // Record computation cost per elapsed microsecond; skipped when no time
    // was measured to avoid dividing by zero.
    let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
    if elapsed > 0.0 {
        self.metrics
            .prepare_cert_gas_latency_ratio
            .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
    }

    // The executor reports its outcome as a `Result`; only the error side is
    // passed on to the caller.
    Ok((inner_temp_store, effects, execution_error_opt.err()))
}
1775
1776 pub fn prepare_certificate_for_benchmark(
1777 &self,
1778 certificate: &VerifiedExecutableTransaction,
1779 input_objects: InputObjects,
1780 epoch_store: &Arc<AuthorityPerEpochStore>,
1781 ) -> IotaResult<(
1782 InnerTemporaryStore,
1783 TransactionEffects,
1784 Option<ExecutionError>,
1785 )> {
1786 let lock = RwLock::new(epoch_store.epoch());
1787 let execution_guard = lock.try_read().unwrap();
1788
1789 self.prepare_certificate(
1790 &execution_guard,
1791 certificate,
1792 input_objects,
1793 None,
1794 None,
1795 epoch_store,
1796 )
1797 }
1798
1799 #[instrument("dry_exec_tx", level = "trace", skip_all)]
1802 #[allow(clippy::type_complexity)]
1803 pub fn dry_exec_transaction(
1804 &self,
1805 transaction: TransactionData,
1806 transaction_digest: TransactionDigest,
1807 ) -> IotaResult<(
1808 DryRunTransactionBlockResponse,
1809 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1810 TransactionEffects,
1811 Option<ObjectID>,
1812 )> {
1813 let epoch_store = self.load_epoch_store_one_call_per_task();
1814 if !self.is_fullnode(&epoch_store) {
1815 return Err(IotaError::UnsupportedFeature {
1816 error: "dry-exec is only supported on fullnodes".to_string(),
1817 });
1818 }
1819
1820 if transaction.kind().is_system_tx() {
1821 return Err(IotaError::UnsupportedFeature {
1822 error: "dry-exec does not support system transactions".to_string(),
1823 });
1824 }
1825
1826 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1827 }
1828
1829 #[allow(clippy::type_complexity)]
1830 pub fn dry_exec_transaction_for_benchmark(
1831 &self,
1832 transaction: TransactionData,
1833 transaction_digest: TransactionDigest,
1834 ) -> IotaResult<(
1835 DryRunTransactionBlockResponse,
1836 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1837 TransactionEffects,
1838 Option<ObjectID>,
1839 )> {
1840 let epoch_store = self.load_epoch_store_one_call_per_task();
1841 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1842 }
1843
/// Shared implementation of the dry run: executes `transaction` on a fresh
/// executor without committing anything.
///
/// Returns, in order:
/// * the dry-run response (input, effects, events, suggested gas price and
///   the execution error source; object and balance changes are left
///   empty);
/// * the objects written by the execution, keyed by object id, together
///   with their reference and whether they were created, unwrapped or
///   mutated;
/// * the raw effects;
/// * the id of the mock gas object, when the transaction came without gas
///   payment and one had to be created.
///
/// # Errors
///
/// Propagates errors from the validity check, the deny-list check, object
/// loading, the input checks and the conversions into the response types.
///
/// # Panics
///
/// Panics if the executor cannot be created, or if an object listed as
/// created/unwrapped/mutated in the effects is missing from the temporary
/// store's written set.
#[instrument(level = "trace", skip_all)]
#[allow(clippy::type_complexity)]
fn dry_exec_transaction_impl(
    &self,
    epoch_store: &AuthorityPerEpochStore,
    transaction: TransactionData,
    transaction_digest: TransactionDigest,
) -> IotaResult<(
    DryRunTransactionBlockResponse,
    BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
    TransactionEffects,
    Option<ObjectID>,
)> {
    // Gas is validated later, once it is known whether a mock gas object is
    // needed.
    transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

    let input_object_kinds = transaction.input_objects()?;
    let receiving_object_refs = transaction.receiving_objects();

    // The deny-list check is run with an empty signature list, since a dry
    // run carries no signatures.
    iota_transaction_checks::deny::check_transaction_for_signing(
        &transaction,
        &[],
        &input_object_kinds,
        &receiving_object_refs,
        &self.config.transaction_deny_config,
        self.get_backing_package_store().as_ref(),
    )?;

    let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
        None,
        &input_object_kinds,
        &receiving_object_refs,
        epoch_store.epoch(),
    )?;

    // Rebound as mutable because the gas payment may be replaced below.
    let mut transaction = transaction;
    let mut gas_object_refs = transaction.gas().to_vec();
    let reference_gas_price = epoch_store.reference_gas_price();
    let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
        // No gas payment supplied: fabricate a gas coin with a random id,
        // owned by the gas owner, and use it as the sole payment.
        let sender = transaction.gas_owner();
        let gas_object_id = ObjectID::random();
        let gas_object = Object::new_move(
            MoveObject::new_gas_coin(
                OBJECT_START_VERSION,
                gas_object_id,
                SIMULATION_GAS_COIN_VALUE,
            ),
            Owner::AddressOwner(sender),
            TransactionDigest::genesis_marker(),
        );
        let gas_object_ref = gas_object.compute_object_reference();
        gas_object_refs = vec![gas_object_ref];
        transaction.gas_data_mut().payment = gas_object_refs.clone();
        (
            iota_transaction_checks::check_transaction_input_with_given_gas(
                epoch_store.protocol_config(),
                reference_gas_price,
                &transaction,
                input_objects,
                receiving_objects,
                gas_object,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?,
            Some(gas_object_id),
        )
    } else {
        // No gas is reserved for authentication here.
        let authenticator_gas_budget = 0;

        (
            iota_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                reference_gas_price,
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
                authenticator_gas_budget,
            )?,
            None,
        )
    };

    let protocol_config = epoch_store.protocol_config();
    let (kind, signer, _) = transaction.execution_parts();

    // A dedicated executor is created for the dry run instead of using the
    // epoch store's executor.
    let silent = true;
    let executor = iota_execution::executor(protocol_config, silent, None)
        .expect("Creating an executor should not fail here");

    let expensive_checks = false;
    let (inner_temp_store, _, effects, execution_error) = executor
        .execute_transaction_to_effects(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            expensive_checks,
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_object_refs,
            gas_status,
            kind,
            signer,
            transaction_digest,
            &mut None,
        );
    // From here on the digest recorded in the effects is used.
    let tx_digest = *effects.transaction_digest();

    // Resolves modules from the temporary store in addition to the epoch
    // store's module cache.
    let module_cache =
        TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());

    let mut layout_resolver =
        epoch_store
            .executor()
            .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                &inner_temp_store,
                self.get_backing_package_store(),
            )));
    // Both change lists are returned empty by this function.
    // NOTE(review): presumably the caller computes them -- confirm.
    let object_changes = Vec::new();

    let balance_changes = Vec::new();

    // Tag every written object with how it came to be written, then pair it
    // with the object stored in the temporary store.
    let written_with_kind = effects
        .created()
        .into_iter()
        .map(|(oref, _)| (oref, WriteKind::Create))
        .chain(
            effects
                .unwrapped()
                .into_iter()
                .map(|(oref, _)| (oref, WriteKind::Unwrap)),
        )
        .chain(
            effects
                .mutated()
                .into_iter()
                .map(|(oref, _)| (oref, WriteKind::Mutate)),
        )
        .map(|(oref, kind)| {
            let obj = inner_temp_store.written.get(&oref.0).unwrap();
            (oref.0, (oref, obj.clone(), kind))
        })
        .collect();

    // Text of the underlying source of the execution error, when there is
    // an error and it has a source.
    let execution_error_source = execution_error
        .as_ref()
        .err()
        .and_then(|e| e.source().as_ref().map(|e| e.to_string()));

    Ok((
        DryRunTransactionBlockResponse {
            suggested_gas_price: self
                .congestion_tracker
                .get_prediction_suggested_gas_price(&transaction),
            input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
                .map_err(|e| IotaError::TransactionSerialization {
                    error: format!(
                        "Failed to convert transaction to IotaTransactionBlockData: {e}",
                    ),
                })?,
            effects: effects.clone().try_into()?,
            events: IotaTransactionBlockEvents::try_from(
                inner_temp_store.events.clone(),
                tx_digest,
                None,
                layout_resolver.as_mut(),
            )?,
            object_changes,
            balance_changes,
            execution_error_source,
        },
        written_with_kind,
        effects,
        mock_gas,
    ))
}
2035
/// Simulates `transaction` on a fresh executor without committing anything
/// and returns the objects read and written, the events, the effects, the
/// execution result and the id of the mock gas object (if one was created).
///
/// `checks` selects the input validation: when enabled, the regular
/// transaction input checks run; when disabled, the dev-inspect input
/// checks run instead and the gas status is built directly from the
/// transaction's budget and price.
///
/// # Errors
///
/// Returns `IotaError::UnsupportedFeature` for system transactions or when
/// this node is not a fullnode (checked in that order). Also propagates
/// errors from the validity check, the deny-list check, object loading, the
/// input checks and the gas status construction.
///
/// # Panics
///
/// Panics if the executor cannot be created.
pub fn simulate_transaction(
    &self,
    mut transaction: TransactionData,
    checks: VmChecks,
) -> IotaResult<SimulateTransactionResult> {
    if transaction.kind().is_system_tx() {
        return Err(IotaError::UnsupportedFeature {
            error: "simulate does not support system transactions".to_string(),
        });
    }

    let epoch_store = self.load_epoch_store_one_call_per_task();
    if !self.is_fullnode(&epoch_store) {
        return Err(IotaError::UnsupportedFeature {
            error: "simulate is only supported on fullnodes".to_string(),
        });
    }

    // Gas is validated later, once it is known whether a mock gas object is
    // needed.
    transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

    let input_object_kinds = transaction.input_objects()?;
    let receiving_object_refs = transaction.receiving_objects();

    // The deny-list check is run with an empty signature list, since a
    // simulation carries no signatures.
    iota_transaction_checks::deny::check_transaction_for_signing(
        &transaction,
        &[],
        &input_object_kinds,
        &receiving_object_refs,
        &self.config.transaction_deny_config,
        self.get_backing_package_store().as_ref(),
    )?;

    let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
        None,
        &input_object_kinds,
        &receiving_object_refs,
        epoch_store.epoch(),
    )?;

    // No gas payment supplied: fabricate a gas coin with the fixed id
    // `ObjectID::MAX`, owned by the gas owner, use it as the sole payment
    // and add it to the loaded input objects.
    let mock_gas_id = if transaction.gas().is_empty() {
        let mock_gas_object = Object::new_move(
            MoveObject::new_gas_coin(
                OBJECT_START_VERSION,
                ObjectID::MAX,
                SIMULATION_GAS_COIN_VALUE,
            ),
            Owner::AddressOwner(transaction.gas_data().owner),
            TransactionDigest::genesis_marker(),
        );
        let mock_gas_object_ref = mock_gas_object.compute_object_reference();
        transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
        input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
        Some(mock_gas_object.id())
    } else {
        None
    };

    let protocol_config = epoch_store.protocol_config();

    // No gas is reserved for authentication here.
    let authenticator_gas_budget = 0;

    let (gas_status, checked_input_objects) = if checks.enabled() {
        iota_transaction_checks::check_transaction_input(
            protocol_config,
            epoch_store.reference_gas_price(),
            &transaction,
            input_objects,
            &receiving_objects,
            &self.metrics.bytecode_verifier_metrics,
            &self.config.verifier_signing_config,
            authenticator_gas_budget,
        )?
    } else {
        let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
            protocol_config,
            transaction.kind(),
            input_objects,
            receiving_objects,
        )?;
        let gas_status = IotaGasStatus::new(
            transaction.gas_budget(),
            transaction.gas_price(),
            epoch_store.reference_gas_price(),
            protocol_config,
        )?;

        (gas_status, checked_input_objects)
    };

    // A dedicated executor is created for the simulation instead of using
    // the epoch store's executor.
    // NOTE(review): the literal `true` is presumably the executor's
    // `silent` flag -- confirm against `iota_execution::executor`.
    let executor = iota_execution::executor(
        protocol_config,
        true,
        None,
    )
    .expect("Creating an executor should not fail here");

    let (kind, signer, gas_coins) = transaction.execution_parts();
    // NOTE(review): the literal `false` is presumably an "expensive checks"
    // switch -- confirm against `dev_inspect_transaction`'s signature.
    let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
        self.get_backing_store().as_ref(),
        protocol_config,
        self.metrics.limits_metrics.clone(),
        false,
        self.config.certificate_deny_config.certificate_deny_set(),
        &epoch_store.epoch_start_config().epoch_data().epoch_id(),
        epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp(),
        checked_input_objects,
        gas_coins,
        gas_status,
        kind,
        signer,
        transaction.digest(),
        checks.disabled(),
    );

    Ok(SimulateTransactionResult {
        input_objects: inner_temp_store.input_objects,
        output_objects: inner_temp_store.written,
        // Events are only reported when the effects carry an events digest.
        events: effects.events_digest().map(|_| inner_temp_store.events),
        effects,
        execution_result,
        mock_gas_id,
    })
}
2178
2179 #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2184 pub async fn dev_inspect_transaction_block(
2185 &self,
2186 sender: IotaAddress,
2187 transaction_kind: TransactionKind,
2188 gas_price: Option<u64>,
2189 gas_budget: Option<u64>,
2190 gas_sponsor: Option<IotaAddress>,
2191 gas_objects: Option<Vec<ObjectRef>>,
2192 show_raw_txn_data_and_effects: Option<bool>,
2193 skip_checks: Option<bool>,
2194 ) -> IotaResult<DevInspectResults> {
2195 let epoch_store = self.load_epoch_store_one_call_per_task();
2196
2197 if !self.is_fullnode(&epoch_store) {
2198 return Err(IotaError::UnsupportedFeature {
2199 error: "dev-inspect is only supported on fullnodes".to_string(),
2200 });
2201 }
2202
2203 if transaction_kind.is_system_tx() {
2204 return Err(IotaError::UnsupportedFeature {
2205 error: "system transactions are not supported".to_string(),
2206 });
2207 }
2208
2209 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2210 let skip_checks = skip_checks.unwrap_or(true);
2211 let reference_gas_price = epoch_store.reference_gas_price();
2212 let protocol_config = epoch_store.protocol_config();
2213 let max_tx_gas = protocol_config.max_tx_gas();
2214
2215 let price = gas_price.unwrap_or(reference_gas_price);
2216 let budget = gas_budget.unwrap_or(max_tx_gas);
2217 let owner = gas_sponsor.unwrap_or(sender);
2218 let payment = gas_objects.unwrap_or_default();
2221 let transaction = TransactionData::V1(TransactionDataV1 {
2222 kind: transaction_kind.clone(),
2223 sender,
2224 gas_data: GasData {
2225 payment,
2226 owner,
2227 price,
2228 budget,
2229 },
2230 expiration: TransactionExpiration::None,
2231 });
2232
2233 let raw_txn_data = if show_raw_txn_data_and_effects {
2234 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2235 error: "Failed to serialize transaction during dev inspect".to_string(),
2236 })?
2237 } else {
2238 vec![]
2239 };
2240
2241 transaction.validity_check_no_gas_check(protocol_config)?;
2242
2243 let input_object_kinds = transaction.input_objects()?;
2244 let receiving_object_refs = transaction.receiving_objects();
2245
2246 iota_transaction_checks::deny::check_transaction_for_signing(
2247 &transaction,
2248 &[],
2249 &input_object_kinds,
2250 &receiving_object_refs,
2251 &self.config.transaction_deny_config,
2252 self.get_backing_package_store().as_ref(),
2253 )?;
2254
2255 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2256 None,
2258 &input_object_kinds,
2259 &receiving_object_refs,
2260 epoch_store.epoch(),
2261 )?;
2262
2263 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2265 SIMULATION_GAS_COIN_VALUE,
2266 transaction.gas_owner(),
2267 );
2268
2269 let gas_objects = if transaction.gas().is_empty() {
2270 let gas_object_ref = dummy_gas_object.compute_object_reference();
2271 vec![gas_object_ref]
2272 } else {
2273 transaction.gas().to_vec()
2274 };
2275
2276 let (gas_status, checked_input_objects) = if skip_checks {
2277 if transaction.gas().is_empty() {
2282 input_objects.push(ObjectReadResult::new(
2283 InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
2284 dummy_gas_object.into(),
2285 ));
2286 }
2287 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2288 protocol_config,
2289 &transaction_kind,
2290 input_objects,
2291 receiving_objects,
2292 )?;
2293 let gas_status = IotaGasStatus::new(
2294 max_tx_gas,
2295 transaction.gas_price(),
2296 reference_gas_price,
2297 protocol_config,
2298 )?;
2299
2300 (gas_status, checked_input_objects)
2301 } else {
2302 if transaction.gas().is_empty() {
2306 iota_transaction_checks::check_transaction_input_with_given_gas(
2307 epoch_store.protocol_config(),
2308 reference_gas_price,
2309 &transaction,
2310 input_objects,
2311 receiving_objects,
2312 dummy_gas_object,
2313 &self.metrics.bytecode_verifier_metrics,
2314 &self.config.verifier_signing_config,
2315 )?
2316 } else {
2317 let authenticator_gas_budget = 0;
2320
2321 iota_transaction_checks::check_transaction_input(
2322 epoch_store.protocol_config(),
2323 reference_gas_price,
2324 &transaction,
2325 input_objects,
2326 &receiving_objects,
2327 &self.metrics.bytecode_verifier_metrics,
2328 &self.config.verifier_signing_config,
2329 authenticator_gas_budget,
2330 )?
2331 }
2332 };
2333
2334 let executor = iota_execution::executor(protocol_config, true, None)
2335 .expect("Creating an executor should not fail here");
2336 let intent_msg = IntentMessage::new(
2337 Intent {
2338 version: IntentVersion::V0,
2339 scope: IntentScope::TransactionData,
2340 app_id: IntentAppId::Iota,
2341 },
2342 transaction,
2343 );
2344 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2345 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2346 self.get_backing_store().as_ref(),
2347 protocol_config,
2348 self.metrics.limits_metrics.clone(),
2349 false,
2351 self.config.certificate_deny_config.certificate_deny_set(),
2352 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2353 epoch_store
2354 .epoch_start_config()
2355 .epoch_data()
2356 .epoch_start_timestamp(),
2357 checked_input_objects,
2358 gas_objects,
2359 gas_status,
2360 transaction_kind,
2361 sender,
2362 transaction_digest,
2363 skip_checks,
2364 );
2365
2366 let raw_effects = if show_raw_txn_data_and_effects {
2367 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2368 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2369 })?
2370 } else {
2371 vec![]
2372 };
2373
2374 let mut layout_resolver =
2375 epoch_store
2376 .executor()
2377 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2378 &inner_temp_store,
2379 self.get_backing_package_store(),
2380 )));
2381
2382 DevInspectResults::new(
2383 effects,
2384 inner_temp_store.events.clone(),
2385 execution_result,
2386 raw_txn_data,
2387 raw_effects,
2388 layout_resolver.as_mut(),
2389 )
2390 }
2391
2392 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2394 let epoch_store = self.epoch_store_for_testing();
2395 Ok(epoch_store.reference_gas_price())
2396 }
2397
2398 #[instrument(level = "trace", skip_all)]
2399 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2400 self.get_transaction_cache_reader()
2401 .try_is_tx_already_executed(digest)
2402 }
2403
2404 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2406 self.try_is_tx_already_executed(digest)
2407 .expect("storage access failed")
2408 }
2409
2410 #[instrument(level = "debug", skip_all, err)]
2412 fn index_tx(
2413 &self,
2414 indexes: &IndexStore,
2415 digest: &TransactionDigest,
2416 cert: &VerifiedExecutableTransaction,
2418 effects: &TransactionEffects,
2419 events: &TransactionEvents,
2420 timestamp_ms: u64,
2421 tx_coins: Option<TxCoins>,
2422 written: &WrittenObjects,
2423 inner_temporary_store: &InnerTemporaryStore,
2424 ) -> IotaResult<u64> {
2425 let changes = self
2426 .process_object_index(effects, written, inner_temporary_store)
2427 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2428
2429 indexes.index_tx(
2430 cert.data().intent_message().value.sender(),
2431 cert.data()
2432 .intent_message()
2433 .value
2434 .input_objects()?
2435 .iter()
2436 .map(|o| o.object_id()),
2437 effects
2438 .all_changed_objects()
2439 .into_iter()
2440 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2441 cert.data()
2442 .intent_message()
2443 .value
2444 .move_calls()
2445 .into_iter()
2446 .map(|(package, module, function)| {
2447 (*package, module.to_owned(), function.to_owned())
2448 }),
2449 events,
2450 changes,
2451 digest,
2452 timestamp_ms,
2453 tx_coins,
2454 )
2455 }
2456
/// Simulator-only (`cfg(msim)`) helper that perturbs `effects` for non-system transactions.
///
/// A thread-local pair (accumulated stake, set of authority names) is maintained. While the
/// accumulated stake is below the committee's validity threshold, this authority's stake is
/// added and its name is recorded. If this authority's name is in the recorded set, the
/// computation cost in `effects` is incremented by one. Authorities with zero stake and
/// system transactions are left untouched.
#[cfg(msim)]
fn create_fail_state(
    &self,
    certificate: &VerifiedExecutableTransaction,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    effects: &mut TransactionEffects,
) {
    use std::cell::RefCell;
    thread_local! {
        static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
    }

    if certificate.data().intent_message().value.is_system_tx() {
        return;
    }

    let committee = epoch_store.committee();
    let cur_stake = (**committee).weight(&self.name);
    if cur_stake == 0 {
        return;
    }

    FAIL_STATE.with_borrow_mut(|(failing_stake, failing_validators)| {
        if *failing_stake < committee.validity_threshold() {
            *failing_stake += cur_stake;
            failing_validators.insert(self.name);
        }

        if failing_validators.contains(&self.name) {
            info!("cp_exec failing tx");
            effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
        }
    });
}
2487
2488 fn process_object_index(
2489 &self,
2490 effects: &TransactionEffects,
2491 written: &WrittenObjects,
2492 inner_temporary_store: &InnerTemporaryStore,
2493 ) -> IotaResult<ObjectIndexChanges> {
2494 let epoch_store = self.load_epoch_store_one_call_per_task();
2495 let mut layout_resolver =
2496 epoch_store
2497 .executor()
2498 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2499 inner_temporary_store,
2500 self.get_backing_package_store(),
2501 )));
2502
2503 let modified_at_version = effects
2504 .modified_at_versions()
2505 .into_iter()
2506 .collect::<HashMap<_, _>>();
2507
2508 let tx_digest = effects.transaction_digest();
2509 let mut deleted_owners = vec![];
2510 let mut deleted_dynamic_fields = vec![];
2511 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2512 let old_version = modified_at_version.get(&id).unwrap();
2513 match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2516 |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2517 ) {
2518 Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2519 Owner::ObjectOwner(object_id) => {
2520 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2521 }
2522 _ => {}
2523 }
2524 }
2525
2526 let mut new_owners = vec![];
2527 let mut new_dynamic_fields = vec![];
2528
2529 for (oref, owner, kind) in effects.all_changed_objects() {
2530 let id = &oref.0;
2531 if let WriteKind::Mutate = kind {
2534 let Some(old_version) = modified_at_version.get(id) else {
2535 panic!(
2536 "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2537 );
2538 };
2539 let Some(old_object) = self
2542 .get_object_store()
2543 .try_get_object_by_key(id, *old_version)?
2544 else {
2545 panic!(
2546 "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2547 );
2548 };
2549 if old_object.owner != owner {
2550 match old_object.owner {
2551 Owner::AddressOwner(addr) => {
2552 deleted_owners.push((addr, *id));
2553 }
2554 Owner::ObjectOwner(object_id) => {
2555 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2556 }
2557 _ => {}
2558 }
2559 }
2560 }
2561
2562 match owner {
2563 Owner::AddressOwner(addr) => {
2564 let new_object = written.get(id).unwrap_or_else(
2567 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2568 );
2569 assert_eq!(
2570 new_object.version(),
2571 oref.1,
2572 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2573 tx_digest,
2574 id,
2575 new_object.version(),
2576 oref.1
2577 );
2578
2579 let type_ = new_object
2580 .type_()
2581 .map(|type_| ObjectType::Struct(type_.clone()))
2582 .unwrap_or(ObjectType::Package);
2583
2584 new_owners.push((
2585 (addr, *id),
2586 ObjectInfo {
2587 object_id: *id,
2588 version: oref.1,
2589 digest: oref.2,
2590 type_,
2591 owner,
2592 previous_transaction: *effects.transaction_digest(),
2593 },
2594 ));
2595 }
2596 Owner::ObjectOwner(owner) => {
2597 let new_object = written.get(id).unwrap_or_else(
2598 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2599 );
2600 assert_eq!(
2601 new_object.version(),
2602 oref.1,
2603 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2604 tx_digest,
2605 id,
2606 new_object.version(),
2607 oref.1
2608 );
2609
2610 let Some(df_info) = self
2611 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2612 .unwrap_or_else(|e| {
2613 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2614 None
2615 }
2616 )
2617 else {
2618 continue;
2620 };
2621 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2622 }
2623 _ => {}
2624 }
2625 }
2626
2627 Ok(ObjectIndexChanges {
2628 deleted_owners,
2629 deleted_dynamic_fields,
2630 new_owners,
2631 new_dynamic_fields,
2632 })
2633 }
2634
2635 fn try_create_dynamic_field_info(
2636 &self,
2637 o: &Object,
2638 written: &WrittenObjects,
2639 resolver: &mut dyn LayoutResolver,
2640 ) -> IotaResult<Option<DynamicFieldInfo>> {
2641 let Some(move_object) = o.data.try_as_move().cloned() else {
2643 return Ok(None);
2644 };
2645
2646 if !move_object.type_().is_dynamic_field() {
2648 return Ok(None);
2649 }
2650
2651 let layout = resolver
2652 .get_annotated_layout(&move_object.type_().clone().into())?
2653 .into_layout();
2654
2655 let field =
2656 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2657 IotaError::ObjectDeserialization {
2658 error: e.to_string(),
2659 }
2660 })?;
2661
2662 let type_ = field.kind;
2663 let name_type: TypeTag = field.name_layout.into();
2664 let bcs_name = field.name_bytes.to_owned();
2665
2666 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2667 .map_err(|e| {
2668 warn!("{e}");
2669 IotaError::ObjectDeserialization {
2670 error: e.to_string(),
2671 }
2672 })?;
2673
2674 let name = DynamicFieldName {
2675 type_: name_type,
2676 value: IotaMoveValue::from(name_value).to_json_value(),
2677 };
2678
2679 let value_metadata = field.value_metadata().map_err(|e| {
2680 warn!("{e}");
2681 IotaError::ObjectDeserialization {
2682 error: e.to_string(),
2683 }
2684 })?;
2685
2686 Ok(Some(match value_metadata {
2687 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2688 name,
2689 bcs_name,
2690 type_,
2691 object_type: object_type.to_canonical_string(true),
2692 object_id: o.id(),
2693 version: o.version(),
2694 digest: o.digest(),
2695 },
2696
2697 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2698 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2703 (
2704 object.version(),
2705 object.digest(),
2706 object.data.type_().unwrap().clone(),
2707 )
2708 } else {
2709 let object = self
2711 .get_object_store()
2712 .try_get_object_by_key(&object_id, o.version())?
2713 .ok_or_else(|| UserInputError::ObjectNotFound {
2714 object_id,
2715 version: Some(o.version()),
2716 })?;
2717 let version = object.version();
2718 let digest = object.digest();
2719 let object_type = object.data.type_().unwrap().clone();
2720 (version, digest, object_type)
2721 };
2722
2723 DynamicFieldInfo {
2724 name,
2725 bcs_name,
2726 type_,
2727 object_type: object_type.to_string(),
2728 object_id,
2729 version,
2730 digest,
2731 }
2732 }
2733 }))
2734 }
2735
2736 #[instrument(level = "trace", skip_all, err)]
2737 fn post_process_one_tx(
2738 &self,
2739 certificate: &VerifiedExecutableTransaction,
2740 effects: &TransactionEffects,
2741 inner_temporary_store: &InnerTemporaryStore,
2742 epoch_store: &Arc<AuthorityPerEpochStore>,
2743 ) -> IotaResult {
2744 if self.indexes.is_none() {
2745 return Ok(());
2746 }
2747
2748 let tx_digest = certificate.digest();
2749 let timestamp_ms = Self::unixtime_now_ms();
2750 let events = &inner_temporary_store.events;
2751 let written = &inner_temporary_store.written;
2752 let tx_coins =
2753 self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2754
2755 if let Some(indexes) = &self.indexes {
2757 let _ = self
2758 .index_tx(
2759 indexes.as_ref(),
2760 tx_digest,
2761 certificate,
2762 effects,
2763 events,
2764 timestamp_ms,
2765 tx_coins,
2766 written,
2767 inner_temporary_store,
2768 )
2769 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2770 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2771 .expect("Indexing tx should not fail");
2772
2773 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2774 let events = self.make_transaction_block_events(
2775 events.clone(),
2776 *tx_digest,
2777 timestamp_ms,
2778 epoch_store,
2779 inner_temporary_store,
2780 )?;
2781 self.subscription_handler
2783 .process_tx(certificate.data().transaction_data(), &effects, &events)
2784 .tap_ok(|_| {
2785 self.metrics
2786 .post_processing_total_tx_had_event_processed
2787 .inc()
2788 })
2789 .tap_err(|e| {
2790 warn!(
2791 ?tx_digest,
2792 "Post processing - Couldn't process events for tx: {}", e
2793 )
2794 })?;
2795
2796 self.metrics
2797 .post_processing_total_events_emitted
2798 .inc_by(events.data.len() as u64);
2799 };
2800 Ok(())
2801 }
2802
2803 fn make_transaction_block_events(
2804 &self,
2805 transaction_events: TransactionEvents,
2806 digest: TransactionDigest,
2807 timestamp_ms: u64,
2808 epoch_store: &Arc<AuthorityPerEpochStore>,
2809 inner_temporary_store: &InnerTemporaryStore,
2810 ) -> IotaResult<IotaTransactionBlockEvents> {
2811 let mut layout_resolver =
2812 epoch_store
2813 .executor()
2814 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2815 inner_temporary_store,
2816 self.get_backing_package_store(),
2817 )));
2818 IotaTransactionBlockEvents::try_from(
2819 transaction_events,
2820 digest,
2821 Some(timestamp_ms),
2822 layout_resolver.as_mut(),
2823 )
2824 }
2825
/// Returns the current system time as whole milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch, or if the millisecond
/// count does not fit in a `u64`.
pub fn unixtime_now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let millis = since_epoch.as_millis();
    u64::try_from(millis).expect("Travelling in time machine")
}
2833
2834 #[instrument(level = "trace", skip_all)]
2835 pub async fn handle_transaction_info_request(
2836 &self,
2837 request: TransactionInfoRequest,
2838 ) -> IotaResult<TransactionInfoResponse> {
2839 let epoch_store = self.load_epoch_store_one_call_per_task();
2840 let (transaction, status) = self
2841 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2842 .ok_or(IotaError::TransactionNotFound {
2843 digest: request.transaction_digest,
2844 })?;
2845 Ok(TransactionInfoResponse {
2846 transaction,
2847 status,
2848 })
2849 }
2850
2851 #[instrument(level = "trace", skip_all)]
2852 pub async fn handle_object_info_request(
2853 &self,
2854 request: ObjectInfoRequest,
2855 ) -> IotaResult<ObjectInfoResponse> {
2856 let epoch_store = self.load_epoch_store_one_call_per_task();
2857
2858 let requested_object_seq = match request.request_kind {
2859 ObjectInfoRequestKind::LatestObjectInfo => {
2860 let (_, seq, _) = self
2861 .try_get_object_or_tombstone(request.object_id)
2862 .await?
2863 .ok_or_else(|| {
2864 IotaError::from(UserInputError::ObjectNotFound {
2865 object_id: request.object_id,
2866 version: None,
2867 })
2868 })?;
2869 seq
2870 }
2871 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2872 };
2873
2874 let object = self
2875 .get_object_store()
2876 .try_get_object_by_key(&request.object_id, requested_object_seq)?
2877 .ok_or_else(|| {
2878 IotaError::from(UserInputError::ObjectNotFound {
2879 object_id: request.object_id,
2880 version: Some(requested_object_seq),
2881 })
2882 })?;
2883
2884 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2885 (request.generate_layout, object.data.try_as_move())
2886 {
2887 Some(into_struct_layout(
2888 epoch_store
2889 .executor()
2890 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2891 .get_annotated_layout(&move_obj.type_().clone().into())?,
2892 )?)
2893 } else {
2894 None
2895 };
2896
2897 let lock = if !object.is_address_owned() {
2898 None
2900 } else {
2901 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2902 .await?
2903 .map(|s| s.into_inner())
2904 };
2905
2906 Ok(ObjectInfoResponse {
2907 object,
2908 layout,
2909 lock_for_debugging: lock,
2910 })
2911 }
2912
2913 #[instrument(level = "trace", skip_all)]
2914 pub fn handle_checkpoint_request(
2915 &self,
2916 request: &CheckpointRequest,
2917 ) -> IotaResult<CheckpointResponse> {
2918 let summary = if request.certified {
2919 let summary = match request.sequence_number {
2920 Some(seq) => self
2921 .checkpoint_store
2922 .get_checkpoint_by_sequence_number(seq)?,
2923 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2924 }
2925 .map(|v| v.into_inner());
2926 summary.map(CheckpointSummaryResponse::Certified)
2927 } else {
2928 let summary = match request.sequence_number {
2929 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2930 None => self
2931 .checkpoint_store
2932 .get_latest_locally_computed_checkpoint()?,
2933 };
2934 summary.map(CheckpointSummaryResponse::Pending)
2935 };
2936 let contents = match &summary {
2937 Some(s) => self
2938 .checkpoint_store
2939 .get_checkpoint_contents(&s.content_digest())?,
2940 None => None,
2941 };
2942 Ok(CheckpointResponse {
2943 checkpoint: summary,
2944 contents,
2945 })
2946 }
2947
2948 fn check_protocol_version(
2949 supported_protocol_versions: SupportedProtocolVersions,
2950 current_version: ProtocolVersion,
2951 ) {
2952 info!("current protocol version is now {:?}", current_version);
2953 info!("supported versions are: {:?}", supported_protocol_versions);
2954 if !supported_protocol_versions.is_version_supported(current_version) {
2955 let msg = format!(
2956 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2957 );
2958
2959 error!("{}", msg);
2960 eprintln!("{msg}");
2961
2962 #[cfg(not(msim))]
2963 std::process::exit(1);
2964
2965 #[cfg(msim)]
2966 iota_simulator::task::shutdown_current_node();
2967 }
2968 }
2969
2970 #[expect(clippy::disallowed_methods)] pub async fn new(
2972 name: AuthorityName,
2973 secret: StableSyncAuthoritySigner,
2974 supported_protocol_versions: SupportedProtocolVersions,
2975 store: Arc<AuthorityStore>,
2976 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
2977 epoch_store: Arc<AuthorityPerEpochStore>,
2978 committee_store: Arc<CommitteeStore>,
2979 indexes: Option<Arc<IndexStore>>,
2980 rest_index: Option<Arc<RestIndexStore>>,
2981 checkpoint_store: Arc<CheckpointStore>,
2982 prometheus_registry: &Registry,
2983 genesis_objects: &[Object],
2984 db_checkpoint_config: &DBCheckpointConfig,
2985 config: NodeConfig,
2986 archive_readers: ArchiveReaderBalancer,
2987 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
2988 chain_identifier: ChainIdentifier,
2989 pruner_db: Option<Arc<AuthorityPrunerTables>>,
2990 ) -> Arc<Self> {
2991 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
2992
2993 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
2994 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
2995 let transaction_manager = Arc::new(TransactionManager::new(
2996 execution_cache_trait_pointers.object_cache_reader.clone(),
2997 execution_cache_trait_pointers
2998 .transaction_cache_reader
2999 .clone(),
3000 &epoch_store,
3001 tx_ready_certificates,
3002 metrics.clone(),
3003 ));
3004 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3005
3006 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3007 epoch_store.get_parent_path(),
3008 &config.authority_store_pruning_config,
3009 );
3010 let _pruner = AuthorityStorePruner::new(
3011 store.perpetual_tables.clone(),
3012 checkpoint_store.clone(),
3013 rest_index.clone(),
3014 indexes.clone(),
3015 config.authority_store_pruning_config.clone(),
3016 epoch_store.committee().authority_exists(&name),
3017 epoch_store.epoch_start_state().epoch_duration_ms(),
3018 prometheus_registry,
3019 archive_readers,
3020 pruner_db,
3021 );
3022 let input_loader =
3023 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3024 let epoch = epoch_store.epoch();
3025 let rgp = epoch_store.reference_gas_price();
3026 let state = Arc::new(AuthorityState {
3027 name,
3028 secret,
3029 execution_lock: RwLock::new(epoch),
3030 epoch_store: ArcSwap::new(epoch_store.clone()),
3031 input_loader,
3032 execution_cache_trait_pointers,
3033 indexes,
3034 rest_index,
3035 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3036 checkpoint_store,
3037 committee_store,
3038 transaction_manager,
3039 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3040 metrics,
3041 _pruner,
3042 _authority_per_epoch_pruner,
3043 db_checkpoint_config: db_checkpoint_config.clone(),
3044 config,
3045 overload_info: AuthorityOverloadInfo::default(),
3046 validator_tx_finalizer,
3047 chain_identifier,
3048 congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
3049 });
3050
3051 let authority_state = Arc::downgrade(&state);
3053 spawn_monitored_task!(execution_process(
3054 authority_state,
3055 rx_ready_certificates,
3056 rx_execution_shutdown,
3057 ));
3058
3059 state
3061 .create_owner_index_if_empty(genesis_objects, &epoch_store)
3062 .expect("Error indexing genesis objects.");
3063
3064 state
3065 }
3066
3067 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3069 &self.execution_cache_trait_pointers.object_cache_reader
3070 }
3071
3072 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3073 &self.execution_cache_trait_pointers.transaction_cache_reader
3074 }
3075
3076 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3077 &self.execution_cache_trait_pointers.cache_writer
3078 }
3079
3080 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3081 &self.execution_cache_trait_pointers.backing_store
3082 }
3083
3084 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3085 &self.execution_cache_trait_pointers.backing_package_store
3086 }
3087
3088 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3089 &self.execution_cache_trait_pointers.object_store
3090 }
3091
3092 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3093 &self.execution_cache_trait_pointers.reconfig_api
3094 }
3095
3096 pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
3097 &self.execution_cache_trait_pointers.accumulator_store
3098 }
3099
3100 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3101 &self.execution_cache_trait_pointers.checkpoint_cache
3102 }
3103
3104 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3105 &self.execution_cache_trait_pointers.state_sync_store
3106 }
3107
3108 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3109 &self.execution_cache_trait_pointers.cache_commit
3110 }
3111
3112 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3113 self.execution_cache_trait_pointers
3114 .testing_api
3115 .database_for_testing()
3116 }
3117
3118 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3119 &self,
3120 config: NodeConfig,
3121 metrics: Arc<AuthorityStorePruningMetrics>,
3122 ) -> anyhow::Result<()> {
3123 let archive_readers =
3124 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3125 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3126 &self.database_for_testing().perpetual_tables,
3127 &self.checkpoint_store,
3128 self.rest_index.as_deref(),
3129 None,
3130 config.authority_store_pruning_config,
3131 metrics,
3132 archive_readers,
3133 EPOCH_DURATION_MS_FOR_TESTING,
3134 )
3135 .await
3136 }
3137
3138 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3139 &self.transaction_manager
3140 }
3141
3142 pub fn enqueue_transactions_for_execution(
3145 &self,
3146 txns: Vec<VerifiedExecutableTransaction>,
3147 epoch_store: &Arc<AuthorityPerEpochStore>,
3148 ) {
3149 self.transaction_manager.enqueue(txns, epoch_store)
3150 }
3151
3152 pub fn enqueue_certificates_for_execution(
3154 &self,
3155 certs: Vec<VerifiedCertificate>,
3156 epoch_store: &Arc<AuthorityPerEpochStore>,
3157 ) {
3158 self.transaction_manager
3159 .enqueue_certificates(certs, epoch_store)
3160 }
3161
3162 pub fn enqueue_with_expected_effects_digest(
3163 &self,
3164 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3165 epoch_store: &AuthorityPerEpochStore,
3166 ) {
3167 self.transaction_manager
3168 .enqueue_with_expected_effects_digest(certs, epoch_store)
3169 }
3170
3171 fn create_owner_index_if_empty(
3172 &self,
3173 genesis_objects: &[Object],
3174 epoch_store: &Arc<AuthorityPerEpochStore>,
3175 ) -> IotaResult {
3176 let Some(index_store) = &self.indexes else {
3177 return Ok(());
3178 };
3179 if !index_store.is_empty() {
3180 return Ok(());
3181 }
3182
3183 let mut new_owners = vec![];
3184 let mut new_dynamic_fields = vec![];
3185 let mut layout_resolver = epoch_store
3186 .executor()
3187 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3188 for o in genesis_objects.iter() {
3189 match o.owner {
3190 Owner::AddressOwner(addr) => new_owners.push((
3191 (addr, o.id()),
3192 ObjectInfo::new(&o.compute_object_reference(), o),
3193 )),
3194 Owner::ObjectOwner(object_id) => {
3195 let id = o.id();
3196 let info = match self.try_create_dynamic_field_info(
3197 o,
3198 &BTreeMap::new(),
3199 layout_resolver.as_mut(),
3200 ) {
3201 Ok(Some(info)) => info,
3202 Ok(None) => continue,
3203 Err(IotaError::UserInput {
3204 error:
3205 UserInputError::ObjectNotFound {
3206 object_id: not_found_id,
3207 version,
3208 },
3209 }) => {
3210 warn!(
3211 ?not_found_id,
3212 ?version,
3213 object_owner=?object_id,
3214 field=?id,
3215 "Skipping dynamic field: referenced genesis object not found"
3216 );
3217 continue;
3218 }
3219 Err(e) => return Err(e),
3220 };
3221 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3222 }
3223 _ => {}
3224 }
3225 }
3226
3227 index_store.insert_genesis_objects(ObjectIndexChanges {
3228 deleted_owners: vec![],
3229 deleted_dynamic_fields: vec![],
3230 new_owners,
3231 new_dynamic_fields,
3232 })
3233 }
3234
3235 pub fn execution_lock_for_executable_transaction(
3239 &self,
3240 transaction: &VerifiedExecutableTransaction,
3241 ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3242 let lock = self
3243 .execution_lock
3244 .try_read()
3245 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3246 if *lock == transaction.auth_sig().epoch() {
3247 Ok(lock)
3248 } else {
3249 Err(IotaError::WrongEpoch {
3250 expected_epoch: *lock,
3251 actual_epoch: transaction.auth_sig().epoch(),
3252 })
3253 }
3254 }
3255
3256 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3262 self.execution_lock
3263 .try_read()
3264 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3265 }
3266
3267 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3268 self.execution_lock.write().await
3269 }
3270
3271 #[instrument(level = "error", skip_all)]
3272 pub async fn reconfigure(
3273 &self,
3274 cur_epoch_store: &AuthorityPerEpochStore,
3275 supported_protocol_versions: SupportedProtocolVersions,
3276 new_committee: Committee,
3277 epoch_start_configuration: EpochStartConfiguration,
3278 accumulator: Arc<StateAccumulator>,
3279 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3280 epoch_supply_change: i64,
3281 epoch_last_checkpoint: CheckpointSequenceNumber,
3282 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3283 Self::check_protocol_version(
3284 supported_protocol_versions,
3285 epoch_start_configuration
3286 .epoch_start_state()
3287 .protocol_version(),
3288 );
3289 self.metrics.reset_on_reconfigure();
3290 self.committee_store.insert_new_committee(&new_committee)?;
3291
3292 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3294
3295 cur_epoch_store.epoch_terminated().await;
3297
3298 let highest_locally_built_checkpoint_seq = self
3299 .checkpoint_store
3300 .get_latest_locally_computed_checkpoint()?
3301 .map(|c| *c.sequence_number())
3302 .unwrap_or(0);
3303
3304 assert!(
3305 epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3306 "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3307 );
3308 if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
3309 let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3313 if num_shared_version_assignments > 1 {
3316 debug_fatal!(
3318 "all shared_version_assignments should have been removed \
3319 (num_shared_version_assignments: {num_shared_version_assignments})"
3320 );
3321 }
3322 }
3323
3324 self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3330 .await?;
3331 self.get_reconfig_api()
3332 .clear_state_end_of_epoch(&execution_lock);
3333 self.check_system_consistency(
3334 cur_epoch_store,
3335 accumulator,
3336 expensive_safety_check_config,
3337 epoch_supply_change,
3338 )?;
3339 self.get_reconfig_api()
3340 .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3341 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3342 if self
3343 .db_checkpoint_config
3344 .perform_db_checkpoints_at_epoch_end
3345 {
3346 let checkpoint_indexes = self
3347 .db_checkpoint_config
3348 .perform_index_db_checkpoints_at_epoch_end
3349 .unwrap_or(false);
3350 let current_epoch = cur_epoch_store.epoch();
3351 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3352 self.checkpoint_all_dbs(
3353 &epoch_checkpoint_path,
3354 cur_epoch_store,
3355 checkpoint_indexes,
3356 )?;
3357 }
3358 }
3359
3360 self.get_reconfig_api()
3361 .reconfigure_cache(&epoch_start_configuration)
3362 .await;
3363
3364 let new_epoch = new_committee.epoch;
3365 let new_epoch_store = self
3366 .reopen_epoch_db(
3367 cur_epoch_store,
3368 new_committee,
3369 epoch_start_configuration,
3370 expensive_safety_check_config,
3371 epoch_last_checkpoint,
3372 )
3373 .await?;
3374 assert_eq!(new_epoch_store.epoch(), new_epoch);
3375 self.transaction_manager.reconfigure(new_epoch);
3376 *execution_lock = new_epoch;
3377 Ok(new_epoch_store)
3381 }
3382
3383 pub async fn reconfigure_for_testing(&self) {
3388 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3389 let epoch_store = self.epoch_store_for_testing().clone();
3390 let protocol_config = epoch_store.protocol_config().clone();
3391 let _guard =
3399 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3400 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3401 self.get_backing_package_store().clone(),
3402 self.get_object_store().clone(),
3403 &self.config.expensive_safety_check_config,
3404 self.checkpoint_store
3405 .get_epoch_last_checkpoint(epoch_store.epoch())
3406 .unwrap()
3407 .map(|c| *c.sequence_number())
3408 .unwrap_or_default(),
3409 );
3410 let new_epoch = new_epoch_store.epoch();
3411 self.transaction_manager.reconfigure(new_epoch);
3412 self.epoch_store.store(new_epoch_store);
3413 epoch_store.epoch_terminated().await;
3414 *execution_lock = new_epoch;
3415 }
3416
3417 #[instrument(level = "error", skip_all)]
3418 fn check_system_consistency(
3419 &self,
3420 cur_epoch_store: &AuthorityPerEpochStore,
3421 accumulator: Arc<StateAccumulator>,
3422 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3423 epoch_supply_change: i64,
3424 ) -> IotaResult<()> {
3425 info!(
3426 "Performing iota conservation consistency check for epoch {}",
3427 cur_epoch_store.epoch()
3428 );
3429
3430 if cfg!(debug_assertions) {
3431 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3432 }
3433
3434 self.get_reconfig_api()
3435 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3436
3437 if expensive_safety_check_config.enable_state_consistency_check() {
3439 info!(
3440 "Performing state consistency check for epoch {}",
3441 cur_epoch_store.epoch()
3442 );
3443 self.expensive_check_is_consistent_state(
3444 accumulator,
3445 cur_epoch_store,
3446 cfg!(debug_assertions), );
3448 }
3449
3450 if expensive_safety_check_config.enable_secondary_index_checks() {
3451 if let Some(indexes) = self.indexes.clone() {
3452 verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3453 .expect("secondary indexes are inconsistent");
3454 }
3455 }
3456
3457 Ok(())
3458 }
3459
3460 fn expensive_check_is_consistent_state(
3461 &self,
3462 accumulator: Arc<StateAccumulator>,
3463 cur_epoch_store: &AuthorityPerEpochStore,
3464 panic: bool,
3465 ) {
3466 let live_object_set_hash = accumulator.digest_live_object_set();
3467
3468 let root_state_hash: ECMHLiveObjectSetDigest = self
3469 .get_accumulator_store()
3470 .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3471 .expect("Retrieving root state hash cannot fail")
3472 .expect("Root state hash for epoch must exist")
3473 .1
3474 .digest()
3475 .into();
3476
3477 let is_inconsistent = root_state_hash != live_object_set_hash;
3478 if is_inconsistent {
3479 if panic {
3480 panic!(
3481 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3482 );
3483 } else {
3484 error!(
3485 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3486 root_state_hash, live_object_set_hash
3487 );
3488 }
3489 } else {
3490 info!("State consistency check passed");
3491 }
3492
3493 if !panic {
3494 accumulator.set_inconsistent_state(is_inconsistent);
3495 }
3496 }
3497
3498 pub fn current_epoch_for_testing(&self) -> EpochId {
3499 self.epoch_store_for_testing().epoch()
3500 }
3501
3502 #[instrument(level = "error", skip_all)]
3503 pub fn checkpoint_all_dbs(
3504 &self,
3505 checkpoint_path: &Path,
3506 cur_epoch_store: &AuthorityPerEpochStore,
3507 checkpoint_indexes: bool,
3508 ) -> IotaResult {
3509 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3510 let current_epoch = cur_epoch_store.epoch();
3511
3512 if checkpoint_path.exists() {
3513 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3514 return Ok(());
3515 }
3516
3517 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3518 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3519
3520 if checkpoint_path_tmp.exists() {
3521 fs::remove_dir_all(&checkpoint_path_tmp)
3522 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3523 }
3524
3525 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3526 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3527
3528 self.checkpoint_store
3531 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3532
3533 self.get_reconfig_api()
3534 .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3535
3536 self.committee_store
3537 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3538
3539 if checkpoint_indexes {
3540 if let Some(indexes) = self.indexes.as_ref() {
3541 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3542 }
3543 }
3544
3545 fs::rename(checkpoint_path_tmp, checkpoint_path)
3546 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3547 Ok(())
3548 }
3549
3550 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3557 self.epoch_store.load()
3558 }
3559
3560 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3562 self.load_epoch_store_one_call_per_task()
3563 }
3564
3565 pub fn clone_committee_for_testing(&self) -> Committee {
3566 Committee::clone(self.epoch_store_for_testing().committee())
3567 }
3568
3569 #[instrument(level = "trace", skip_all)]
3570 pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3571 self.get_object_store()
3572 .try_get_object(object_id)
3573 .map_err(Into::into)
3574 }
3575
3576 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3578 self.try_get_object(object_id)
3579 .await
3580 .expect("storage access failed")
3581 }
3582
3583 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3584 Ok(self
3585 .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3586 .await?
3587 .expect("framework object should always exist")
3588 .compute_object_reference())
3589 }
3590
3591 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3593 self.get_object_cache_reader()
3594 .try_get_iota_system_state_object_unsafe()
3595 }
3596
3597 #[instrument(level = "trace", skip_all)]
3598 pub fn get_checkpoint_by_sequence_number(
3599 &self,
3600 sequence_number: CheckpointSequenceNumber,
3601 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3602 Ok(self
3603 .checkpoint_store
3604 .get_checkpoint_by_sequence_number(sequence_number)?)
3605 }
3606
3607 #[instrument(level = "trace", skip_all)]
3608 pub fn get_transaction_checkpoint_for_tests(
3609 &self,
3610 digest: &TransactionDigest,
3611 epoch_store: &AuthorityPerEpochStore,
3612 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3613 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3614 let Some(checkpoint) = checkpoint else {
3615 return Ok(None);
3616 };
3617 let checkpoint = self
3618 .checkpoint_store
3619 .get_checkpoint_by_sequence_number(checkpoint)?;
3620 Ok(checkpoint)
3621 }
3622
3623 #[instrument(level = "trace", skip_all)]
3624 pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3625 Ok(
3626 match self
3627 .get_object_cache_reader()
3628 .try_get_latest_object_or_tombstone(*object_id)?
3629 {
3630 Some((_, ObjectOrTombstone::Object(object))) => {
3631 let layout = self.get_object_layout(&object)?;
3632 ObjectRead::Exists(object.compute_object_reference(), object, layout)
3633 }
3634 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3635 None => ObjectRead::NotExists(*object_id),
3636 },
3637 )
3638 }
3639
3640 pub fn get_chain_identifier(&self) -> ChainIdentifier {
3642 self.chain_identifier
3643 }
3644
3645 #[instrument(level = "trace", skip_all)]
3646 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3647 where
3648 T: DeserializeOwned,
3649 {
3650 let o = self.get_object_read(object_id)?.into_object()?;
3651 if let Some(move_object) = o.data.try_as_move() {
3652 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3653 IotaError::ObjectDeserialization {
3654 error: format!("{e}"),
3655 }
3656 })?)
3657 } else {
3658 Err(IotaError::ObjectDeserialization {
3659 error: format!("Provided object : [{object_id}] is not a Move object."),
3660 })
3661 }
3662 }
3663
3664 #[instrument(level = "trace", skip_all)]
3670 pub fn get_past_object_read(
3671 &self,
3672 object_id: &ObjectID,
3673 version: SequenceNumber,
3674 ) -> IotaResult<PastObjectRead> {
3675 let Some(obj_ref) = self
3677 .get_object_cache_reader()
3678 .try_get_latest_object_ref_or_tombstone(*object_id)?
3679 else {
3680 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3681 };
3682
3683 if version > obj_ref.1 {
3684 return Ok(PastObjectRead::VersionTooHigh {
3685 object_id: *object_id,
3686 asked_version: version,
3687 latest_version: obj_ref.1,
3688 });
3689 }
3690
3691 if version < obj_ref.1 {
3692 return Ok(match self.read_object_at_version(object_id, version)? {
3694 Some((object, layout)) => {
3695 let obj_ref = object.compute_object_reference();
3696 PastObjectRead::VersionFound(obj_ref, object, layout)
3697 }
3698
3699 None => PastObjectRead::VersionNotFound(*object_id, version),
3700 });
3701 }
3702
3703 if !obj_ref.2.is_alive() {
3704 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3705 }
3706
3707 match self.read_object_at_version(object_id, obj_ref.1)? {
3708 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3709 None => {
3710 error!(
3711 "Object with in parent_entry is missing from object store, datastore is \
3712 inconsistent",
3713 );
3714 Err(UserInputError::ObjectNotFound {
3715 object_id: *object_id,
3716 version: Some(obj_ref.1),
3717 }
3718 .into())
3719 }
3720 }
3721 }
3722
3723 #[instrument(level = "trace", skip_all)]
3724 fn read_object_at_version(
3725 &self,
3726 object_id: &ObjectID,
3727 version: SequenceNumber,
3728 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3729 let Some(object) = self
3730 .get_object_cache_reader()
3731 .try_get_object_by_key(object_id, version)?
3732 else {
3733 return Ok(None);
3734 };
3735
3736 let layout = self.get_object_layout(&object)?;
3737 Ok(Some((object, layout)))
3738 }
3739
3740 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3741 let layout = object
3742 .data
3743 .try_as_move()
3744 .map(|object| {
3745 into_struct_layout(
3746 self.load_epoch_store_one_call_per_task()
3747 .executor()
3748 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3750 .get_annotated_layout(&object.type_().clone().into())?,
3751 )
3752 })
3753 .transpose()?;
3754 Ok(layout)
3755 }
3756
3757 fn get_owner_at_version(
3758 &self,
3759 object_id: &ObjectID,
3760 version: SequenceNumber,
3761 ) -> IotaResult<Owner> {
3762 self.get_object_store()
3763 .try_get_object_by_key(object_id, version)?
3764 .ok_or_else(|| {
3765 IotaError::from(UserInputError::ObjectNotFound {
3766 object_id: *object_id,
3767 version: Some(version),
3768 })
3769 })
3770 .map(|o| o.owner)
3771 }
3772
3773 #[instrument(level = "trace", skip_all)]
3774 pub fn get_owner_objects(
3775 &self,
3776 owner: IotaAddress,
3777 cursor: Option<ObjectID>,
3779 limit: usize,
3780 filter: Option<IotaObjectDataFilter>,
3781 ) -> IotaResult<Vec<ObjectInfo>> {
3782 if let Some(indexes) = &self.indexes {
3783 indexes.get_owner_objects(owner, cursor, limit, filter)
3784 } else {
3785 Err(IotaError::IndexStoreNotAvailable)
3786 }
3787 }
3788
3789 #[instrument(level = "trace", skip_all)]
3790 pub fn get_owned_coins_iterator_with_cursor(
3791 &self,
3792 owner: IotaAddress,
3793 cursor: (String, ObjectID),
3795 limit: usize,
3796 one_coin_type_only: bool,
3797 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3798 if let Some(indexes) = &self.indexes {
3799 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3800 } else {
3801 Err(IotaError::IndexStoreNotAvailable)
3802 }
3803 }
3804
3805 #[instrument(level = "trace", skip_all)]
3806 pub fn get_owner_objects_iterator(
3807 &self,
3808 owner: IotaAddress,
3809 cursor: Option<ObjectID>,
3811 filter: Option<IotaObjectDataFilter>,
3812 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3813 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3814 if let Some(indexes) = &self.indexes {
3815 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3816 } else {
3817 Err(IotaError::IndexStoreNotAvailable)
3818 }
3819 }
3820
3821 #[instrument(level = "trace", skip_all)]
3822 pub async fn get_move_objects<T>(
3823 &self,
3824 owner: IotaAddress,
3825 type_: MoveObjectType,
3826 ) -> IotaResult<Vec<T>>
3827 where
3828 T: DeserializeOwned,
3829 {
3830 let object_ids = self
3831 .get_owner_objects_iterator(owner, None, None)?
3832 .filter(|o| match &o.type_ {
3833 ObjectType::Struct(s) => &type_ == s,
3834 ObjectType::Package => false,
3835 })
3836 .map(|info| ObjectKey(info.object_id, info.version))
3837 .collect::<Vec<_>>();
3838 let mut move_objects = vec![];
3839
3840 let objects = self
3841 .get_object_store()
3842 .try_multi_get_objects_by_key(&object_ids)?;
3843
3844 for (o, id) in objects.into_iter().zip(object_ids) {
3845 let object = o.ok_or_else(|| {
3846 IotaError::from(UserInputError::ObjectNotFound {
3847 object_id: id.0,
3848 version: Some(id.1),
3849 })
3850 })?;
3851 let move_object = object.data.try_as_move().ok_or_else(|| {
3852 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3853 })?;
3854 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3855 IotaError::ObjectDeserialization {
3856 error: format!("{e}"),
3857 }
3858 })?);
3859 }
3860 Ok(move_objects)
3861 }
3862
3863 #[instrument(level = "trace", skip_all)]
3864 pub fn get_dynamic_fields(
3865 &self,
3866 owner: ObjectID,
3867 cursor: Option<ObjectID>,
3869 limit: usize,
3870 ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3871 Ok(self
3872 .get_dynamic_fields_iterator(owner, cursor)?
3873 .take(limit)
3874 .collect::<Result<Vec<_>, _>>()?)
3875 }
3876
3877 fn get_dynamic_fields_iterator(
3878 &self,
3879 owner: ObjectID,
3880 cursor: Option<ObjectID>,
3882 ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3883 {
3884 if let Some(indexes) = &self.indexes {
3885 indexes.get_dynamic_fields_iterator(owner, cursor)
3886 } else {
3887 Err(IotaError::IndexStoreNotAvailable)
3888 }
3889 }
3890
3891 #[instrument(level = "trace", skip_all)]
3892 pub fn get_dynamic_field_object_id(
3893 &self,
3894 owner: ObjectID,
3895 name_type: TypeTag,
3896 name_bcs_bytes: &[u8],
3897 ) -> IotaResult<Option<ObjectID>> {
3898 if let Some(indexes) = &self.indexes {
3899 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3900 } else {
3901 Err(IotaError::IndexStoreNotAvailable)
3902 }
3903 }
3904
3905 #[instrument(level = "trace", skip_all)]
3906 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3907 Ok(self.get_indexes()?.next_sequence_number())
3908 }
3909
3910 #[instrument(level = "trace", skip_all)]
3911 pub async fn get_executed_transaction_and_effects(
3912 &self,
3913 digest: TransactionDigest,
3914 kv_store: Arc<TransactionKeyValueStore>,
3915 ) -> IotaResult<(Transaction, TransactionEffects)> {
3916 let transaction = kv_store.get_tx(digest).await?;
3917 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3918 Ok((transaction, effects))
3919 }
3920
3921 #[instrument(level = "trace", skip_all)]
3922 pub fn multi_get_checkpoint_by_sequence_number(
3923 &self,
3924 sequence_numbers: &[CheckpointSequenceNumber],
3925 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3926 Ok(self
3927 .checkpoint_store
3928 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3929 }
3930
3931 #[instrument(level = "trace", skip_all)]
3932 pub fn get_transaction_events(
3933 &self,
3934 digest: &TransactionEventsDigest,
3935 ) -> IotaResult<TransactionEvents> {
3936 self.get_transaction_cache_reader()
3937 .try_get_events(digest)?
3938 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3939 }
3940
3941 pub fn get_transaction_input_objects(
3942 &self,
3943 effects: &TransactionEffects,
3944 ) -> anyhow::Result<Vec<Object>> {
3945 iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
3946 .map_err(Into::into)
3947 }
3948
3949 pub fn get_transaction_output_objects(
3950 &self,
3951 effects: &TransactionEffects,
3952 ) -> anyhow::Result<Vec<Object>> {
3953 iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
3954 .map_err(Into::into)
3955 }
3956
3957 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3958 match &self.indexes {
3959 Some(i) => Ok(i.clone()),
3960 None => Err(IotaError::UnsupportedFeature {
3961 error: "extended object indexing is not enabled on this server".into(),
3962 }),
3963 }
3964 }
3965
3966 pub async fn get_transactions_for_tests(
3967 self: &Arc<Self>,
3968 filter: Option<TransactionFilter>,
3969 cursor: Option<TransactionDigest>,
3970 limit: Option<usize>,
3971 reverse: bool,
3972 ) -> IotaResult<Vec<TransactionDigest>> {
3973 let metrics = KeyValueStoreMetrics::new_for_tests();
3974 let kv_store = Arc::new(TransactionKeyValueStore::new(
3975 "rocksdb",
3976 metrics,
3977 self.clone(),
3978 ));
3979 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3980 .await
3981 }
3982
3983 #[instrument(level = "trace", skip_all)]
3984 pub async fn get_transactions(
3985 &self,
3986 kv_store: &Arc<TransactionKeyValueStore>,
3987 filter: Option<TransactionFilter>,
3988 cursor: Option<TransactionDigest>,
3990 limit: Option<usize>,
3991 reverse: bool,
3992 ) -> IotaResult<Vec<TransactionDigest>> {
3993 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3994 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3995 let iter = checkpoint_contents.iter().map(|c| c.transaction);
3996 if reverse {
3997 let iter = iter
3998 .rev()
3999 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4000 .skip(usize::from(cursor.is_some()));
4001 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4002 } else {
4003 let iter = iter
4004 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4005 .skip(usize::from(cursor.is_some()));
4006 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4007 }
4008 }
4009 self.get_indexes()?
4010 .get_transactions(filter, cursor, limit, reverse)
4011 }
4012
4013 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4014 &self.checkpoint_store
4015 }
4016
4017 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4018 self.get_checkpoint_store()
4019 .get_highest_executed_checkpoint_seq_number()?
4020 .ok_or(IotaError::UserInput {
4021 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4022 })
4023 }
4024
4025 #[cfg(msim)]
4026 pub fn get_highest_pruned_checkpoint_for_testing(
4027 &self,
4028 ) -> IotaResult<CheckpointSequenceNumber> {
4029 self.database_for_testing()
4030 .perpetual_tables
4031 .get_highest_pruned_checkpoint()
4032 .map(|c| c.unwrap_or(0))
4033 .map_err(Into::into)
4034 }
4035
4036 #[instrument(level = "trace", skip_all)]
4037 pub fn get_checkpoint_summary_by_sequence_number(
4038 &self,
4039 sequence_number: CheckpointSequenceNumber,
4040 ) -> IotaResult<CheckpointSummary> {
4041 let verified_checkpoint = self
4042 .get_checkpoint_store()
4043 .get_checkpoint_by_sequence_number(sequence_number)?;
4044 match verified_checkpoint {
4045 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4046 None => Err(IotaError::UserInput {
4047 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4048 }),
4049 }
4050 }
4051
4052 #[instrument(level = "trace", skip_all)]
4053 pub fn get_checkpoint_summary_by_digest(
4054 &self,
4055 digest: CheckpointDigest,
4056 ) -> IotaResult<CheckpointSummary> {
4057 let verified_checkpoint = self
4058 .get_checkpoint_store()
4059 .get_checkpoint_by_digest(&digest)?;
4060 match verified_checkpoint {
4061 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4062 None => Err(IotaError::UserInput {
4063 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4064 }),
4065 }
4066 }
4067
4068 #[instrument(level = "trace", skip_all)]
4069 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
4070 if is_system_package(package_id) {
4071 return self.find_genesis_txn_digest();
4072 }
4073 Ok(self
4074 .get_object_read(&package_id)?
4075 .into_object()?
4076 .previous_transaction)
4077 }
4078
4079 #[instrument(level = "trace", skip_all)]
4080 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4081 let summary = self
4082 .get_verified_checkpoint_by_sequence_number(0)?
4083 .into_message();
4084 let content = self.get_checkpoint_contents(summary.content_digest)?;
4085 let genesis_transaction = content.enumerate_transactions(&summary).next();
4086 Ok(genesis_transaction
4087 .ok_or(IotaError::UserInput {
4088 error: UserInputError::GenesisTransactionNotFound,
4089 })?
4090 .1
4091 .transaction)
4092 }
4093
4094 #[instrument(level = "trace", skip_all)]
4095 pub fn get_verified_checkpoint_by_sequence_number(
4096 &self,
4097 sequence_number: CheckpointSequenceNumber,
4098 ) -> IotaResult<VerifiedCheckpoint> {
4099 let verified_checkpoint = self
4100 .get_checkpoint_store()
4101 .get_checkpoint_by_sequence_number(sequence_number)?;
4102 match verified_checkpoint {
4103 Some(verified_checkpoint) => Ok(verified_checkpoint),
4104 None => Err(IotaError::UserInput {
4105 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4106 }),
4107 }
4108 }
4109
4110 #[instrument(level = "trace", skip_all)]
4111 pub fn get_verified_checkpoint_summary_by_digest(
4112 &self,
4113 digest: CheckpointDigest,
4114 ) -> IotaResult<VerifiedCheckpoint> {
4115 let verified_checkpoint = self
4116 .get_checkpoint_store()
4117 .get_checkpoint_by_digest(&digest)?;
4118 match verified_checkpoint {
4119 Some(verified_checkpoint) => Ok(verified_checkpoint),
4120 None => Err(IotaError::UserInput {
4121 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4122 }),
4123 }
4124 }
4125
4126 #[instrument(level = "trace", skip_all)]
4127 pub fn get_checkpoint_contents(
4128 &self,
4129 digest: CheckpointContentsDigest,
4130 ) -> IotaResult<CheckpointContents> {
4131 self.get_checkpoint_store()
4132 .get_checkpoint_contents(&digest)?
4133 .ok_or(IotaError::UserInput {
4134 error: UserInputError::CheckpointContentsNotFound(digest),
4135 })
4136 }
4137
4138 #[instrument(level = "trace", skip_all)]
4139 pub fn get_checkpoint_contents_by_sequence_number(
4140 &self,
4141 sequence_number: CheckpointSequenceNumber,
4142 ) -> IotaResult<CheckpointContents> {
4143 let verified_checkpoint = self
4144 .get_checkpoint_store()
4145 .get_checkpoint_by_sequence_number(sequence_number)?;
4146 match verified_checkpoint {
4147 Some(verified_checkpoint) => {
4148 let content_digest = verified_checkpoint.into_inner().content_digest;
4149 self.get_checkpoint_contents(content_digest)
4150 }
4151 None => Err(IotaError::UserInput {
4152 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4153 }),
4154 }
4155 }
4156
4157 #[instrument(level = "trace", skip_all)]
4158 pub async fn query_events(
4159 &self,
4160 kv_store: &Arc<TransactionKeyValueStore>,
4161 query: EventFilter,
4162 cursor: Option<EventID>,
4164 limit: usize,
4165 descending: bool,
4166 ) -> IotaResult<Vec<IotaEvent>> {
4167 let index_store = self.get_indexes()?;
4168
4169 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4171 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4172 IotaError::TransactionNotFound {
4173 digest: cursor.tx_digest,
4174 },
4175 )?;
4176 (tx_seq, cursor.event_seq as usize)
4177 } else if descending {
4178 (u64::MAX, usize::MAX)
4179 } else {
4180 (0, 0)
4181 };
4182
4183 let limit = limit + 1;
4184 let mut event_keys = match query {
4185 EventFilter::All(filters) => {
4186 if filters.is_empty() {
4187 index_store.all_events(tx_num, event_num, limit, descending)?
4188 } else {
4189 return Err(IotaError::UserInput {
4190 error: UserInputError::Unsupported(
4191 "This query type does not currently support filter combinations"
4192 .to_string(),
4193 ),
4194 });
4195 }
4196 }
4197 EventFilter::Transaction(digest) => {
4198 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4199 }
4200 EventFilter::MoveModule { package, module } => {
4201 let module_id = ModuleId::new(package.into(), module);
4202 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4203 }
4204 EventFilter::MoveEventType(struct_name) => index_store
4205 .events_by_move_event_struct_name(
4206 &struct_name,
4207 tx_num,
4208 event_num,
4209 limit,
4210 descending,
4211 )?,
4212 EventFilter::Sender(sender) => {
4213 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4214 }
4215 EventFilter::TimeRange {
4216 start_time,
4217 end_time,
4218 } => index_store
4219 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4220 EventFilter::MoveEventModule { package, module } => index_store
4221 .events_by_move_event_module(
4222 &ModuleId::new(package.into(), module),
4223 tx_num,
4224 event_num,
4225 limit,
4226 descending,
4227 )?,
4228 EventFilter::Package(_)
4230 | EventFilter::MoveEventField { .. }
4231 | EventFilter::Any(_)
4232 | EventFilter::And(_, _)
4233 | EventFilter::Or(_, _) => {
4234 return Err(IotaError::UserInput {
4235 error: UserInputError::Unsupported(
4236 "This query type is not supported by the full node.".to_string(),
4237 ),
4238 });
4239 }
4240 };
4241
4242 if cursor.is_some() {
4245 if !event_keys.is_empty() {
4246 event_keys.remove(0);
4247 }
4248 } else {
4249 event_keys.truncate(limit - 1);
4250 }
4251
4252 let transaction_digests = event_keys
4254 .iter()
4255 .map(|(_, digest, _, _)| *digest)
4256 .collect::<HashSet<_>>()
4257 .into_iter()
4258 .collect::<Vec<_>>();
4259
4260 let events = kv_store
4261 .multi_get_events_by_tx_digests(&transaction_digests)
4262 .await?;
4263
4264 let events_map: HashMap<_, _> =
4265 transaction_digests.iter().zip(events.into_iter()).collect();
4266
4267 let stored_events = event_keys
4268 .into_iter()
4269 .map(|k| {
4270 (
4271 k,
4272 events_map
4273 .get(&k.1)
4274 .expect("fetched digest is missing")
4275 .clone()
4276 .and_then(|e| e.data.get(k.2).cloned()),
4277 )
4278 })
4279 .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4280 event
4281 .map(|e| (e, tx_digest, event_seq, timestamp))
4282 .ok_or(IotaError::TransactionEventsNotFound { digest })
4283 })
4284 .collect::<Result<Vec<_>, _>>()?;
4285
4286 let epoch_store = self.load_epoch_store_one_call_per_task();
4287 let backing_store = self.get_backing_package_store().as_ref();
4288 let mut layout_resolver = epoch_store
4289 .executor()
4290 .type_layout_resolver(Box::new(backing_store));
4291 let mut events = vec![];
4292 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4293 events.push(IotaEvent::try_from(
4294 e.clone(),
4295 tx_digest,
4296 event_seq as u64,
4297 Some(timestamp),
4298 layout_resolver.get_annotated_layout(&e.type_)?,
4299 )?)
4300 }
4301 Ok(events)
4302 }
4303
4304 pub async fn insert_genesis_object(&self, object: Object) {
4305 self.get_reconfig_api()
4306 .try_insert_genesis_object(object)
4307 .expect("Cannot insert genesis object")
4308 }
4309
4310 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4311 futures::future::join_all(
4312 objects
4313 .iter()
4314 .map(|o| self.insert_genesis_object(o.clone())),
4315 )
4316 .await;
4317 }
4318
4319 #[instrument(level = "trace", skip_all)]
4321 pub fn get_transaction_status(
4322 &self,
4323 transaction_digest: &TransactionDigest,
4324 epoch_store: &Arc<AuthorityPerEpochStore>,
4325 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4326 if let Some(effects) =
4328 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4329 {
4330 if let Some(transaction) = self
4331 .get_transaction_cache_reader()
4332 .try_get_transaction_block(transaction_digest)?
4333 {
4334 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4335 let events = if let Some(digest) = effects.events_digest() {
4336 self.get_transaction_events(digest)?
4337 } else {
4338 TransactionEvents::default()
4339 };
4340 return Ok(Some((
4341 (*transaction).clone().into_message(),
4342 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4343 )));
4344 } else {
4345 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4350 }
4351 }
4352 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4353 self.metrics.tx_already_processed.inc();
4354 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4355 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4356 } else {
4357 Ok(None)
4358 }
4359 }
4360
4361 #[instrument(level = "trace", skip_all)]
4365 pub fn get_signed_effects_and_maybe_resign(
4366 &self,
4367 transaction_digest: &TransactionDigest,
4368 epoch_store: &Arc<AuthorityPerEpochStore>,
4369 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4370 let effects = self
4371 .get_transaction_cache_reader()
4372 .try_get_executed_effects(transaction_digest)?;
4373 match effects {
4374 Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4375 None => Ok(None),
4376 }
4377 }
4378
4379 #[instrument(level = "trace", skip_all)]
4380 pub(crate) fn sign_effects(
4381 &self,
4382 effects: TransactionEffects,
4383 epoch_store: &Arc<AuthorityPerEpochStore>,
4384 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4385 let tx_digest = *effects.transaction_digest();
4386 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4387 Some(sig) if sig.epoch == epoch_store.epoch() => {
4388 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4389 }
4390 _ => {
4391 debug!(
4414 ?tx_digest,
4415 epoch=?epoch_store.epoch(),
4416 "Re-signing the effects with the current epoch"
4417 );
4418
4419 let sig = AuthoritySignInfo::new(
4420 epoch_store.epoch(),
4421 &effects,
4422 Intent::iota_app(IntentScope::TransactionEffects),
4423 self.name,
4424 &*self.secret,
4425 );
4426
4427 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4428
4429 epoch_store.insert_effects_digest_and_signature(
4430 &tx_digest,
4431 effects.digest(),
4432 &sig,
4433 )?;
4434
4435 effects
4436 }
4437 };
4438
4439 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4440 signed_effects,
4441 ))
4442 }
4443
4444 #[instrument(level = "trace", skip_all)]
4446 fn fullnode_only_get_tx_coins_for_indexing(
4447 &self,
4448 inner_temporary_store: &InnerTemporaryStore,
4449 epoch_store: &Arc<AuthorityPerEpochStore>,
4450 ) -> Option<TxCoins> {
4451 if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4452 return None;
4453 }
4454 let written_coin_objects = inner_temporary_store
4455 .written
4456 .iter()
4457 .filter_map(|(k, v)| {
4458 if v.is_coin() {
4459 Some((*k, v.clone()))
4460 } else {
4461 None
4462 }
4463 })
4464 .collect();
4465 let input_coin_objects = inner_temporary_store
4466 .input_objects
4467 .iter()
4468 .filter_map(|(k, v)| {
4469 if v.is_coin() {
4470 Some((*k, v.clone()))
4471 } else {
4472 None
4473 }
4474 })
4475 .collect::<ObjectMap>();
4476 Some((input_coin_objects, written_coin_objects))
4477 }
4478
4479 #[instrument(level = "trace", skip_all)]
4491 pub async fn get_transaction_lock(
4492 &self,
4493 object_ref: &ObjectRef,
4494 epoch_store: &AuthorityPerEpochStore,
4495 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4496 let lock_info = self
4497 .get_object_cache_reader()
4498 .try_get_lock(*object_ref, epoch_store)?;
4499 let lock_info = match lock_info {
4500 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4501 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4502 provided_obj_ref: *object_ref,
4503 current_version: locked_ref.1,
4504 }
4505 .into());
4506 }
4507 ObjectLockStatus::Initialized => {
4508 return Ok(None);
4509 }
4510 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4511 };
4512
4513 epoch_store.get_signed_transaction(&lock_info)
4514 }
4515
4516 pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4517 self.get_object_cache_reader().try_get_objects(objects)
4518 }
4519
4520 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4522 self.try_get_objects(objects)
4523 .await
4524 .expect("storage access failed")
4525 }
4526
4527 pub async fn try_get_object_or_tombstone(
4528 &self,
4529 object_id: ObjectID,
4530 ) -> IotaResult<Option<ObjectRef>> {
4531 self.get_object_cache_reader()
4532 .try_get_latest_object_ref_or_tombstone(object_id)
4533 }
4534
4535 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4537 self.try_get_object_or_tombstone(object_id)
4538 .await
4539 .expect("storage access failed")
4540 }
4541
4542 pub fn set_override_protocol_upgrade_buffer_stake(
4552 &self,
4553 expected_epoch: EpochId,
4554 buffer_stake_bps: u64,
4555 ) -> IotaResult {
4556 let epoch_store = self.load_epoch_store_one_call_per_task();
4557 let actual_epoch = epoch_store.epoch();
4558 if actual_epoch != expected_epoch {
4559 return Err(IotaError::WrongEpoch {
4560 expected_epoch,
4561 actual_epoch,
4562 });
4563 }
4564
4565 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4566 }
4567
4568 pub fn clear_override_protocol_upgrade_buffer_stake(
4569 &self,
4570 expected_epoch: EpochId,
4571 ) -> IotaResult {
4572 let epoch_store = self.load_epoch_store_one_call_per_task();
4573 let actual_epoch = epoch_store.epoch();
4574 if actual_epoch != expected_epoch {
4575 return Err(IotaError::WrongEpoch {
4576 expected_epoch,
4577 actual_epoch,
4578 });
4579 }
4580
4581 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4582 }
4583
4584 pub async fn get_available_system_packages(
4588 &self,
4589 binary_config: &BinaryConfig,
4590 ) -> Vec<ObjectRef> {
4591 let mut results = vec![];
4592
4593 let system_packages = BuiltInFramework::iter_system_packages();
4594
4595 #[cfg(msim)]
4597 let extra_packages = framework_injection::get_extra_packages(self.name);
4598 #[cfg(msim)]
4599 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4600
4601 for system_package in system_packages {
4602 let modules = system_package.modules().to_vec();
4603 #[cfg(msim)]
4605 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4606 .unwrap_or(modules);
4607
4608 let Some(obj_ref) = iota_framework::compare_system_package(
4609 &self.get_object_store(),
4610 &system_package.id,
4611 &modules,
4612 system_package.dependencies.to_vec(),
4613 binary_config,
4614 )
4615 .await
4616 else {
4617 return vec![];
4618 };
4619 results.push(obj_ref);
4620 }
4621
4622 results
4623 }
4624
4625 async fn get_system_package_bytes(
4642 &self,
4643 system_packages: Vec<ObjectRef>,
4644 binary_config: &BinaryConfig,
4645 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4646 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4647 let objects = self.get_objects(&ids).await;
4648
4649 let mut res = Vec::with_capacity(system_packages.len());
4650 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4651 let prev_transaction = match object {
4652 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4653 info!("Framework {} does not need updating", system_package_ref.0);
4655 continue;
4656 }
4657
4658 Some(cur_object) => cur_object.previous_transaction,
4659 None => TransactionDigest::genesis_marker(),
4660 };
4661
4662 #[cfg(msim)]
4663 let SystemPackage {
4664 id: _,
4665 bytes,
4666 dependencies,
4667 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4668 .unwrap_or_else(|| {
4669 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4670 });
4671
4672 #[cfg(not(msim))]
4673 let SystemPackage {
4674 id: _,
4675 bytes,
4676 dependencies,
4677 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4678
4679 let modules: Vec<_> = bytes
4680 .iter()
4681 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4682 .collect();
4683
4684 let new_object = Object::new_system_package(
4685 &modules,
4686 system_package_ref.1,
4687 dependencies.clone(),
4688 prev_transaction,
4689 );
4690
4691 let new_ref = new_object.compute_object_reference();
4692 if new_ref != system_package_ref {
4693 error!(
4694 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
4695 );
4696 return None;
4697 }
4698
4699 res.push((system_package_ref.1, bytes, dependencies));
4700 }
4701
4702 Some(res)
4703 }
4704
4705 fn is_protocol_version_supported_v1(
4709 proposed_protocol_version: ProtocolVersion,
4710 committee: &Committee,
4711 capabilities: Vec<AuthorityCapabilitiesV1>,
4712 mut buffer_stake_bps: u64,
4713 ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4714 if buffer_stake_bps > 10000 {
4715 warn!("clamping buffer_stake_bps to 10000");
4716 buffer_stake_bps = 10000;
4717 }
4718
4719 let mut desired_upgrades: Vec<_> = capabilities
4722 .into_iter()
4723 .filter_map(|mut cap| {
4724 if cap.available_system_packages.is_empty() {
4726 return None;
4727 }
4728
4729 cap.available_system_packages.sort();
4730
4731 info!(
4732 "validator {:?} supports {:?} with system packages: {:?}",
4733 cap.authority.concise(),
4734 cap.supported_protocol_versions,
4735 cap.available_system_packages,
4736 );
4737
4738 cap.supported_protocol_versions
4742 .get_version_digest(proposed_protocol_version)
4743 .map(|digest| (digest, cap.available_system_packages, cap.authority))
4744 })
4745 .collect();
4746
4747 desired_upgrades.sort();
4750 desired_upgrades
4751 .into_iter()
4752 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4753 .into_iter()
4754 .find_map(|((digest, packages), group)| {
4755 assert!(!packages.is_empty());
4757
4758 let mut stake_aggregator: StakeAggregator<(), true> =
4759 StakeAggregator::new(Arc::new(committee.clone()));
4760
4761 for (_, _, authority) in group {
4762 stake_aggregator.insert_generic(authority, ());
4763 }
4764
4765 let total_votes = stake_aggregator.total_votes();
4766 let quorum_threshold = committee.quorum_threshold();
4767 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4768
4769 info!(
4770 protocol_config_digest = ?digest,
4771 ?total_votes,
4772 ?quorum_threshold,
4773 ?buffer_stake_bps,
4774 ?effective_threshold,
4775 ?proposed_protocol_version,
4776 ?packages,
4777 "support for upgrade"
4778 );
4779
4780 let has_support = total_votes >= effective_threshold;
4781 has_support.then_some((proposed_protocol_version, digest, packages))
4782 })
4783 }
4784
4785 fn choose_protocol_version_and_system_packages_v1(
4789 current_protocol_version: ProtocolVersion,
4790 current_protocol_digest: Digest,
4791 committee: &Committee,
4792 capabilities: Vec<AuthorityCapabilitiesV1>,
4793 buffer_stake_bps: u64,
4794 ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4795 let mut next_protocol_version = current_protocol_version;
4796 let mut system_packages = vec![];
4797 let mut protocol_version_digest = current_protocol_digest;
4798
4799 while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4803 next_protocol_version + 1,
4804 committee,
4805 capabilities.clone(),
4806 buffer_stake_bps,
4807 ) {
4808 next_protocol_version = version;
4809 protocol_version_digest = digest;
4810 system_packages = packages;
4811 }
4812
4813 (
4814 next_protocol_version,
4815 protocol_version_digest,
4816 system_packages,
4817 )
4818 }
4819
4820 fn get_validators_supporting_protocol_version(
4825 target_protocol_version: ProtocolVersion,
4826 target_digest: Digest,
4827 active_validators: &[AuthorityPublicKey],
4828 capabilities: &[AuthorityCapabilitiesV1],
4829 ) -> Vec<u64> {
4830 let mut eligible_validators = Vec::new();
4831
4832 for capability in capabilities {
4833 if let Some(digest) = capability
4835 .supported_protocol_versions
4836 .get_version_digest(target_protocol_version)
4837 {
4838 if digest == target_digest {
4839 if let Some(index) = active_validators
4841 .iter()
4842 .position(|name| AuthorityName::from(name) == capability.authority)
4843 {
4844 eligible_validators.push(index as u64);
4845 }
4846 }
4847 }
4848 }
4849
4850 eligible_validators.sort();
4852 eligible_validators
4853 }
4854
4855 fn calculate_eligible_validators_weight(
4860 eligible_validator_indices: &[u64],
4861 active_validators: &[AuthorityPublicKey],
4862 committee: &Committee,
4863 ) -> u64 {
4864 let mut total_weight = 0u64;
4865
4866 for &index in eligible_validator_indices {
4867 let authority_pubkey = &active_validators[index as usize];
4868 if let Some((_, weight)) = committee
4870 .members()
4871 .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4872 {
4873 total_weight += weight;
4874 }
4875 }
4876
4877 total_weight
4878 }
4879
4880 #[instrument(level = "debug", skip_all)]
4881 fn create_authenticator_state_tx(
4882 &self,
4883 epoch_store: &Arc<AuthorityPerEpochStore>,
4884 ) -> Option<EndOfEpochTransactionKind> {
4885 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4886 info!("authenticator state transactions not enabled");
4887 return None;
4888 }
4889
4890 let authenticator_state_exists = epoch_store.authenticator_state_exists();
4891 let tx = if authenticator_state_exists {
4892 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4893 let min_epoch =
4894 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4895 let authenticator_obj_initial_shared_version = epoch_store
4896 .epoch_start_config()
4897 .authenticator_obj_initial_shared_version()
4898 .expect("initial version must exist");
4899
4900 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4901 min_epoch,
4902 authenticator_obj_initial_shared_version,
4903 );
4904
4905 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4906
4907 tx
4908 } else {
4909 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4910 info!("Creating AuthenticatorStateCreate tx");
4911 tx
4912 };
4913 Some(tx)
4914 }
4915
4916 #[instrument(level = "error", skip_all)]
4929 pub async fn create_and_execute_advance_epoch_tx(
4930 &self,
4931 epoch_store: &Arc<AuthorityPerEpochStore>,
4932 gas_cost_summary: &GasCostSummary,
4933 checkpoint: CheckpointSequenceNumber,
4934 epoch_start_timestamp_ms: CheckpointTimestamp,
4935 scores: Vec<u64>,
4936 ) -> anyhow::Result<(
4937 IotaSystemState,
4938 Option<SystemEpochInfoEvent>,
4939 TransactionEffects,
4940 )> {
4941 let mut txns = Vec::new();
4942
4943 if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
4944 txns.push(tx);
4945 }
4946
4947 let next_epoch = epoch_store.epoch() + 1;
4948
4949 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
4950 let authority_capabilities = epoch_store
4951 .get_capabilities_v1()
4952 .expect("read capabilities from db cannot fail");
4953 let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
4954 Self::choose_protocol_version_and_system_packages_v1(
4955 epoch_store.protocol_version(),
4956 SupportedProtocolVersionsWithHashes::protocol_config_digest(
4957 epoch_store.protocol_config(),
4958 ),
4959 epoch_store.committee(),
4960 authority_capabilities.clone(),
4961 buffer_stake_bps,
4962 );
4963
4964 let config = epoch_store.protocol_config();
4968 let binary_config = to_binary_config(config);
4969 let Some(next_epoch_system_package_bytes) = self
4970 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
4971 .await
4972 else {
4973 error!(
4974 "upgraded system packages {:?} are not locally available, cannot create \
4975 ChangeEpochTx. validator binary must be upgraded to the correct version!",
4976 next_epoch_system_packages
4977 );
4978 bail!("missing system packages: cannot form ChangeEpochTx");
4988 };
4989
4990 if config.select_committee_from_eligible_validators() {
4993 let active_validators = epoch_store.epoch_start_state().get_active_validators();
4995
4996 let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
4997
4998 if config.select_committee_supporting_next_epoch_version() {
5002 eligible_active_validators = Self::get_validators_supporting_protocol_version(
5003 next_epoch_protocol_version,
5004 next_epoch_protocol_digest,
5005 &active_validators,
5006 &authority_capabilities,
5007 );
5008
5009 let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5011 &eligible_active_validators,
5012 &active_validators,
5013 epoch_store.committee(),
5014 );
5015
5016 let committee = epoch_store.committee();
5020 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5021
5022 if eligible_validators_weight < effective_threshold {
5023 error!(
5024 "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5025 This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5026 );
5027 eligible_active_validators = (0..active_validators.len() as u64).collect();
5030 }
5031 }
5032
5033 if config.pass_validator_scores_to_advance_epoch() {
5036 txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5037 next_epoch,
5038 next_epoch_protocol_version,
5039 gas_cost_summary.storage_cost,
5040 gas_cost_summary.computation_cost,
5041 gas_cost_summary.computation_cost_burned,
5042 gas_cost_summary.storage_rebate,
5043 gas_cost_summary.non_refundable_storage_fee,
5044 epoch_start_timestamp_ms,
5045 next_epoch_system_package_bytes,
5046 eligible_active_validators,
5047 scores,
5048 config.adjust_rewards_by_score(),
5049 ));
5050 } else {
5051 txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5052 next_epoch,
5053 next_epoch_protocol_version,
5054 gas_cost_summary.storage_cost,
5055 gas_cost_summary.computation_cost,
5056 gas_cost_summary.computation_cost_burned,
5057 gas_cost_summary.storage_rebate,
5058 gas_cost_summary.non_refundable_storage_fee,
5059 epoch_start_timestamp_ms,
5060 next_epoch_system_package_bytes,
5061 eligible_active_validators,
5062 ));
5063 }
5064 } else if config.protocol_defined_base_fee()
5065 && config.max_committee_members_count_as_option().is_some()
5066 {
5067 txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5068 next_epoch,
5069 next_epoch_protocol_version,
5070 gas_cost_summary.storage_cost,
5071 gas_cost_summary.computation_cost,
5072 gas_cost_summary.computation_cost_burned,
5073 gas_cost_summary.storage_rebate,
5074 gas_cost_summary.non_refundable_storage_fee,
5075 epoch_start_timestamp_ms,
5076 next_epoch_system_package_bytes,
5077 ));
5078 } else {
5079 txns.push(EndOfEpochTransactionKind::new_change_epoch(
5080 next_epoch,
5081 next_epoch_protocol_version,
5082 gas_cost_summary.storage_cost,
5083 gas_cost_summary.computation_cost,
5084 gas_cost_summary.storage_rebate,
5085 gas_cost_summary.non_refundable_storage_fee,
5086 epoch_start_timestamp_ms,
5087 next_epoch_system_package_bytes,
5088 ));
5089 }
5090
5091 let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5092
5093 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5094 tx.clone(),
5095 epoch_store.epoch(),
5096 checkpoint,
5097 );
5098
5099 let tx_digest = executable_tx.digest();
5100
5101 info!(
5102 ?next_epoch,
5103 ?next_epoch_protocol_version,
5104 ?next_epoch_system_packages,
5105 computation_cost=?gas_cost_summary.computation_cost,
5106 computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5107 storage_cost=?gas_cost_summary.storage_cost,
5108 storage_rebate=?gas_cost_summary.storage_rebate,
5109 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5110 ?tx_digest,
5111 "Creating advance epoch transaction"
5112 );
5113
5114 fail_point_async!("change_epoch_tx_delay");
5115 let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5116
5117 if self
5121 .get_transaction_cache_reader()
5122 .try_is_tx_already_executed(tx_digest)?
5123 {
5124 warn!("change epoch tx has already been executed via state sync");
5125 bail!("change epoch tx has already been executed via state sync",);
5126 }
5127
5128 let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5129
5130 epoch_store.assign_shared_object_versions_idempotent(
5134 self.get_object_cache_reader().as_ref(),
5135 std::slice::from_ref(&executable_tx),
5136 )?;
5137
5138 let (input_objects, _, _) =
5139 self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5140
5141 let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
5142 &execution_guard,
5143 &executable_tx,
5144 input_objects,
5145 None,
5146 None,
5147 epoch_store,
5148 )?;
5149 let system_obj = get_iota_system_state(&temporary_store.written)
5150 .expect("change epoch tx must write to system object");
5151 let system_epoch_info_event = temporary_store
5153 .events
5154 .data
5155 .into_iter()
5156 .find(|event| event.is_system_epoch_info_event())
5157 .map(SystemEpochInfoEvent::from);
5158 assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5161
5162 self.get_state_sync_store()
5166 .try_insert_transaction_and_effects(&tx, &effects)
5167 .map_err(|err| {
5168 let err: anyhow::Error = err.into();
5169 err
5170 })?;
5171
5172 info!(
5173 "Effects summary of the change epoch transaction: {:?}",
5174 effects.summary_for_debug()
5175 );
5176 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5177 assert!(effects.status().is_ok());
5179 Ok((system_obj, system_epoch_info_event, effects))
5180 }
5181
5182 #[instrument(level = "error", skip_all)]
5186 async fn revert_uncommitted_epoch_transactions(
5187 &self,
5188 epoch_store: &AuthorityPerEpochStore,
5189 ) -> IotaResult {
5190 {
5191 let state = epoch_store.get_reconfig_state_write_lock_guard();
5192 if state.should_accept_user_certs() {
5193 epoch_store.close_user_certs(state);
5202 }
5203 }
5205 let pending_certificates = epoch_store.pending_consensus_certificates();
5206 info!(
5207 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5208 pending_certificates.len(),
5209 pending_certificates,
5210 );
5211 for digest in pending_certificates {
5212 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5213 info!(
5214 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5215 digest
5216 );
5217 continue;
5218 }
5219 info!("Reverting {:?} at the end of epoch", digest);
5220 epoch_store.revert_executed_transaction(&digest)?;
5221 self.get_reconfig_api().try_revert_state_update(&digest)?;
5222 }
5223 info!("All uncommitted local transactions reverted");
5224 Ok(())
5225 }
5226
5227 #[instrument(level = "error", skip_all)]
5228 async fn reopen_epoch_db(
5229 &self,
5230 cur_epoch_store: &AuthorityPerEpochStore,
5231 new_committee: Committee,
5232 epoch_start_configuration: EpochStartConfiguration,
5233 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5234 epoch_last_checkpoint: CheckpointSequenceNumber,
5235 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5236 let new_epoch = new_committee.epoch;
5237 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5238 assert_eq!(
5239 epoch_start_configuration.epoch_start_state().epoch(),
5240 new_committee.epoch
5241 );
5242 fail_point!("before-open-new-epoch-store");
5243 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5244 self.name,
5245 new_committee,
5246 epoch_start_configuration,
5247 self.get_backing_package_store().clone(),
5248 self.get_object_store().clone(),
5249 expensive_safety_check_config,
5250 epoch_last_checkpoint,
5251 )?;
5252 self.epoch_store.store(new_epoch_store.clone());
5253 Ok(new_epoch_store)
5254 }
5255
5256 fn check_move_account(
5259 &self,
5260 auth_account_object_id: ObjectID,
5261 auth_account_object_seq_number: Option<SequenceNumber>,
5262 auth_account_object_digest: Option<ObjectDigest>,
5263 account_object: ObjectReadResult,
5264 signer: &IotaAddress,
5265 ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5266 let account_object = match account_object.object {
5267 ObjectReadResultKind::Object(object) => Ok(object),
5268 ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5269 Err(UserInputError::AccountObjectDeleted {
5270 account_id: account_object.id(),
5271 account_version: version,
5272 transaction_digest: digest,
5273 })
5274 }
5275 ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5278 Err(UserInputError::AccountObjectInCanceledTransaction {
5279 account_id: account_object.id(),
5280 account_version: version,
5281 })
5282 }
5283 }?;
5284
5285 let account_object_addr = IotaAddress::from(auth_account_object_id);
5286
5287 fp_ensure!(
5288 signer == &account_object_addr,
5289 UserInputError::IncorrectUserSignature {
5290 error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5291 }
5292 .into()
5293 );
5294
5295 fp_ensure!(
5296 account_object.is_shared() || account_object.is_immutable(),
5297 UserInputError::AccountObjectNotSupported {
5298 object_id: auth_account_object_id
5299 }
5300 .into()
5301 );
5302
5303 let auth_account_object_seq_number =
5304 if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5305 let account_object_version = account_object.version();
5306
5307 fp_ensure!(
5308 account_object_version == auth_account_object_seq_number,
5309 UserInputError::AccountObjectVersionMismatch {
5310 object_id: auth_account_object_id,
5311 expected_version: auth_account_object_seq_number,
5312 actual_version: account_object_version,
5313 }
5314 .into()
5315 );
5316
5317 auth_account_object_seq_number
5318 } else {
5319 account_object.version()
5320 };
5321
5322 if let Some(auth_account_object_digest) = auth_account_object_digest {
5323 let expected_digest = account_object.digest();
5324 fp_ensure!(
5325 expected_digest == auth_account_object_digest,
5326 UserInputError::InvalidAccountObjectDigest {
5327 object_id: auth_account_object_id,
5328 expected_digest,
5329 actual_digest: auth_account_object_digest,
5330 }
5331 .into()
5332 );
5333 }
5334
5335 let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5336 auth_account_object_id,
5337 &AuthenticatorFunctionRefV1Key::tag().into(),
5338 &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5339 )
5340 .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5341 account_object_id: auth_account_object_id,
5342 })?;
5343
5344 let authenticator_function_ref_field = self
5345 .get_object_cache_reader()
5346 .try_find_object_lt_or_eq_version(
5347 authenticator_function_ref_field_id,
5348 auth_account_object_seq_number,
5349 )?;
5350
5351 if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5352 let field_move_object = authenticator_function_ref_field_obj
5353 .data
5354 .try_as_move()
5355 .expect("dynamic field should never be a package object");
5356
5357 let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5358 field_move_object.to_rust().ok_or(
5359 UserInputError::InvalidAuthenticatorFunctionRefField {
5360 account_object_id: auth_account_object_id,
5361 },
5362 )?;
5363
5364 Ok(AuthenticatorFunctionRefForExecution::new_v1(
5365 field.value,
5366 authenticator_function_ref_field_obj.compute_object_reference(),
5367 authenticator_function_ref_field_obj.owner,
5368 authenticator_function_ref_field_obj.storage_rebate,
5369 authenticator_function_ref_field_obj.previous_transaction,
5370 ))
5371 } else {
5372 Err(UserInputError::MoveAuthenticatorNotFound {
5373 authenticator_function_ref_id: authenticator_function_ref_field_id,
5374 account_object_id: auth_account_object_id,
5375 account_object_version: auth_account_object_seq_number,
5376 }
5377 .into())
5378 }
5379 }
5380
5381 fn read_objects_for_signing(
5382 &self,
5383 transaction: &VerifiedTransaction,
5384 epoch: u64,
5385 ) -> IotaResult<(
5386 InputObjects,
5387 ReceivingObjects,
5388 Option<InputObjects>,
5389 Option<ObjectReadResult>,
5390 )> {
5391 let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5392 Some(transaction.digest()),
5393 &transaction.collect_all_input_object_kind_for_reading()?,
5394 &transaction.data().transaction_data().receiving_objects(),
5395 epoch,
5396 )?;
5397
5398 transaction
5399 .split_input_objects_into_groups_for_reading(input_objects)
5400 .map(|(tx_input_objects, auth_input_objects, account_object)| {
5401 (
5402 tx_input_objects,
5403 tx_receiving_objects,
5404 auth_input_objects,
5405 account_object,
5406 )
5407 })
5408 }
5409
5410 fn check_transaction_inputs_for_signing(
5411 &self,
5412 protocol_config: &ProtocolConfig,
5413 reference_gas_price: u64,
5414 tx_data: &TransactionData,
5415 tx_input_objects: InputObjects,
5416 tx_receiving_objects: &ReceivingObjects,
5417 move_authenticator: Option<&MoveAuthenticator>,
5418 auth_input_objects: Option<InputObjects>,
5419 account_object: Option<ObjectReadResult>,
5420 ) -> IotaResult<(
5421 IotaGasStatus,
5422 CheckedInputObjects,
5423 Option<CheckedInputObjects>,
5424 Option<AuthenticatorFunctionRef>,
5425 )> {
5426 let (
5427 auth_checked_input_objects_union,
5428 authenticator_function_ref,
5429 authenticator_gas_budget,
5430 ) = if let Some(move_authenticator) = move_authenticator {
5431 let auth_input_objects =
5432 auth_input_objects.expect("MoveAuthenticator input objects must be provided");
5433 let account_object = account_object.expect("Move account object must be provided");
5434
5435 let (
5437 auth_account_object_id,
5438 auth_account_object_seq_number,
5439 auth_account_object_digest,
5440 ) = move_authenticator.object_to_authenticate_components()?;
5441
5442 let AuthenticatorFunctionRefForExecution {
5444 authenticator_function_ref,
5445 ..
5446 } = self.check_move_account(
5447 auth_account_object_id,
5448 auth_account_object_seq_number,
5449 auth_account_object_digest,
5450 account_object,
5451 &tx_data.sender(),
5452 )?;
5453
5454 let auth_checked_input_objects =
5456 iota_transaction_checks::check_move_authenticator_input_for_signing(
5457 auth_input_objects,
5458 )?;
5459
5460 let authenticator_gas_budget = protocol_config.max_auth_gas();
5463
5464 (
5465 Some(auth_checked_input_objects),
5466 Some(authenticator_function_ref),
5467 authenticator_gas_budget,
5468 )
5469 } else {
5470 (None, None, 0)
5471 };
5472
5473 let (gas_status, tx_checked_input_objects) =
5475 iota_transaction_checks::check_transaction_input(
5476 protocol_config,
5477 reference_gas_price,
5478 tx_data,
5479 tx_input_objects,
5480 tx_receiving_objects,
5481 &self.metrics.bytecode_verifier_metrics,
5482 &self.config.verifier_signing_config,
5483 authenticator_gas_budget,
5484 )?;
5485
5486 Ok((
5487 gas_status,
5488 tx_checked_input_objects,
5489 auth_checked_input_objects_union,
5490 authenticator_function_ref,
5491 ))
5492 }
5493
/// Test-only: iterates over the live objects reported by the accumulator
/// store's `iter_cached_live_object_set_for_testing`.
#[cfg(test)]
pub(crate) fn iter_live_object_set_for_testing(
    &self,
) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
    let accumulator_store = self.get_accumulator_store();
    accumulator_store.iter_cached_live_object_set_for_testing()
}
5501
/// Test-only: takes the stored execution shutdown sender and sends `()` on it.
///
/// # Panics
///
/// Panics if the sender has already been taken or if sending fails.
#[cfg(test)]
pub(crate) fn shutdown_execution_for_test(&self) {
    // The guard stays alive until the end of the function, so the send still
    // happens while the lock is held.
    let mut shutdown_slot = self.tx_execution_shutdown.lock();
    let shutdown_tx = shutdown_slot.take().unwrap();
    shutdown_tx.send(()).unwrap();
}
5511
5512 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5515 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5516 self.get_object_cache_reader()
5517 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5518 self.get_reconfig_api()
5519 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5520 }
5521}
5522
5523pub struct RandomnessRoundReceiver {
5524 authority_state: Arc<AuthorityState>,
5525 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5526}
5527
5528impl RandomnessRoundReceiver {
5529 pub fn spawn(
5530 authority_state: Arc<AuthorityState>,
5531 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5532 ) -> JoinHandle<()> {
5533 let rrr = RandomnessRoundReceiver {
5534 authority_state,
5535 randomness_rx,
5536 };
5537 spawn_monitored_task!(rrr.run())
5538 }
5539
5540 async fn run(mut self) {
5541 info!("RandomnessRoundReceiver event loop started");
5542
5543 loop {
5544 tokio::select! {
5545 maybe_recv = self.randomness_rx.recv() => {
5546 if let Some((epoch, round, bytes)) = maybe_recv {
5547 self.handle_new_randomness(epoch, round, bytes);
5548 } else {
5549 break;
5550 }
5551 },
5552 }
5553 }
5554
5555 info!("RandomnessRoundReceiver event loop ended");
5556 }
5557
5558 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5559 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5560 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5561 if epoch_store.epoch() != epoch {
5562 warn!(
5563 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5564 epoch_store.epoch()
5565 );
5566 return;
5567 }
5568 let transaction = VerifiedTransaction::new_randomness_state_update(
5569 epoch,
5570 round,
5571 bytes,
5572 epoch_store
5573 .epoch_start_config()
5574 .randomness_obj_initial_shared_version(),
5575 );
5576 debug!(
5577 "created randomness state update transaction with digest: {:?}",
5578 transaction.digest()
5579 );
5580 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5581 let digest = *transaction.digest();
5582
5583 self.authority_state
5588 .get_cache_commit()
5589 .persist_transaction(&transaction);
5590
5591 self.authority_state
5593 .transaction_manager()
5594 .enqueue(vec![transaction], &epoch_store);
5595
5596 let authority_state = self.authority_state.clone();
5597 spawn_monitored_task!(async move {
5598 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5607 let result = tokio::time::timeout(
5608 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5609 authority_state
5610 .get_transaction_cache_reader()
5611 .try_notify_read_executed_effects(&[digest]),
5612 )
5613 .await;
5614 let result = match result {
5615 Ok(result) => result,
5616 Err(_) => {
5617 if cfg!(debug_assertions) {
5618 panic!(
5620 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5621 );
5622 }
5623 warn!(
5624 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5625 );
5626 authority_state
5628 .get_transaction_cache_reader()
5629 .try_notify_read_executed_effects(&[digest])
5630 .await
5631 }
5632 };
5633
5634 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5635 let effects = effects.pop().expect("should return effects");
5636 if *effects.status() != ExecutionStatus::Success {
5637 fatal!(
5638 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5639 );
5640 }
5641 debug!(
5642 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5643 );
5644 });
5645 }
5646}
5647
5648#[async_trait]
5649impl TransactionKeyValueStoreTrait for AuthorityState {
5650 async fn multi_get(
5651 &self,
5652 transaction_keys: &[TransactionDigest],
5653 effects_keys: &[TransactionDigest],
5654 ) -> IotaResult<KVStoreTransactionData> {
5655 let txns = if !transaction_keys.is_empty() {
5656 self.get_transaction_cache_reader()
5657 .try_multi_get_transaction_blocks(transaction_keys)?
5658 .into_iter()
5659 .map(|t| t.map(|t| (*t).clone().into_inner()))
5660 .collect()
5661 } else {
5662 vec![]
5663 };
5664
5665 let fx = if !effects_keys.is_empty() {
5666 self.get_transaction_cache_reader()
5667 .try_multi_get_executed_effects(effects_keys)?
5668 } else {
5669 vec![]
5670 };
5671
5672 Ok((txns, fx))
5673 }
5674
5675 async fn multi_get_checkpoints(
5676 &self,
5677 checkpoint_summaries: &[CheckpointSequenceNumber],
5678 checkpoint_contents: &[CheckpointSequenceNumber],
5679 checkpoint_summaries_by_digest: &[CheckpointDigest],
5680 ) -> IotaResult<(
5681 Vec<Option<CertifiedCheckpointSummary>>,
5682 Vec<Option<CheckpointContents>>,
5683 Vec<Option<CertifiedCheckpointSummary>>,
5684 )> {
5685 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5687 let store = self.get_checkpoint_store();
5688 for seq in checkpoint_summaries {
5689 let checkpoint = store
5690 .get_checkpoint_by_sequence_number(*seq)?
5691 .map(|c| c.into_inner());
5692
5693 summaries.push(checkpoint);
5694 }
5695
5696 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5697 for seq in checkpoint_contents {
5698 let checkpoint = store
5699 .get_checkpoint_by_sequence_number(*seq)?
5700 .and_then(|summary| {
5701 store
5702 .get_checkpoint_contents(&summary.content_digest)
5703 .expect("db read cannot fail")
5704 });
5705 contents.push(checkpoint);
5706 }
5707
5708 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5709 for digest in checkpoint_summaries_by_digest {
5710 let checkpoint = store
5711 .get_checkpoint_by_digest(digest)?
5712 .map(|c| c.into_inner());
5713 summaries_by_digest.push(checkpoint);
5714 }
5715
5716 Ok((summaries, contents, summaries_by_digest))
5717 }
5718
5719 async fn get_transaction_perpetual_checkpoint(
5720 &self,
5721 digest: TransactionDigest,
5722 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5723 self.get_checkpoint_cache()
5724 .try_get_transaction_perpetual_checkpoint(&digest)
5725 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5726 }
5727
5728 async fn get_object(
5729 &self,
5730 object_id: ObjectID,
5731 version: VersionNumber,
5732 ) -> IotaResult<Option<Object>> {
5733 self.get_object_cache_reader()
5734 .try_get_object_by_key(&object_id, version)
5735 }
5736
5737 async fn multi_get_transactions_perpetual_checkpoints(
5738 &self,
5739 digests: &[TransactionDigest],
5740 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5741 let res = self
5742 .get_checkpoint_cache()
5743 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5744
5745 Ok(res
5746 .into_iter()
5747 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5748 .collect())
5749 }
5750
5751 #[instrument(skip(self))]
5752 async fn multi_get_events_by_tx_digests(
5753 &self,
5754 digests: &[TransactionDigest],
5755 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5756 if digests.is_empty() {
5757 return Ok(vec![]);
5758 }
5759 let events_digests: Vec<_> = self
5760 .get_transaction_cache_reader()
5761 .try_multi_get_executed_effects(digests)?
5762 .into_iter()
5763 .map(|t| t.and_then(|t| t.events_digest().cloned()))
5764 .collect();
5765 let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5766 let mut events = self
5767 .get_transaction_cache_reader()
5768 .try_multi_get_events(&non_empty_events)?
5769 .into_iter();
5770 Ok(events_digests
5771 .into_iter()
5772 .map(|ev| ev.and_then(|_| events.next()?))
5773 .collect())
5774 }
5775}
5776
/// Simulator-only (`cfg(msim)`) hooks for replacing the modules of a package,
/// either with one fixed set of modules or with a per-validator callback.
///
/// Overrides are kept in a thread-local map keyed by package id.
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_types::{
        base_types::{AuthorityName, ObjectID},
        is_system_package,
    };
    use move_binary_format::CompiledModule;

    /// Registered overrides, keyed by the id of the overridden package.
    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    /// The compiled modules making up one package.
    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per validator, which modules (if any) override a
    /// package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a package is overridden.
    enum PackageOverrideConfig {
        /// The same modules for every validator.
        Global(Framework),
        /// Modules chosen by a callback that receives the validator name.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes every module with its own version.
    ///
    /// Panics if a module fails to serialize.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut buf = Vec::new();
            module
                .serialize_with_version(module.version, &mut buf)
                .unwrap();
            serialized.push(buf);
        }
        serialized
    }

    /// Stores `config` as the override for `package_id`, replacing any
    /// override registered for that package before.
    fn store_override(package_id: ObjectID, config: PackageOverrideConfig) {
        let _replaced = OVERRIDE.with(|cfg| cfg.borrow_mut().insert(package_id, config));
    }

    /// Overrides `package_id` with `modules` for every validator.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        store_override(package_id, PackageOverrideConfig::Global(modules));
    }

    /// Overrides `package_id` with whatever `func` returns for a validator.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        store_override(package_id, PackageOverrideConfig::PerValidator(func));
    }

    /// Returns the serialized override modules of `package_id` for validator
    /// `name`, or `None` when there is no override or the callback declines.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            let overrides = cfg.borrow();
            let entry = overrides.get(package_id)?;
            match entry {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(func) => {
                    func(name).map(|framework| compiled_modules_to_bytes(&framework))
                }
            }
        })
    }

    /// Returns the override modules of `package_id` for validator `name`, or
    /// `None` when there is no override or the callback declines.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            let overrides = cfg.borrow();
            let entry = overrides.get(package_id)?;
            match entry {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(func) => func(name),
            }
        })
    }

    /// Builds the overriding `SystemPackage` of `package_id` for validator
    /// `name`, if an override applies.
    ///
    /// For a system package the dependencies are those of the built-in
    /// package with the same id; otherwise all built-in package ids are used.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;
        let dependencies = if !is_system_package(*package_id) {
            BuiltInFramework::all_package_ids()
        } else {
            BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec()
        };
        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns one `SystemPackage` for every overridden package id that is
    /// not a built-in package, each depending on all built-in packages.
    ///
    /// Panics if such a package yields no override bytes for `name`.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();
        // Collect the ids first so the map is no longer borrowed while the
        // override bytes are looked up below.
        let extra_ids: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .filter(|id| !built_in.contains(*id))
                .copied()
                .collect()
        });

        let mut packages = Vec::with_capacity(extra_ids.len());
        for id in extra_ids {
            packages.push(SystemPackage {
                id,
                bytes: get_override_bytes(&id, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        packages
    }
}
5898
5899#[derive(Debug, Serialize, Deserialize, Clone)]
5900pub struct ObjDumpFormat {
5901 pub id: ObjectID,
5902 pub version: VersionNumber,
5903 pub digest: ObjectDigest,
5904 pub object: Object,
5905}
5906
5907impl ObjDumpFormat {
5908 fn new(object: Object) -> Self {
5909 let oref = object.compute_object_reference();
5910 Self {
5911 id: oref.0,
5912 version: oref.1,
5913 digest: oref.2,
5914 object,
5915 }
5916 }
5917}
5918
5919#[derive(Debug, Serialize, Deserialize, Clone)]
5920pub struct NodeStateDump {
5921 pub tx_digest: TransactionDigest,
5922 pub sender_signed_data: SenderSignedData,
5923 pub executed_epoch: u64,
5924 pub reference_gas_price: u64,
5925 pub protocol_version: u64,
5926 pub epoch_start_timestamp_ms: u64,
5927 pub computed_effects: TransactionEffects,
5928 pub expected_effects_digest: TransactionEffectsDigest,
5929 pub relevant_system_packages: Vec<ObjDumpFormat>,
5930 pub shared_objects: Vec<ObjDumpFormat>,
5931 pub loaded_child_objects: Vec<ObjDumpFormat>,
5932 pub modified_at_versions: Vec<ObjDumpFormat>,
5933 pub runtime_reads: Vec<ObjDumpFormat>,
5934 pub input_objects: Vec<ObjDumpFormat>,
5935}
5936
5937impl NodeStateDump {
5938 pub fn new(
5939 tx_digest: &TransactionDigest,
5940 effects: &TransactionEffects,
5941 expected_effects_digest: TransactionEffectsDigest,
5942 object_store: &dyn ObjectStore,
5943 epoch_store: &Arc<AuthorityPerEpochStore>,
5944 inner_temporary_store: &InnerTemporaryStore,
5945 certificate: &VerifiedExecutableTransaction,
5946 ) -> IotaResult<Self> {
5947 let executed_epoch = epoch_store.epoch();
5949 let reference_gas_price = epoch_store.reference_gas_price();
5950 let epoch_start_config = epoch_store.epoch_start_config();
5951 let protocol_version = epoch_store.protocol_version().as_u64();
5952 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5953
5954 let mut relevant_system_packages = Vec::new();
5956 for sys_package_id in BuiltInFramework::all_package_ids() {
5957 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5958 relevant_system_packages.push(ObjDumpFormat::new(w))
5959 }
5960 }
5961
5962 let mut shared_objects = Vec::new();
5964 for kind in effects.input_shared_objects() {
5965 match kind {
5966 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5967 if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5968 shared_objects.push(ObjDumpFormat::new(w))
5969 }
5970 }
5971 InputSharedObject::ReadDeleted(..)
5972 | InputSharedObject::MutateDeleted(..)
5973 | InputSharedObject::Cancelled(..) => (), }
5976 }
5977
5978 let mut loaded_child_objects = Vec::new();
5981 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5982 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5983 loaded_child_objects.push(ObjDumpFormat::new(w))
5984 }
5985 }
5986
5987 let mut modified_at_versions = Vec::new();
5989 for (id, ver) in effects.modified_at_versions() {
5990 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5991 modified_at_versions.push(ObjDumpFormat::new(w))
5992 }
5993 }
5994
5995 let mut runtime_reads = Vec::new();
5999 for obj in inner_temporary_store
6000 .runtime_packages_loaded_from_db
6001 .values()
6002 {
6003 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6004 }
6005
6006 Ok(Self {
6009 tx_digest: *tx_digest,
6010 executed_epoch,
6011 reference_gas_price,
6012 epoch_start_timestamp_ms,
6013 protocol_version,
6014 relevant_system_packages,
6015 shared_objects,
6016 loaded_child_objects,
6017 modified_at_versions,
6018 runtime_reads,
6019 sender_signed_data: certificate.clone().into_message(),
6020 input_objects: inner_temporary_store
6021 .input_objects
6022 .values()
6023 .map(|o| ObjDumpFormat::new(o.clone()))
6024 .collect(),
6025 computed_effects: effects.clone(),
6026 expected_effects_digest,
6027 })
6028 }
6029
6030 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6031 let mut objects = Vec::new();
6032 objects.extend(self.relevant_system_packages.clone());
6033 objects.extend(self.shared_objects.clone());
6034 objects.extend(self.loaded_child_objects.clone());
6035 objects.extend(self.modified_at_versions.clone());
6036 objects.extend(self.runtime_reads.clone());
6037 objects.extend(self.input_objects.clone());
6038 objects
6039 }
6040
6041 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6042 let file_name = format!(
6043 "{}_{}_NODE_DUMP.json",
6044 self.tx_digest,
6045 AuthorityState::unixtime_now_ms()
6046 );
6047 let mut path = path.to_path_buf();
6048 path.push(&file_name);
6049 let mut file = File::create(path.clone())?;
6050 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6051 Ok(path)
6052 }
6053
6054 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6055 let file = File::open(path)?;
6056 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6057 }
6058}