1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49 key_value_store::{
50 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51 },
52 key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57 IOTA_SYSTEM_ADDRESS, TypeTag,
58 account_abstraction::{
59 account::AuthenticatorFunctionRefV1Key,
60 authenticator_function::{
61 AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
62 AuthenticatorFunctionRefV1,
63 },
64 },
65 authenticator_state::get_authenticator_state,
66 base_types::*,
67 committee::{Committee, EpochId, ProtocolVersion},
68 crypto::{
69 AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
70 default_hash,
71 },
72 deny_list_v1::check_coin_deny_list_v1_during_signing,
73 digests::{ChainIdentifier, Digest, TransactionEventsDigest},
74 dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
75 effects::{
76 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
77 TransactionEvents, VerifiedSignedTransactionEffects,
78 },
79 error::{ExecutionError, IotaError, IotaResult, UserInputError},
80 event::{Event, EventID, SystemEpochInfoEvent},
81 executable_transaction::VerifiedExecutableTransaction,
82 execution_config_utils::to_binary_config,
83 execution_status::ExecutionStatus,
84 fp_ensure,
85 gas::{GasCostSummary, IotaGasStatus},
86 gas_coin::NANOS_PER_IOTA,
87 inner_temporary_store::{
88 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
89 WrittenObjects,
90 },
91 iota_system_state::{
92 IotaSystemState, IotaSystemStateTrait,
93 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
94 },
95 is_system_package,
96 layout_resolver::{LayoutResolver, into_struct_layout},
97 message_envelope::Message,
98 messages_checkpoint::{
99 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
100 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
101 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
102 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
103 },
104 messages_consensus::AuthorityCapabilitiesV1,
105 messages_grpc::{
106 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
107 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
108 TransactionStatus,
109 },
110 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
111 move_authenticator::MoveAuthenticator,
112 object::{
113 MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
114 bounded_visitor::BoundedVisitor,
115 },
116 storage::{
117 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
118 },
119 supported_protocol_versions::{
120 ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
121 },
122 transaction::*,
123 transaction_executor::{SimulateTransactionResult, VmChecks},
124};
125use itertools::Itertools;
126use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
127use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
128use parking_lot::Mutex;
129use prometheus::{
130 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
131 register_histogram_vec_with_registry, register_histogram_with_registry,
132 register_int_counter_vec_with_registry, register_int_counter_with_registry,
133 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
134};
135use serde::{Deserialize, Serialize, de::DeserializeOwned};
136use tap::TapFallible;
137use tokio::{
138 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
139 task::JoinHandle,
140};
141use tracing::{debug, error, info, instrument, trace, warn};
142use typed_store::TypedStoreError;
143
144use self::{
145 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
146};
147#[cfg(msim)]
148pub use crate::checkpoints::checkpoint_executor::utils::{
149 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
150};
151use crate::{
152 authority::{
153 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
154 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
155 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
156 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
157 authority_store_tables::AuthorityPrunerTables,
158 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
159 },
160 authority_client::NetworkAuthorityClient,
161 checkpoints::CheckpointStore,
162 congestion_tracker::CongestionTracker,
163 consensus_adapter::ConsensusAdapter,
164 epoch::committee_store::CommitteeStore,
165 execution_cache::{
166 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
167 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
168 TransactionCacheRead,
169 },
170 execution_driver::execution_process,
171 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
172 metrics::{LatencyObserver, RateTracker},
173 module_cache_metrics::ResolverMetrics,
174 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
175 rest_index::RestIndexStore,
176 stake_aggregator::StakeAggregator,
177 state_accumulator::{AccumulatorStore, StateAccumulator},
178 subscription_handler::SubscriptionHandler,
179 transaction_input_loader::TransactionInputLoader,
180 transaction_manager::TransactionManager,
181 transaction_outputs::TransactionOutputs,
182 validator_tx_finalizer::ValidatorTxFinalizer,
183 verify_indexes::verify_indexes,
184};
185
186#[cfg(test)]
187#[path = "unit_tests/authority_tests.rs"]
188pub mod authority_tests;
189
190#[cfg(test)]
191#[path = "unit_tests/transaction_tests.rs"]
192pub mod transaction_tests;
193
194#[cfg(test)]
195#[path = "unit_tests/batch_transaction_tests.rs"]
196mod batch_transaction_tests;
197
198#[cfg(test)]
199#[path = "unit_tests/move_integration_tests.rs"]
200pub mod move_integration_tests;
201
202#[cfg(test)]
203#[path = "unit_tests/gas_tests.rs"]
204mod gas_tests;
205
206#[cfg(test)]
207#[path = "unit_tests/batch_verification_tests.rs"]
208mod batch_verification_tests;
209
210#[cfg(test)]
211#[path = "unit_tests/coin_deny_list_tests.rs"]
212mod coin_deny_list_tests;
213
214#[cfg(test)]
215#[path = "unit_tests/auth_unit_test_utils.rs"]
216pub mod auth_unit_test_utils;
217
218pub mod authority_test_utils;
219
220pub mod authority_per_epoch_store;
221pub mod authority_per_epoch_store_pruner;
222
223pub mod authority_store_pruner;
224pub mod authority_store_tables;
225pub mod authority_store_types;
226pub mod epoch_start_configuration;
227pub mod shared_object_congestion_tracker;
228pub mod shared_object_version_manager;
229pub mod suggested_gas_price_calculator;
230pub mod test_authority_builder;
231pub mod transaction_deferral;
232
233pub(crate) mod authority_store;
234pub mod backpressure;
235
236pub struct AuthorityMetrics {
238 tx_orders: IntCounter,
239 total_certs: IntCounter,
240 total_cert_attempts: IntCounter,
241 total_effects: IntCounter,
242 pub shared_obj_tx: IntCounter,
243 sponsored_tx: IntCounter,
244 tx_already_processed: IntCounter,
245 num_input_objs: Histogram,
246 num_shared_objects: Histogram,
247 batch_size: Histogram,
248
249 authority_state_handle_transaction_latency: Histogram,
250
251 execute_certificate_latency_single_writer: Histogram,
252 execute_certificate_latency_shared_object: Histogram,
253
254 internal_execution_latency: Histogram,
255 execution_load_input_objects_latency: Histogram,
256 prepare_certificate_latency: Histogram,
257 commit_certificate_latency: Histogram,
258 db_checkpoint_latency: Histogram,
259
260 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
261 pub(crate) transaction_manager_num_missing_objects: IntGauge,
262 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
263 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
264 pub(crate) transaction_manager_num_ready: IntGauge,
265 pub(crate) transaction_manager_object_cache_size: IntGauge,
266 pub(crate) transaction_manager_object_cache_hits: IntCounter,
267 pub(crate) transaction_manager_object_cache_misses: IntCounter,
268 pub(crate) transaction_manager_object_cache_evictions: IntCounter,
269 pub(crate) transaction_manager_package_cache_size: IntGauge,
270 pub(crate) transaction_manager_package_cache_hits: IntCounter,
271 pub(crate) transaction_manager_package_cache_misses: IntCounter,
272 pub(crate) transaction_manager_package_cache_evictions: IntCounter,
273 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
274
275 pub(crate) execution_driver_executed_transactions: IntCounter,
276 pub(crate) execution_driver_dispatch_queue: IntGauge,
277 pub(crate) execution_queueing_delay_s: Histogram,
278 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
279 pub(crate) execution_gas_latency_ratio: Histogram,
280
281 pub(crate) skipped_consensus_txns: IntCounter,
282 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
283
284 pub(crate) authority_overload_status: IntGauge,
285 pub(crate) authority_load_shedding_percentage: IntGauge,
286
287 pub(crate) transaction_overload_sources: IntCounterVec,
288
289 post_processing_total_events_emitted: IntCounter,
291 post_processing_total_tx_indexed: IntCounter,
292 post_processing_total_tx_had_event_processed: IntCounter,
293 post_processing_total_failures: IntCounter,
294
295 pub consensus_handler_processed: IntCounterVec,
297 pub consensus_handler_transaction_sizes: HistogramVec,
298 pub consensus_handler_num_low_scoring_authorities: IntGauge,
299 pub consensus_handler_scores: IntGaugeVec,
300 pub consensus_handler_deferred_transactions: IntCounter,
301 pub consensus_handler_congested_transactions: IntCounter,
302 pub consensus_handler_cancelled_transactions: IntCounter,
303 pub consensus_handler_max_object_costs: IntGaugeVec,
304 pub consensus_committed_subdags: IntCounterVec,
305 pub consensus_committed_messages: IntGaugeVec,
306 pub consensus_committed_user_transactions: IntGaugeVec,
307 pub consensus_handler_leader_round: IntGauge,
308 pub consensus_calculated_throughput: IntGauge,
309 pub consensus_calculated_throughput_profile: IntGauge,
310
311 pub limits_metrics: Arc<LimitsMetrics>,
312
313 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
315
316 pub zklogin_sig_count: IntCounter,
318 pub multisig_sig_count: IntCounter,
320
321 pub execution_queueing_latency: LatencyObserver,
324
325 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
332
333 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
336}
337
338const POSITIVE_INT_BUCKETS: &[f64] = &[
340 1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
341 20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
342 7000000., 10000000.,
343];
344
345const LATENCY_SEC_BUCKETS: &[f64] = &[
346 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
347 10., 20., 30., 60., 90.,
348];
349
350const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
352 0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
353 0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
354];
355
356const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
357 10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
358 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
359];
360
361pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
365 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
366 let execute_certificate_latency = register_histogram_vec_with_registry!(
367 "authority_state_execute_certificate_latency",
368 "Latency of executing certificates, including waiting for inputs",
369 &["tx_type"],
370 LATENCY_SEC_BUCKETS.to_vec(),
371 registry,
372 )
373 .unwrap();
374
375 let execute_certificate_latency_single_writer =
376 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
377 let execute_certificate_latency_shared_object =
378 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
379
380 Self {
381 tx_orders: register_int_counter_with_registry!(
382 "total_transaction_orders",
383 "Total number of transaction orders",
384 registry,
385 )
386 .unwrap(),
387 total_certs: register_int_counter_with_registry!(
388 "total_transaction_certificates",
389 "Total number of transaction certificates handled",
390 registry,
391 )
392 .unwrap(),
393 total_cert_attempts: register_int_counter_with_registry!(
394 "total_handle_certificate_attempts",
395 "Number of calls to handle_certificate",
396 registry,
397 )
398 .unwrap(),
399 total_effects: register_int_counter_with_registry!(
401 "total_transaction_effects",
402 "Total number of transaction effects produced",
403 registry,
404 )
405 .unwrap(),
406
407 shared_obj_tx: register_int_counter_with_registry!(
408 "num_shared_obj_tx",
409 "Number of transactions involving shared objects",
410 registry,
411 )
412 .unwrap(),
413
414 sponsored_tx: register_int_counter_with_registry!(
415 "num_sponsored_tx",
416 "Number of sponsored transactions",
417 registry,
418 )
419 .unwrap(),
420
421 tx_already_processed: register_int_counter_with_registry!(
422 "num_tx_already_processed",
423 "Number of transaction orders already processed previously",
424 registry,
425 )
426 .unwrap(),
427 num_input_objs: register_histogram_with_registry!(
428 "num_input_objects",
429 "Distribution of number of input TX objects per TX",
430 POSITIVE_INT_BUCKETS.to_vec(),
431 registry,
432 )
433 .unwrap(),
434 num_shared_objects: register_histogram_with_registry!(
435 "num_shared_objects",
436 "Number of shared input objects per TX",
437 POSITIVE_INT_BUCKETS.to_vec(),
438 registry,
439 )
440 .unwrap(),
441 batch_size: register_histogram_with_registry!(
442 "batch_size",
443 "Distribution of size of transaction batch",
444 POSITIVE_INT_BUCKETS.to_vec(),
445 registry,
446 )
447 .unwrap(),
448 authority_state_handle_transaction_latency: register_histogram_with_registry!(
449 "authority_state_handle_transaction_latency",
450 "Latency of handling transactions",
451 LATENCY_SEC_BUCKETS.to_vec(),
452 registry,
453 )
454 .unwrap(),
455 execute_certificate_latency_single_writer,
456 execute_certificate_latency_shared_object,
457 internal_execution_latency: register_histogram_with_registry!(
458 "authority_state_internal_execution_latency",
459 "Latency of actual certificate executions",
460 LATENCY_SEC_BUCKETS.to_vec(),
461 registry,
462 )
463 .unwrap(),
464 execution_load_input_objects_latency: register_histogram_with_registry!(
465 "authority_state_execution_load_input_objects_latency",
466 "Latency of loading input objects for execution",
467 LOW_LATENCY_SEC_BUCKETS.to_vec(),
468 registry,
469 )
470 .unwrap(),
471 prepare_certificate_latency: register_histogram_with_registry!(
472 "authority_state_prepare_certificate_latency",
473 "Latency of executing certificates, before committing the results",
474 LATENCY_SEC_BUCKETS.to_vec(),
475 registry,
476 )
477 .unwrap(),
478 commit_certificate_latency: register_histogram_with_registry!(
479 "authority_state_commit_certificate_latency",
480 "Latency of committing certificate execution results",
481 LATENCY_SEC_BUCKETS.to_vec(),
482 registry,
483 )
484 .unwrap(),
485 db_checkpoint_latency: register_histogram_with_registry!(
486 "db_checkpoint_latency",
487 "Latency of checkpointing dbs",
488 LATENCY_SEC_BUCKETS.to_vec(),
489 registry,
490 ).unwrap(),
491 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
492 "transaction_manager_num_enqueued_certificates",
493 "Current number of certificates enqueued to TransactionManager",
494 &["result"],
495 registry,
496 )
497 .unwrap(),
498 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
499 "transaction_manager_num_missing_objects",
500 "Current number of missing objects in TransactionManager",
501 registry,
502 )
503 .unwrap(),
504 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
505 "transaction_manager_num_pending_certificates",
506 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
507 registry,
508 )
509 .unwrap(),
510 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
511 "transaction_manager_num_executing_certificates",
512 "Number of executing certificates, including queued and actually running certificates",
513 registry,
514 )
515 .unwrap(),
516 transaction_manager_num_ready: register_int_gauge_with_registry!(
517 "transaction_manager_num_ready",
518 "Number of ready transactions in TransactionManager",
519 registry,
520 )
521 .unwrap(),
522 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
523 "transaction_manager_object_cache_size",
524 "Current size of object-availability cache in TransactionManager",
525 registry,
526 )
527 .unwrap(),
528 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
529 "transaction_manager_object_cache_hits",
530 "Number of object-availability cache hits in TransactionManager",
531 registry,
532 )
533 .unwrap(),
534 authority_overload_status: register_int_gauge_with_registry!(
535 "authority_overload_status",
536 "Whether authority is current experiencing overload and enters load shedding mode.",
537 registry)
538 .unwrap(),
539 authority_load_shedding_percentage: register_int_gauge_with_registry!(
540 "authority_load_shedding_percentage",
541 "The percentage of transactions is shed when the authority is in load shedding mode.",
542 registry)
543 .unwrap(),
544 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
545 "transaction_manager_object_cache_misses",
546 "Number of object-availability cache misses in TransactionManager",
547 registry,
548 )
549 .unwrap(),
550 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
551 "transaction_manager_object_cache_evictions",
552 "Number of object-availability cache evictions in TransactionManager",
553 registry,
554 )
555 .unwrap(),
556 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
557 "transaction_manager_package_cache_size",
558 "Current size of package-availability cache in TransactionManager",
559 registry,
560 )
561 .unwrap(),
562 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
563 "transaction_manager_package_cache_hits",
564 "Number of package-availability cache hits in TransactionManager",
565 registry,
566 )
567 .unwrap(),
568 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
569 "transaction_manager_package_cache_misses",
570 "Number of package-availability cache misses in TransactionManager",
571 registry,
572 )
573 .unwrap(),
574 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
575 "transaction_manager_package_cache_evictions",
576 "Number of package-availability cache evictions in TransactionManager",
577 registry,
578 )
579 .unwrap(),
580 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
581 "transaction_manager_transaction_queue_age_s",
582 "Time spent in waiting for transaction in the queue",
583 LATENCY_SEC_BUCKETS.to_vec(),
584 registry,
585 )
586 .unwrap(),
587 transaction_overload_sources: register_int_counter_vec_with_registry!(
588 "transaction_overload_sources",
589 "Number of times each source indicates transaction overload.",
590 &["source"],
591 registry)
592 .unwrap(),
593 execution_driver_executed_transactions: register_int_counter_with_registry!(
594 "execution_driver_executed_transactions",
595 "Cumulative number of transaction executed by execution driver",
596 registry,
597 )
598 .unwrap(),
599 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
600 "execution_driver_dispatch_queue",
601 "Number of transaction pending in execution driver dispatch queue",
602 registry,
603 )
604 .unwrap(),
605 execution_queueing_delay_s: register_histogram_with_registry!(
606 "execution_queueing_delay_s",
607 "Queueing delay between a transaction is ready for execution until it starts executing.",
608 LATENCY_SEC_BUCKETS.to_vec(),
609 registry
610 )
611 .unwrap(),
612 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
613 "prepare_cert_gas_latency_ratio",
614 "The ratio of computation gas divided by VM execution latency.",
615 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
616 registry
617 )
618 .unwrap(),
619 execution_gas_latency_ratio: register_histogram_with_registry!(
620 "execution_gas_latency_ratio",
621 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
622 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
623 registry
624 )
625 .unwrap(),
626 skipped_consensus_txns: register_int_counter_with_registry!(
627 "skipped_consensus_txns",
628 "Total number of consensus transactions skipped",
629 registry,
630 )
631 .unwrap(),
632 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
633 "skipped_consensus_txns_cache_hit",
634 "Total number of consensus transactions skipped because of local cache hit",
635 registry,
636 )
637 .unwrap(),
638 post_processing_total_events_emitted: register_int_counter_with_registry!(
639 "post_processing_total_events_emitted",
640 "Total number of events emitted in post processing",
641 registry,
642 )
643 .unwrap(),
644 post_processing_total_tx_indexed: register_int_counter_with_registry!(
645 "post_processing_total_tx_indexed",
646 "Total number of txes indexed in post processing",
647 registry,
648 )
649 .unwrap(),
650 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
651 "post_processing_total_tx_had_event_processed",
652 "Total number of txes finished event processing in post processing",
653 registry,
654 )
655 .unwrap(),
656 post_processing_total_failures: register_int_counter_with_registry!(
657 "post_processing_total_failures",
658 "Total number of failure in post processing",
659 registry,
660 )
661 .unwrap(),
662 consensus_handler_processed: register_int_counter_vec_with_registry!(
663 "consensus_handler_processed",
664 "Number of transactions processed by consensus handler",
665 &["class"],
666 registry
667 ).unwrap(),
668 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
669 "consensus_handler_transaction_sizes",
670 "Sizes of each type of transactions processed by consensus handler",
671 &["class"],
672 POSITIVE_INT_BUCKETS.to_vec(),
673 registry
674 ).unwrap(),
675 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
676 "consensus_handler_num_low_scoring_authorities",
677 "Number of low scoring authorities based on reputation scores from consensus",
678 registry
679 ).unwrap(),
680 consensus_handler_scores: register_int_gauge_vec_with_registry!(
681 "consensus_handler_scores",
682 "scores from consensus for each authority",
683 &["authority"],
684 registry,
685 ).unwrap(),
686 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
687 "consensus_handler_deferred_transactions",
688 "Number of transactions deferred by consensus handler",
689 registry,
690 ).unwrap(),
691 consensus_handler_congested_transactions: register_int_counter_with_registry!(
692 "consensus_handler_congested_transactions",
693 "Number of transactions deferred by consensus handler due to congestion",
694 registry,
695 ).unwrap(),
696 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
697 "consensus_handler_cancelled_transactions",
698 "Number of transactions cancelled by consensus handler",
699 registry,
700 ).unwrap(),
701 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
702 "consensus_handler_max_congestion_control_object_costs",
703 "Max object costs for congestion control in the current consensus commit",
704 &["commit_type"],
705 registry,
706 ).unwrap(),
707 consensus_committed_subdags: register_int_counter_vec_with_registry!(
708 "consensus_committed_subdags",
709 "Number of committed subdags, sliced by leader",
710 &["authority"],
711 registry,
712 ).unwrap(),
713 consensus_committed_messages: register_int_gauge_vec_with_registry!(
714 "consensus_committed_messages",
715 "Total number of committed consensus messages, sliced by author",
716 &["authority"],
717 registry,
718 ).unwrap(),
719 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
720 "consensus_committed_user_transactions",
721 "Number of committed user transactions, sliced by submitter",
722 &["authority"],
723 registry,
724 ).unwrap(),
725 consensus_handler_leader_round: register_int_gauge_with_registry!(
726 "consensus_handler_leader_round",
727 "The leader round of the current consensus output being processed in the consensus handler",
728 registry,
729 ).unwrap(),
730 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
731 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
732 zklogin_sig_count: register_int_counter_with_registry!(
733 "zklogin_sig_count",
734 "Count of zkLogin signatures",
735 registry,
736 )
737 .unwrap(),
738 multisig_sig_count: register_int_counter_with_registry!(
739 "multisig_sig_count",
740 "Count of zkLogin signatures",
741 registry,
742 )
743 .unwrap(),
744 consensus_calculated_throughput: register_int_gauge_with_registry!(
745 "consensus_calculated_throughput",
746 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
747 registry,
748 ).unwrap(),
749 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
750 "consensus_calculated_throughput_profile",
751 "The current active calculated throughput profile",
752 registry
753 ).unwrap(),
754 execution_queueing_latency: LatencyObserver::new(),
755 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
756 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
757 }
758 }
759
760 pub fn reset_on_reconfigure(&self) {
764 self.consensus_committed_messages.reset();
765 self.consensus_handler_scores.reset();
766 self.consensus_committed_user_transactions.reset();
767 }
768}
769
770pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
777
778pub struct AuthorityState {
779 pub name: AuthorityName,
782 pub secret: StableSyncAuthoritySigner,
784
785 input_loader: TransactionInputLoader,
787 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
788
789 epoch_store: ArcSwap<AuthorityPerEpochStore>,
790
791 execution_lock: RwLock<EpochId>,
797
798 pub indexes: Option<Arc<IndexStore>>,
799 pub rest_index: Option<Arc<RestIndexStore>>,
800
801 pub subscription_handler: Arc<SubscriptionHandler>,
802 pub checkpoint_store: Arc<CheckpointStore>,
803
804 committee_store: Arc<CommitteeStore>,
805
806 transaction_manager: Arc<TransactionManager>,
808
809 #[cfg_attr(not(test), expect(unused))]
811 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
812
813 pub metrics: Arc<AuthorityMetrics>,
814 _pruner: AuthorityStorePruner,
815 _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
816
817 db_checkpoint_config: DBCheckpointConfig,
819
820 pub config: NodeConfig,
821
822 pub overload_info: AuthorityOverloadInfo,
824
825 pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
826
827 chain_identifier: ChainIdentifier,
830
831 pub(crate) congestion_tracker: Arc<CongestionTracker>,
832}
833
834impl AuthorityState {
843 pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
844 epoch_store.committee().authority_exists(&self.name)
845 }
846
847 pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
848 epoch_store
849 .active_validators()
850 .iter()
851 .any(|a| AuthorityName::from(a) == self.name)
852 }
853
854 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
855 !self.is_committee_validator(epoch_store)
856 }
857
858 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
859 &self.committee_store
860 }
861
862 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
863 self.committee_store.clone()
864 }
865
866 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
867 &self.config.authority_overload_config
868 }
869
870 pub fn get_epoch_state_commitments(
871 &self,
872 epoch: EpochId,
873 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
874 self.checkpoint_store.get_epoch_state_commitments(epoch)
875 }
876
877 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
881 async fn handle_transaction_impl(
882 &self,
883 transaction: VerifiedTransaction,
884 epoch_store: &Arc<AuthorityPerEpochStore>,
885 ) -> IotaResult<VerifiedSignedTransaction> {
886 let _execution_lock = self.execution_lock_for_signing();
888
889 let protocol_config = epoch_store.protocol_config();
890 let reference_gas_price = epoch_store.reference_gas_price();
891
892 let epoch = epoch_store.epoch();
893
894 let tx_data = transaction.data().transaction_data();
895
896 iota_transaction_checks::deny::check_transaction_for_signing(
900 tx_data,
901 transaction.tx_signatures(),
902 &transaction.input_objects()?,
903 &tx_data.receiving_objects(),
904 &self.config.transaction_deny_config,
905 self.get_backing_package_store().as_ref(),
906 )?;
907
908 let (tx_input_objects, tx_receiving_objects, auth_input_objects, account_object) =
913 self.read_objects_for_signing(&transaction, epoch)?;
914
915 let move_authenticator = transaction.sender_move_authenticator();
919
920 let (
926 gas_status,
927 tx_checked_input_objects,
928 auth_checked_input_objects,
929 authenticator_function_ref,
930 ) = self.check_transaction_inputs_for_signing(
931 protocol_config,
932 reference_gas_price,
933 tx_data,
934 tx_input_objects,
935 &tx_receiving_objects,
936 move_authenticator,
937 auth_input_objects,
938 account_object,
939 )?;
940
941 check_coin_deny_list_v1_during_signing(
942 tx_data.sender(),
943 &tx_checked_input_objects,
944 &tx_receiving_objects,
945 &auth_checked_input_objects,
946 &self.get_object_store(),
947 )?;
948
949 if let Some(move_authenticator) = move_authenticator {
950 let auth_checked_input_objects = auth_checked_input_objects
954 .expect("MoveAuthenticator input objects must be provided");
955 let authenticator_function_ref = authenticator_function_ref
956 .expect("AuthenticatorFunctionRefV1 object must be provided");
957
958 let (kind, signer, _) = tx_data.execution_parts();
959
960 let validation_result = epoch_store.executor().authenticate_transaction(
962 self.get_backing_store().as_ref(),
963 protocol_config,
964 self.metrics.limits_metrics.clone(),
965 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
966 epoch_store
967 .epoch_start_config()
968 .epoch_data()
969 .epoch_start_timestamp(),
970 gas_status,
971 move_authenticator.to_owned(),
972 authenticator_function_ref,
973 auth_checked_input_objects,
974 kind,
975 signer,
976 transaction.digest().to_owned(),
977 &mut None,
978 );
979
980 if let Err(validation_error) = validation_result {
981 return Err(IotaError::MoveAuthenticatorExecutionFailure {
982 error: validation_error.to_string(),
983 });
984 }
985 }
986
987 let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();
988
989 let signed_transaction =
990 VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);
991
992 self.get_cache_writer().try_acquire_transaction_locks(
997 epoch_store,
998 &owned_objects,
999 signed_transaction.clone(),
1000 )?;
1001
1002 Ok(signed_transaction)
1003 }
1004
1005 #[instrument(name = "handle_transaction", level = "trace", skip_all)]
1007 pub async fn handle_transaction(
1008 &self,
1009 epoch_store: &Arc<AuthorityPerEpochStore>,
1010 transaction: VerifiedTransaction,
1011 ) -> IotaResult<HandleTransactionResponse> {
1012 let tx_digest = *transaction.digest();
1013 debug!("handle_transaction");
1014
1015 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1017 return Ok(HandleTransactionResponse { status });
1018 }
1019
1020 let _metrics_guard = self
1021 .metrics
1022 .authority_state_handle_transaction_latency
1023 .start_timer();
1024 self.metrics.tx_orders.inc();
1025
1026 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1027 match signed {
1028 Ok(s) => {
1029 if self.is_committee_validator(epoch_store) {
1030 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1031 let tx = s.clone();
1032 let validator_tx_finalizer = validator_tx_finalizer.clone();
1033 let cache_reader = self.get_transaction_cache_reader().clone();
1034 let epoch_store = epoch_store.clone();
1035 spawn_monitored_task!(epoch_store.within_alive_epoch(
1036 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1037 ));
1038 }
1039 }
1040 Ok(HandleTransactionResponse {
1041 status: TransactionStatus::Signed(s.into_inner().into_sig()),
1042 })
1043 }
1044 Err(err) => Ok(HandleTransactionResponse {
1048 status: self
1049 .get_transaction_status(&tx_digest, epoch_store)?
1050 .ok_or(err)?
1051 .1,
1052 }),
1053 }
1054 }
1055
1056 pub fn check_system_overload_at_signing(&self) -> bool {
1057 self.config
1058 .authority_overload_config
1059 .check_system_overload_at_signing
1060 }
1061
1062 pub fn check_system_overload_at_execution(&self) -> bool {
1063 self.config
1064 .authority_overload_config
1065 .check_system_overload_at_execution
1066 }
1067
1068 pub(crate) fn check_system_overload(
1069 &self,
1070 consensus_adapter: &Arc<ConsensusAdapter>,
1071 tx_data: &SenderSignedData,
1072 do_authority_overload_check: bool,
1073 ) -> IotaResult {
1074 if do_authority_overload_check {
1075 self.check_authority_overload(tx_data).tap_err(|_| {
1076 self.update_overload_metrics("execution_queue");
1077 })?;
1078 }
1079 self.transaction_manager
1080 .check_execution_overload(self.overload_config(), tx_data)
1081 .tap_err(|_| {
1082 self.update_overload_metrics("execution_pending");
1083 })?;
1084 consensus_adapter.check_consensus_overload().tap_err(|_| {
1085 self.update_overload_metrics("consensus");
1086 })?;
1087
1088 let pending_tx_count = self
1089 .get_cache_commit()
1090 .approximate_pending_transaction_count();
1091 if pending_tx_count
1092 > self
1093 .config
1094 .execution_cache_config
1095 .writeback_cache
1096 .backpressure_threshold_for_rpc()
1097 {
1098 return Err(IotaError::ValidatorOverloadedRetryAfter {
1099 retry_after_secs: 10,
1100 });
1101 }
1102
1103 Ok(())
1104 }
1105
1106 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1107 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1108 return Ok(());
1109 }
1110
1111 let load_shedding_percentage = self
1112 .overload_info
1113 .load_shedding_percentage
1114 .load(Ordering::Relaxed);
1115 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1116 }
1117
1118 fn update_overload_metrics(&self, source: &str) {
1119 self.metrics
1120 .transaction_overload_sources
1121 .with_label_values(&[source])
1122 .inc();
1123 }
1124
1125 #[instrument(level = "trace", skip_all)]
1127 pub async fn execute_certificate(
1128 &self,
1129 certificate: &VerifiedCertificate,
1130 epoch_store: &Arc<AuthorityPerEpochStore>,
1131 ) -> IotaResult<TransactionEffects> {
1132 let _metrics_guard = if certificate.contains_shared_object() {
1133 self.metrics
1134 .execute_certificate_latency_shared_object
1135 .start_timer()
1136 } else {
1137 self.metrics
1138 .execute_certificate_latency_single_writer
1139 .start_timer()
1140 };
1141 trace!("execute_certificate");
1142
1143 self.metrics.total_cert_attempts.inc();
1144
1145 if !certificate.contains_shared_object() {
1146 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1151 }
1152
1153 epoch_store
1156 .within_alive_epoch(self.notify_read_effects(certificate))
1157 .await
1158 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1159 .and_then(|r| r)
1160 }
1161
1162 #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1177 pub fn try_execute_immediately(
1178 &self,
1179 certificate: &VerifiedExecutableTransaction,
1180 expected_effects_digest: Option<TransactionEffectsDigest>,
1181 epoch_store: &Arc<AuthorityPerEpochStore>,
1182 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1183 let _scope = monitored_scope("Execution::try_execute_immediately");
1184 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1185
1186 let tx_digest = certificate.digest();
1187
1188 let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1190
1191 if let Some(effects) = self
1194 .get_transaction_cache_reader()
1195 .try_get_executed_effects(tx_digest)?
1196 {
1197 if let Some(expected_effects_digest_inner) = expected_effects_digest {
1198 assert_eq!(
1199 effects.digest(),
1200 expected_effects_digest_inner,
1201 "Unexpected effects digest for transaction {tx_digest:?}"
1202 );
1203 }
1204 tx_guard.release();
1205 return Ok((effects, None));
1206 }
1207
1208 let (tx_input_objects, authenticator_input_objects, account_object) =
1209 self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1210
1211 let expected_effects_digest =
1217 expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1218
1219 self.process_certificate(
1220 tx_guard,
1221 certificate,
1222 tx_input_objects,
1223 account_object,
1224 authenticator_input_objects,
1225 expected_effects_digest,
1226 epoch_store,
1227 )
1228 .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1229 .tap_ok(
1230 |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1231 )
1232 }
1233
1234 pub fn read_objects_for_execution(
1235 &self,
1236 tx_lock: &CertLockGuard,
1237 certificate: &VerifiedExecutableTransaction,
1238 epoch_store: &Arc<AuthorityPerEpochStore>,
1239 ) -> IotaResult<(InputObjects, Option<InputObjects>, Option<ObjectReadResult>)> {
1240 let _scope = monitored_scope("Execution::load_input_objects");
1241 let _metrics_guard = self
1242 .metrics
1243 .execution_load_input_objects_latency
1244 .start_timer();
1245
1246 let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1247
1248 let input_objects = self.input_loader.read_objects_for_execution(
1249 epoch_store,
1250 &certificate.key(),
1251 tx_lock,
1252 &input_objects,
1253 epoch_store.epoch(),
1254 )?;
1255
1256 certificate.split_input_objects_into_groups_for_reading(input_objects)
1257 }
1258
1259 pub fn try_execute_for_test(
1263 &self,
1264 certificate: &VerifiedCertificate,
1265 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1266 let epoch_store = self.epoch_store_for_testing();
1267 let (effects, execution_error_opt) = self.try_execute_immediately(
1268 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1269 None,
1270 &epoch_store,
1271 )?;
1272 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1273 Ok((signed_effects, execution_error_opt))
1274 }
1275
1276 pub fn execute_for_test(
1278 &self,
1279 certificate: &VerifiedCertificate,
1280 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1281 self.try_execute_for_test(certificate)
1282 .expect("try_execute_for_test should not fail")
1283 }
1284
1285 pub async fn notify_read_effects(
1286 &self,
1287 certificate: &VerifiedCertificate,
1288 ) -> IotaResult<TransactionEffects> {
1289 self.get_transaction_cache_reader()
1290 .try_notify_read_executed_effects(&[*certificate.digest()])
1291 .await
1292 .map(|mut r| r.pop().expect("must return correct number of effects"))
1293 }
1294
1295 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1296 self.get_object_cache_reader()
1297 .try_check_owned_objects_are_live(owned_object_refs)
1298 }
1299
1300 pub(crate) fn debug_dump_transaction_state(
1305 &self,
1306 tx_digest: &TransactionDigest,
1307 effects: &TransactionEffects,
1308 expected_effects_digest: TransactionEffectsDigest,
1309 inner_temporary_store: &InnerTemporaryStore,
1310 certificate: &VerifiedExecutableTransaction,
1311 debug_dump_config: &StateDebugDumpConfig,
1312 ) -> IotaResult<PathBuf> {
1313 let dump_dir = debug_dump_config
1314 .dump_file_directory
1315 .as_ref()
1316 .cloned()
1317 .unwrap_or(std::env::temp_dir());
1318 let epoch_store = self.load_epoch_store_one_call_per_task();
1319
1320 NodeStateDump::new(
1321 tx_digest,
1322 effects,
1323 expected_effects_digest,
1324 self.get_object_store().as_ref(),
1325 &epoch_store,
1326 inner_temporary_store,
1327 certificate,
1328 )?
1329 .write_to_file(&dump_dir)
1330 .map_err(|e| IotaError::FileIO(e.to_string()))
1331 }
1332
1333 #[instrument(level = "trace", skip_all)]
1334 pub(crate) fn process_certificate(
1335 &self,
1336 tx_guard: CertTxGuard,
1337 certificate: &VerifiedExecutableTransaction,
1338 tx_input_objects: InputObjects,
1339 account_object: Option<ObjectReadResult>,
1340 authenticator_input_objects: Option<InputObjects>,
1341 expected_effects_digest: Option<TransactionEffectsDigest>,
1342 epoch_store: &Arc<AuthorityPerEpochStore>,
1343 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1344 let process_certificate_start_time = tokio::time::Instant::now();
1345 let digest = *certificate.digest();
1346
1347 fail_point_if!("correlated-crash-process-certificate", || {
1348 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1349 iota_simulator::task::kill_current_node(None);
1350 }
1351 });
1352
1353 let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1354 let execution_guard = match execution_guard {
1360 Ok(execution_guard) => execution_guard,
1361 Err(err) => {
1362 tx_guard.release();
1363 return Err(err);
1364 }
1365 };
1366 if *execution_guard != epoch_store.epoch() {
1370 tx_guard.release();
1371 info!("The epoch of the execution_guard doesn't match the epoch store");
1372 return Err(IotaError::WrongEpoch {
1373 expected_epoch: epoch_store.epoch(),
1374 actual_epoch: *execution_guard,
1375 });
1376 }
1377
1378 let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1384 &execution_guard,
1385 certificate,
1386 tx_input_objects,
1387 account_object,
1388 authenticator_input_objects,
1389 epoch_store,
1390 ) {
1391 Err(e) => {
1392 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1393 tx_guard.release();
1394 return Err(e);
1395 }
1396 Ok(res) => res,
1397 };
1398
1399 if let Some(expected_effects_digest) = expected_effects_digest {
1400 if effects.digest() != expected_effects_digest {
1401 match self.debug_dump_transaction_state(
1403 &digest,
1404 &effects,
1405 expected_effects_digest,
1406 &inner_temporary_store,
1407 certificate,
1408 &self.config.state_debug_dump_config,
1409 ) {
1410 Ok(out_path) => {
1411 info!(
1412 "Dumped node state for transaction {} to {}",
1413 digest,
1414 out_path.as_path().display().to_string()
1415 );
1416 }
1417 Err(e) => {
1418 error!("Error dumping state for transaction {}: {e}", digest);
1419 }
1420 }
1421 error!(
1422 tx_digest = ?digest,
1423 ?expected_effects_digest,
1424 actual_effects = ?effects,
1425 "fork detected!"
1426 );
1427 panic!(
1428 "Transaction {} is expected to have effects digest {}, but got {}!",
1429 digest,
1430 expected_effects_digest,
1431 effects.digest(),
1432 );
1433 }
1434 }
1435
1436 fail_point!("crash");
1437
1438 self.commit_certificate(
1439 certificate,
1440 inner_temporary_store,
1441 &effects,
1442 tx_guard,
1443 execution_guard,
1444 epoch_store,
1445 )?;
1446
1447 if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1448 certificate.data().transaction_data().kind()
1449 {
1450 if let Some(err) = &execution_error_opt {
1451 debug_fatal!("Authenticator state update failed: {:?}", err);
1452 }
1453 epoch_store.update_authenticator_state(auth_state);
1454
1455 if cfg!(debug_assertions) {
1458 let authenticator_state = get_authenticator_state(self.get_object_store())
1459 .expect("Read cannot fail")
1460 .expect("Authenticator state must exist");
1461
1462 let mut sys_jwks: Vec<_> = authenticator_state
1463 .active_jwks
1464 .into_iter()
1465 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1466 .collect();
1467 let mut active_jwks: Vec<_> = epoch_store
1468 .signature_verifier
1469 .get_jwks()
1470 .into_iter()
1471 .collect();
1472 sys_jwks.sort();
1473 active_jwks.sort();
1474
1475 assert_eq!(sys_jwks, active_jwks);
1476 }
1477 }
1478
1479 let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1480 if elapsed > 0.0 {
1481 self.metrics
1482 .execution_gas_latency_ratio
1483 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1484 };
1485 Ok((effects, execution_error_opt))
1486 }
1487
1488 #[instrument(level = "trace", skip_all)]
1489 fn commit_certificate(
1490 &self,
1491 certificate: &VerifiedExecutableTransaction,
1492 inner_temporary_store: InnerTemporaryStore,
1493 effects: &TransactionEffects,
1494 tx_guard: CertTxGuard,
1495 _execution_guard: ExecutionLockReadGuard<'_>,
1496 epoch_store: &Arc<AuthorityPerEpochStore>,
1497 ) -> IotaResult {
1498 let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1499 monitored_scope("Execution::commit_certificate");
1500 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1501
1502 let tx_key = certificate.key();
1503 let tx_digest = certificate.digest();
1504 let input_object_count = inner_temporary_store.input_objects.len();
1505 let shared_object_count = effects.input_shared_objects().len();
1506
1507 let output_keys = inner_temporary_store.get_output_keys(effects);
1508
1509 let _ = self
1511 .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1512 .tap_err(|e| {
1513 self.metrics.post_processing_total_failures.inc();
1514 error!(?tx_digest, "tx post processing failed: {e}");
1515 });
1516
1517 epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1521
1522 fail_point!("crash");
1524
1525 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1526 certificate.clone().into_unsigned(),
1527 effects.clone(),
1528 inner_temporary_store,
1529 );
1530 self.get_cache_writer()
1531 .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1532
1533 if certificate.transaction_data().is_end_of_epoch_tx() {
1534 self.get_object_cache_reader()
1537 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1538 }
1539
1540 tx_guard.commit_tx();
1542
1543 self.transaction_manager
1547 .notify_commit(tx_digest, output_keys, epoch_store);
1548
1549 self.update_metrics(certificate, input_object_count, shared_object_count);
1550
1551 Ok(())
1552 }
1553
1554 fn update_metrics(
1555 &self,
1556 certificate: &VerifiedExecutableTransaction,
1557 input_object_count: usize,
1558 shared_object_count: usize,
1559 ) {
1560 if certificate.has_zklogin_sig() {
1562 self.metrics.zklogin_sig_count.inc();
1563 } else if certificate.has_upgraded_multisig() {
1564 self.metrics.multisig_sig_count.inc();
1565 }
1566
1567 self.metrics.total_effects.inc();
1568 self.metrics.total_certs.inc();
1569
1570 if shared_object_count > 0 {
1571 self.metrics.shared_obj_tx.inc();
1572 }
1573
1574 if certificate.is_sponsored_tx() {
1575 self.metrics.sponsored_tx.inc();
1576 }
1577
1578 self.metrics
1579 .num_input_objs
1580 .observe(input_object_count as f64);
1581 self.metrics
1582 .num_shared_objects
1583 .observe(shared_object_count as f64);
1584 self.metrics.batch_size.observe(
1585 certificate
1586 .data()
1587 .intent_message()
1588 .value
1589 .kind()
1590 .num_commands() as f64,
1591 );
1592 }
1593
1594 #[instrument(level = "trace", skip_all)]
1606 fn prepare_certificate(
1607 &self,
1608 _execution_guard: &ExecutionLockReadGuard<'_>,
1609 certificate: &VerifiedExecutableTransaction,
1610 tx_input_objects: InputObjects,
1611 account_object: Option<ObjectReadResult>,
1612 move_authenticator_input_objects: Option<InputObjects>,
1613 epoch_store: &Arc<AuthorityPerEpochStore>,
1614 ) -> IotaResult<(
1615 InnerTemporaryStore,
1616 TransactionEffects,
1617 Option<ExecutionError>,
1618 )> {
1619 let _scope = monitored_scope("Execution::prepare_certificate");
1620 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1621 let prepare_certificate_start_time = tokio::time::Instant::now();
1622
1623 let protocol_config = epoch_store.protocol_config();
1624
1625 let reference_gas_price = epoch_store.reference_gas_price();
1626
1627 let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
1628 let epoch_start_timestamp = epoch_store
1629 .epoch_start_config()
1630 .epoch_data()
1631 .epoch_start_timestamp();
1632
1633 let backing_store = self.get_backing_store().as_ref();
1634
1635 let tx_digest = *certificate.digest();
1636
1637 let tx_data = certificate.data().transaction_data();
1640 tx_data.validity_check(protocol_config)?;
1641
1642 let (kind, signer, gas) = tx_data.execution_parts();
1643
1644 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1645 let (inner_temp_store, _, mut effects, execution_error_opt) = if let Some(
1646 move_authenticator,
1647 ) =
1648 certificate.sender_move_authenticator()
1649 {
1650 let (
1657 auth_account_object_id,
1658 auth_account_object_seq_number,
1659 auth_account_object_digest,
1660 ) = move_authenticator.object_to_authenticate_components()?;
1661
1662 let account_object = account_object.expect("Account object must be provided");
1665
1666 let authenticator_function_ref_for_execution = self.check_move_account(
1667 auth_account_object_id,
1668 auth_account_object_seq_number,
1669 auth_account_object_digest,
1670 account_object,
1671 &signer,
1672 )?;
1673
1674 let move_authenticator_input_objects = move_authenticator_input_objects.expect(
1675 "In case of a `MoveAuthenticator` signature, the authenticator input objects must be provided",
1676 );
1677
1678 let authenticator_gas_budget = protocol_config.max_auth_gas();
1683 let (
1684 gas_status,
1685 authenticator_checked_input_objects,
1686 authenticator_and_tx_checked_input_objects,
1687 ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
1688 certificate,
1689 tx_input_objects,
1690 move_authenticator_input_objects,
1691 authenticator_gas_budget,
1692 protocol_config,
1693 reference_gas_price,
1694 )?;
1695
1696 let owned_object_refs = authenticator_and_tx_checked_input_objects
1697 .inner()
1698 .filter_owned_objects();
1699 self.check_owned_locks(&owned_object_refs)?;
1700
1701 epoch_store
1702 .executor()
1703 .authenticate_then_execute_transaction_to_effects(
1704 backing_store,
1705 protocol_config,
1706 self.metrics.limits_metrics.clone(),
1707 self.config
1708 .expensive_safety_check_config
1709 .enable_deep_per_tx_iota_conservation_check(),
1710 self.config.certificate_deny_config.certificate_deny_set(),
1711 &epoch_id,
1712 epoch_start_timestamp,
1713 gas_status,
1714 gas,
1715 move_authenticator.to_owned(),
1716 authenticator_function_ref_for_execution,
1717 authenticator_checked_input_objects,
1718 authenticator_and_tx_checked_input_objects,
1719 kind,
1720 signer,
1721 tx_digest,
1722 &mut None,
1723 )
1724 } else {
1725 let (tx_gas_status, tx_checked_input_objects) =
1730 iota_transaction_checks::check_certificate_input(
1731 certificate,
1732 tx_input_objects,
1733 protocol_config,
1734 reference_gas_price,
1735 )?;
1736
1737 let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
1738 self.check_owned_locks(&owned_object_refs)?;
1739 epoch_store.executor().execute_transaction_to_effects(
1740 backing_store,
1741 protocol_config,
1742 self.metrics.limits_metrics.clone(),
1743 self.config
1746 .expensive_safety_check_config
1747 .enable_deep_per_tx_iota_conservation_check(),
1748 self.config.certificate_deny_config.certificate_deny_set(),
1749 &epoch_id,
1750 epoch_start_timestamp,
1751 tx_checked_input_objects,
1752 gas,
1753 tx_gas_status,
1754 kind,
1755 signer,
1756 tx_digest,
1757 &mut None,
1758 )
1759 };
1760
1761 fail_point_if!("cp_execution_nondeterminism", || {
1762 #[cfg(msim)]
1763 self.create_fail_state(certificate, epoch_store, &mut effects);
1764 });
1765
1766 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1767 if elapsed > 0.0 {
1768 self.metrics
1769 .prepare_cert_gas_latency_ratio
1770 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1771 }
1772
1773 Ok((inner_temp_store, effects, execution_error_opt.err()))
1774 }
1775
1776 pub fn prepare_certificate_for_benchmark(
1777 &self,
1778 certificate: &VerifiedExecutableTransaction,
1779 input_objects: InputObjects,
1780 epoch_store: &Arc<AuthorityPerEpochStore>,
1781 ) -> IotaResult<(
1782 InnerTemporaryStore,
1783 TransactionEffects,
1784 Option<ExecutionError>,
1785 )> {
1786 let lock = RwLock::new(epoch_store.epoch());
1787 let execution_guard = lock.try_read().unwrap();
1788
1789 self.prepare_certificate(
1790 &execution_guard,
1791 certificate,
1792 input_objects,
1793 None,
1794 None,
1795 epoch_store,
1796 )
1797 }
1798
1799 #[instrument("dry_exec_tx", level = "trace", skip_all)]
1802 #[allow(clippy::type_complexity)]
1803 pub fn dry_exec_transaction(
1804 &self,
1805 transaction: TransactionData,
1806 transaction_digest: TransactionDigest,
1807 ) -> IotaResult<(
1808 DryRunTransactionBlockResponse,
1809 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1810 TransactionEffects,
1811 Option<ObjectID>,
1812 )> {
1813 let epoch_store = self.load_epoch_store_one_call_per_task();
1814 if !self.is_fullnode(&epoch_store) {
1815 return Err(IotaError::UnsupportedFeature {
1816 error: "dry-exec is only supported on fullnodes".to_string(),
1817 });
1818 }
1819
1820 if transaction.kind().is_system_tx() {
1821 return Err(IotaError::UnsupportedFeature {
1822 error: "dry-exec does not support system transactions".to_string(),
1823 });
1824 }
1825
1826 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1827 }
1828
1829 #[allow(clippy::type_complexity)]
1830 pub fn dry_exec_transaction_for_benchmark(
1831 &self,
1832 transaction: TransactionData,
1833 transaction_digest: TransactionDigest,
1834 ) -> IotaResult<(
1835 DryRunTransactionBlockResponse,
1836 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1837 TransactionEffects,
1838 Option<ObjectID>,
1839 )> {
1840 let epoch_store = self.load_epoch_store_one_call_per_task();
1841 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1842 }
1843
1844 #[instrument(level = "trace", skip_all)]
1845 #[allow(clippy::type_complexity)]
1846 fn dry_exec_transaction_impl(
1847 &self,
1848 epoch_store: &AuthorityPerEpochStore,
1849 transaction: TransactionData,
1850 transaction_digest: TransactionDigest,
1851 ) -> IotaResult<(
1852 DryRunTransactionBlockResponse,
1853 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1854 TransactionEffects,
1855 Option<ObjectID>,
1856 )> {
1857 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1859
1860 let input_object_kinds = transaction.input_objects()?;
1861 let receiving_object_refs = transaction.receiving_objects();
1862
1863 iota_transaction_checks::deny::check_transaction_for_signing(
1864 &transaction,
1865 &[],
1866 &input_object_kinds,
1867 &receiving_object_refs,
1868 &self.config.transaction_deny_config,
1869 self.get_backing_package_store().as_ref(),
1870 )?;
1871
1872 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1873 None,
1875 &input_object_kinds,
1876 &receiving_object_refs,
1877 epoch_store.epoch(),
1878 )?;
1879
1880 let mut transaction = transaction;
1882 let mut gas_object_refs = transaction.gas().to_vec();
1883 let reference_gas_price = epoch_store.reference_gas_price();
1884 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1885 let sender = transaction.gas_owner();
1886 let gas_object_id = ObjectID::random();
1887 let gas_object = Object::new_move(
1888 MoveObject::new_gas_coin(
1889 OBJECT_START_VERSION,
1890 gas_object_id,
1891 SIMULATION_GAS_COIN_VALUE,
1892 ),
1893 Owner::AddressOwner(sender),
1894 TransactionDigest::genesis_marker(),
1895 );
1896 let gas_object_ref = gas_object.compute_object_reference();
1897 gas_object_refs = vec![gas_object_ref];
1898 transaction.gas_data_mut().payment = gas_object_refs.clone();
1900 (
1901 iota_transaction_checks::check_transaction_input_with_given_gas(
1902 epoch_store.protocol_config(),
1903 reference_gas_price,
1904 &transaction,
1905 input_objects,
1906 receiving_objects,
1907 gas_object,
1908 &self.metrics.bytecode_verifier_metrics,
1909 &self.config.verifier_signing_config,
1910 )?,
1911 Some(gas_object_id),
1912 )
1913 } else {
1914 let authenticator_gas_budget = 0;
1917
1918 (
1919 iota_transaction_checks::check_transaction_input(
1920 epoch_store.protocol_config(),
1921 reference_gas_price,
1922 &transaction,
1923 input_objects,
1924 &receiving_objects,
1925 &self.metrics.bytecode_verifier_metrics,
1926 &self.config.verifier_signing_config,
1927 authenticator_gas_budget,
1928 )?,
1929 None,
1930 )
1931 };
1932
1933 let protocol_config = epoch_store.protocol_config();
1934 let (kind, signer, _) = transaction.execution_parts();
1935
1936 let silent = true;
1937 let executor = iota_execution::executor(protocol_config, silent, None)
1938 .expect("Creating an executor should not fail here");
1939
1940 let expensive_checks = false;
1941 let (inner_temp_store, _, effects, execution_error) = executor
1942 .execute_transaction_to_effects(
1943 self.get_backing_store().as_ref(),
1944 protocol_config,
1945 self.metrics.limits_metrics.clone(),
1946 expensive_checks,
1947 self.config.certificate_deny_config.certificate_deny_set(),
1948 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1949 epoch_store
1950 .epoch_start_config()
1951 .epoch_data()
1952 .epoch_start_timestamp(),
1953 checked_input_objects,
1954 gas_object_refs,
1955 gas_status,
1956 kind,
1957 signer,
1958 transaction_digest,
1959 &mut None,
1960 );
1961 let tx_digest = *effects.transaction_digest();
1962
1963 let module_cache =
1964 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1965
1966 let mut layout_resolver =
1967 epoch_store
1968 .executor()
1969 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1970 &inner_temp_store,
1971 self.get_backing_package_store(),
1972 )));
1973 let object_changes = Vec::new();
1975
1976 let balance_changes = Vec::new();
1978
1979 let written_with_kind = effects
1980 .created()
1981 .into_iter()
1982 .map(|(oref, _)| (oref, WriteKind::Create))
1983 .chain(
1984 effects
1985 .unwrapped()
1986 .into_iter()
1987 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1988 )
1989 .chain(
1990 effects
1991 .mutated()
1992 .into_iter()
1993 .map(|(oref, _)| (oref, WriteKind::Mutate)),
1994 )
1995 .map(|(oref, kind)| {
1996 let obj = inner_temp_store.written.get(&oref.0).unwrap();
1997 (oref.0, (oref, obj.clone(), kind))
1999 })
2000 .collect();
2001
2002 let execution_error_source = execution_error
2003 .as_ref()
2004 .err()
2005 .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2006
2007 Ok((
2008 DryRunTransactionBlockResponse {
2009 suggested_gas_price: self
2011 .congestion_tracker
2012 .get_prediction_suggested_gas_price(&transaction),
2013 input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
2014 .map_err(|e| IotaError::TransactionSerialization {
2015 error: format!(
2016 "Failed to convert transaction to IotaTransactionBlockData: {e}",
2017 ),
2018 })?, effects: effects.clone().try_into()?,
2020 events: IotaTransactionBlockEvents::try_from(
2021 inner_temp_store.events.clone(),
2022 tx_digest,
2023 None,
2024 layout_resolver.as_mut(),
2025 )?,
2026 object_changes,
2027 balance_changes,
2028 execution_error_source,
2029 },
2030 written_with_kind,
2031 effects,
2032 mock_gas,
2033 ))
2034 }
2035
2036 pub fn simulate_transaction(
2037 &self,
2038 mut transaction: TransactionData,
2039 checks: VmChecks,
2040 ) -> IotaResult<SimulateTransactionResult> {
2041 if transaction.kind().is_system_tx() {
2042 return Err(IotaError::UnsupportedFeature {
2043 error: "simulate does not support system transactions".to_string(),
2044 });
2045 }
2046
2047 let epoch_store = self.load_epoch_store_one_call_per_task();
2048 if !self.is_fullnode(&epoch_store) {
2049 return Err(IotaError::UnsupportedFeature {
2050 error: "simulate is only supported on fullnodes".to_string(),
2051 });
2052 }
2053
2054 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2058
2059 let input_object_kinds = transaction.input_objects()?;
2060 let receiving_object_refs = transaction.receiving_objects();
2061
2062 iota_transaction_checks::deny::check_transaction_for_signing(
2065 &transaction,
2066 &[],
2067 &input_object_kinds,
2068 &receiving_object_refs,
2069 &self.config.transaction_deny_config,
2070 self.get_backing_package_store().as_ref(),
2071 )?;
2072
2073 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2075 None,
2077 &input_object_kinds,
2078 &receiving_object_refs,
2079 epoch_store.epoch(),
2080 )?;
2081
2082 let mock_gas_id = if transaction.gas().is_empty() {
2084 let mock_gas_object = Object::new_move(
2085 MoveObject::new_gas_coin(
2086 OBJECT_START_VERSION,
2087 ObjectID::MAX,
2088 SIMULATION_GAS_COIN_VALUE,
2089 ),
2090 Owner::AddressOwner(transaction.gas_data().owner),
2091 TransactionDigest::genesis_marker(),
2092 );
2093 let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2094 transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
2095 input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2096 Some(mock_gas_object.id())
2097 } else {
2098 None
2099 };
2100
2101 let protocol_config = epoch_store.protocol_config();
2102
2103 let authenticator_gas_budget = 0;
2106
2107 let (gas_status, checked_input_objects) = if checks.enabled() {
2110 iota_transaction_checks::check_transaction_input(
2111 protocol_config,
2112 epoch_store.reference_gas_price(),
2113 &transaction,
2114 input_objects,
2115 &receiving_objects,
2116 &self.metrics.bytecode_verifier_metrics,
2117 &self.config.verifier_signing_config,
2118 authenticator_gas_budget,
2119 )?
2120 } else {
2121 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2122 protocol_config,
2123 transaction.kind(),
2124 input_objects,
2125 receiving_objects,
2126 )?;
2127 let gas_status = IotaGasStatus::new(
2128 transaction.gas_budget(),
2129 transaction.gas_price(),
2130 epoch_store.reference_gas_price(),
2131 protocol_config,
2132 )?;
2133
2134 (gas_status, checked_input_objects)
2135 };
2136
2137 let executor = iota_execution::executor(
2139 protocol_config,
2140 true, None,
2142 )
2143 .expect("Creating an executor should not fail here");
2144
2145 let (kind, signer, gas_coins) = transaction.execution_parts();
2147 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2148 self.get_backing_store().as_ref(),
2149 protocol_config,
2150 self.metrics.limits_metrics.clone(),
2151 false, self.config.certificate_deny_config.certificate_deny_set(),
2153 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2154 epoch_store
2155 .epoch_start_config()
2156 .epoch_data()
2157 .epoch_start_timestamp(),
2158 checked_input_objects,
2159 gas_coins,
2160 gas_status,
2161 kind,
2162 signer,
2163 transaction.digest(),
2164 checks.disabled(),
2165 );
2166
2167 Ok(SimulateTransactionResult {
2170 input_objects: inner_temp_store.input_objects,
2171 output_objects: inner_temp_store.written,
2172 events: effects.events_digest().map(|_| inner_temp_store.events),
2173 effects,
2174 execution_result,
2175 suggested_gas_price: self
2176 .congestion_tracker
2177 .get_prediction_suggested_gas_price(&transaction),
2178 mock_gas_id,
2179 })
2180 }
2181
2182 #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2187 pub async fn dev_inspect_transaction_block(
2188 &self,
2189 sender: IotaAddress,
2190 transaction_kind: TransactionKind,
2191 gas_price: Option<u64>,
2192 gas_budget: Option<u64>,
2193 gas_sponsor: Option<IotaAddress>,
2194 gas_objects: Option<Vec<ObjectRef>>,
2195 show_raw_txn_data_and_effects: Option<bool>,
2196 skip_checks: Option<bool>,
2197 ) -> IotaResult<DevInspectResults> {
2198 let epoch_store = self.load_epoch_store_one_call_per_task();
2199
2200 if !self.is_fullnode(&epoch_store) {
2201 return Err(IotaError::UnsupportedFeature {
2202 error: "dev-inspect is only supported on fullnodes".to_string(),
2203 });
2204 }
2205
2206 if transaction_kind.is_system_tx() {
2207 return Err(IotaError::UnsupportedFeature {
2208 error: "system transactions are not supported".to_string(),
2209 });
2210 }
2211
2212 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2213 let skip_checks = skip_checks.unwrap_or(true);
2214 let reference_gas_price = epoch_store.reference_gas_price();
2215 let protocol_config = epoch_store.protocol_config();
2216 let max_tx_gas = protocol_config.max_tx_gas();
2217
2218 let price = gas_price.unwrap_or(reference_gas_price);
2219 let budget = gas_budget.unwrap_or(max_tx_gas);
2220 let owner = gas_sponsor.unwrap_or(sender);
2221 let payment = gas_objects.unwrap_or_default();
2224 let transaction = TransactionData::V1(TransactionDataV1 {
2225 kind: transaction_kind.clone(),
2226 sender,
2227 gas_data: GasData {
2228 payment,
2229 owner,
2230 price,
2231 budget,
2232 },
2233 expiration: TransactionExpiration::None,
2234 });
2235
2236 let raw_txn_data = if show_raw_txn_data_and_effects {
2237 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2238 error: "Failed to serialize transaction during dev inspect".to_string(),
2239 })?
2240 } else {
2241 vec![]
2242 };
2243
2244 transaction.validity_check_no_gas_check(protocol_config)?;
2245
2246 let input_object_kinds = transaction.input_objects()?;
2247 let receiving_object_refs = transaction.receiving_objects();
2248
2249 iota_transaction_checks::deny::check_transaction_for_signing(
2250 &transaction,
2251 &[],
2252 &input_object_kinds,
2253 &receiving_object_refs,
2254 &self.config.transaction_deny_config,
2255 self.get_backing_package_store().as_ref(),
2256 )?;
2257
2258 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2259 None,
2261 &input_object_kinds,
2262 &receiving_object_refs,
2263 epoch_store.epoch(),
2264 )?;
2265
2266 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2268 SIMULATION_GAS_COIN_VALUE,
2269 transaction.gas_owner(),
2270 );
2271
2272 let gas_objects = if transaction.gas().is_empty() {
2273 let gas_object_ref = dummy_gas_object.compute_object_reference();
2274 vec![gas_object_ref]
2275 } else {
2276 transaction.gas().to_vec()
2277 };
2278
2279 let (gas_status, checked_input_objects) = if skip_checks {
2280 if transaction.gas().is_empty() {
2285 input_objects.push(ObjectReadResult::new(
2286 InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
2287 dummy_gas_object.into(),
2288 ));
2289 }
2290 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2291 protocol_config,
2292 &transaction_kind,
2293 input_objects,
2294 receiving_objects,
2295 )?;
2296 let gas_status = IotaGasStatus::new(
2297 max_tx_gas,
2298 transaction.gas_price(),
2299 reference_gas_price,
2300 protocol_config,
2301 )?;
2302
2303 (gas_status, checked_input_objects)
2304 } else {
2305 if transaction.gas().is_empty() {
2309 iota_transaction_checks::check_transaction_input_with_given_gas(
2310 epoch_store.protocol_config(),
2311 reference_gas_price,
2312 &transaction,
2313 input_objects,
2314 receiving_objects,
2315 dummy_gas_object,
2316 &self.metrics.bytecode_verifier_metrics,
2317 &self.config.verifier_signing_config,
2318 )?
2319 } else {
2320 let authenticator_gas_budget = 0;
2323
2324 iota_transaction_checks::check_transaction_input(
2325 epoch_store.protocol_config(),
2326 reference_gas_price,
2327 &transaction,
2328 input_objects,
2329 &receiving_objects,
2330 &self.metrics.bytecode_verifier_metrics,
2331 &self.config.verifier_signing_config,
2332 authenticator_gas_budget,
2333 )?
2334 }
2335 };
2336
2337 let executor = iota_execution::executor(protocol_config, true, None)
2338 .expect("Creating an executor should not fail here");
2339 let intent_msg = IntentMessage::new(
2340 Intent {
2341 version: IntentVersion::V0,
2342 scope: IntentScope::TransactionData,
2343 app_id: IntentAppId::Iota,
2344 },
2345 transaction,
2346 );
2347 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2348 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2349 self.get_backing_store().as_ref(),
2350 protocol_config,
2351 self.metrics.limits_metrics.clone(),
2352 false,
2354 self.config.certificate_deny_config.certificate_deny_set(),
2355 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2356 epoch_store
2357 .epoch_start_config()
2358 .epoch_data()
2359 .epoch_start_timestamp(),
2360 checked_input_objects,
2361 gas_objects,
2362 gas_status,
2363 transaction_kind,
2364 sender,
2365 transaction_digest,
2366 skip_checks,
2367 );
2368
2369 let raw_effects = if show_raw_txn_data_and_effects {
2370 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2371 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2372 })?
2373 } else {
2374 vec![]
2375 };
2376
2377 let mut layout_resolver =
2378 epoch_store
2379 .executor()
2380 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2381 &inner_temp_store,
2382 self.get_backing_package_store(),
2383 )));
2384
2385 DevInspectResults::new(
2386 effects,
2387 inner_temp_store.events.clone(),
2388 execution_result,
2389 raw_txn_data,
2390 raw_effects,
2391 layout_resolver.as_mut(),
2392 )
2393 }
2394
2395 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2397 let epoch_store = self.epoch_store_for_testing();
2398 Ok(epoch_store.reference_gas_price())
2399 }
2400
2401 #[instrument(level = "trace", skip_all)]
2402 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2403 self.get_transaction_cache_reader()
2404 .try_is_tx_already_executed(digest)
2405 }
2406
2407 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2409 self.try_is_tx_already_executed(digest)
2410 .expect("storage access failed")
2411 }
2412
2413 #[instrument(level = "debug", skip_all, err)]
2415 fn index_tx(
2416 &self,
2417 indexes: &IndexStore,
2418 digest: &TransactionDigest,
2419 cert: &VerifiedExecutableTransaction,
2421 effects: &TransactionEffects,
2422 events: &TransactionEvents,
2423 timestamp_ms: u64,
2424 tx_coins: Option<TxCoins>,
2425 written: &WrittenObjects,
2426 inner_temporary_store: &InnerTemporaryStore,
2427 ) -> IotaResult<u64> {
2428 let changes = self
2429 .process_object_index(effects, written, inner_temporary_store)
2430 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2431
2432 indexes.index_tx(
2433 cert.data().intent_message().value.sender(),
2434 cert.data()
2435 .intent_message()
2436 .value
2437 .input_objects()?
2438 .iter()
2439 .map(|o| o.object_id()),
2440 effects
2441 .all_changed_objects()
2442 .into_iter()
2443 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2444 cert.data()
2445 .intent_message()
2446 .value
2447 .move_calls()
2448 .into_iter()
2449 .map(|(package, module, function)| {
2450 (*package, module.to_owned(), function.to_owned())
2451 }),
2452 events,
2453 changes,
2454 digest,
2455 timestamp_ms,
2456 tx_coins,
2457 )
2458 }
2459
2460 #[cfg(msim)]
2461 fn create_fail_state(
2462 &self,
2463 certificate: &VerifiedExecutableTransaction,
2464 epoch_store: &Arc<AuthorityPerEpochStore>,
2465 effects: &mut TransactionEffects,
2466 ) {
2467 use std::cell::RefCell;
2468 thread_local! {
2469 static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2470 }
2471 if !certificate.data().intent_message().value.is_system_tx() {
2472 let committee = epoch_store.committee();
2473 let cur_stake = (**committee).weight(&self.name);
2474 if cur_stake > 0 {
2475 FAIL_STATE.with_borrow_mut(|fail_state| {
2476 if fail_state.0 < committee.validity_threshold() {
2478 fail_state.0 += cur_stake;
2479 fail_state.1.insert(self.name);
2480 }
2481
2482 if fail_state.1.contains(&self.name) {
2483 info!("cp_exec failing tx");
2484 effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2485 }
2486 });
2487 }
2488 }
2489 }
2490
2491 fn process_object_index(
2492 &self,
2493 effects: &TransactionEffects,
2494 written: &WrittenObjects,
2495 inner_temporary_store: &InnerTemporaryStore,
2496 ) -> IotaResult<ObjectIndexChanges> {
2497 let epoch_store = self.load_epoch_store_one_call_per_task();
2498 let mut layout_resolver =
2499 epoch_store
2500 .executor()
2501 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2502 inner_temporary_store,
2503 self.get_backing_package_store(),
2504 )));
2505
2506 let modified_at_version = effects
2507 .modified_at_versions()
2508 .into_iter()
2509 .collect::<HashMap<_, _>>();
2510
2511 let tx_digest = effects.transaction_digest();
2512 let mut deleted_owners = vec![];
2513 let mut deleted_dynamic_fields = vec![];
2514 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2515 let old_version = modified_at_version.get(&id).unwrap();
2516 match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2519 |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2520 ) {
2521 Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2522 Owner::ObjectOwner(object_id) => {
2523 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2524 }
2525 _ => {}
2526 }
2527 }
2528
2529 let mut new_owners = vec![];
2530 let mut new_dynamic_fields = vec![];
2531
2532 for (oref, owner, kind) in effects.all_changed_objects() {
2533 let id = &oref.0;
2534 if let WriteKind::Mutate = kind {
2537 let Some(old_version) = modified_at_version.get(id) else {
2538 panic!(
2539 "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2540 );
2541 };
2542 let Some(old_object) = self
2545 .get_object_store()
2546 .try_get_object_by_key(id, *old_version)?
2547 else {
2548 panic!(
2549 "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2550 );
2551 };
2552 if old_object.owner != owner {
2553 match old_object.owner {
2554 Owner::AddressOwner(addr) => {
2555 deleted_owners.push((addr, *id));
2556 }
2557 Owner::ObjectOwner(object_id) => {
2558 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2559 }
2560 _ => {}
2561 }
2562 }
2563 }
2564
2565 match owner {
2566 Owner::AddressOwner(addr) => {
2567 let new_object = written.get(id).unwrap_or_else(
2570 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2571 );
2572 assert_eq!(
2573 new_object.version(),
2574 oref.1,
2575 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2576 tx_digest,
2577 id,
2578 new_object.version(),
2579 oref.1
2580 );
2581
2582 let type_ = new_object
2583 .type_()
2584 .map(|type_| ObjectType::Struct(type_.clone()))
2585 .unwrap_or(ObjectType::Package);
2586
2587 new_owners.push((
2588 (addr, *id),
2589 ObjectInfo {
2590 object_id: *id,
2591 version: oref.1,
2592 digest: oref.2,
2593 type_,
2594 owner,
2595 previous_transaction: *effects.transaction_digest(),
2596 },
2597 ));
2598 }
2599 Owner::ObjectOwner(owner) => {
2600 let new_object = written.get(id).unwrap_or_else(
2601 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2602 );
2603 assert_eq!(
2604 new_object.version(),
2605 oref.1,
2606 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2607 tx_digest,
2608 id,
2609 new_object.version(),
2610 oref.1
2611 );
2612
2613 let Some(df_info) = self
2614 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2615 .unwrap_or_else(|e| {
2616 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2617 None
2618 }
2619 )
2620 else {
2621 continue;
2623 };
2624 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2625 }
2626 _ => {}
2627 }
2628 }
2629
2630 Ok(ObjectIndexChanges {
2631 deleted_owners,
2632 deleted_dynamic_fields,
2633 new_owners,
2634 new_dynamic_fields,
2635 })
2636 }
2637
2638 fn try_create_dynamic_field_info(
2639 &self,
2640 o: &Object,
2641 written: &WrittenObjects,
2642 resolver: &mut dyn LayoutResolver,
2643 ) -> IotaResult<Option<DynamicFieldInfo>> {
2644 let Some(move_object) = o.data.try_as_move().cloned() else {
2646 return Ok(None);
2647 };
2648
2649 if !move_object.type_().is_dynamic_field() {
2651 return Ok(None);
2652 }
2653
2654 let layout = resolver
2655 .get_annotated_layout(&move_object.type_().clone().into())?
2656 .into_layout();
2657
2658 let field =
2659 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2660 IotaError::ObjectDeserialization {
2661 error: e.to_string(),
2662 }
2663 })?;
2664
2665 let type_ = field.kind;
2666 let name_type: TypeTag = field.name_layout.into();
2667 let bcs_name = field.name_bytes.to_owned();
2668
2669 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2670 .map_err(|e| {
2671 warn!("{e}");
2672 IotaError::ObjectDeserialization {
2673 error: e.to_string(),
2674 }
2675 })?;
2676
2677 let name = DynamicFieldName {
2678 type_: name_type,
2679 value: IotaMoveValue::from(name_value).to_json_value(),
2680 };
2681
2682 let value_metadata = field.value_metadata().map_err(|e| {
2683 warn!("{e}");
2684 IotaError::ObjectDeserialization {
2685 error: e.to_string(),
2686 }
2687 })?;
2688
2689 Ok(Some(match value_metadata {
2690 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2691 name,
2692 bcs_name,
2693 type_,
2694 object_type: object_type.to_canonical_string(true),
2695 object_id: o.id(),
2696 version: o.version(),
2697 digest: o.digest(),
2698 },
2699
2700 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2701 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2706 (
2707 object.version(),
2708 object.digest(),
2709 object.data.type_().unwrap().clone(),
2710 )
2711 } else {
2712 let object = self
2714 .get_object_store()
2715 .try_get_object_by_key(&object_id, o.version())?
2716 .ok_or_else(|| UserInputError::ObjectNotFound {
2717 object_id,
2718 version: Some(o.version()),
2719 })?;
2720 let version = object.version();
2721 let digest = object.digest();
2722 let object_type = object.data.type_().unwrap().clone();
2723 (version, digest, object_type)
2724 };
2725
2726 DynamicFieldInfo {
2727 name,
2728 bcs_name,
2729 type_,
2730 object_type: object_type.to_string(),
2731 object_id,
2732 version,
2733 digest,
2734 }
2735 }
2736 }))
2737 }
2738
2739 #[instrument(level = "trace", skip_all, err)]
2740 fn post_process_one_tx(
2741 &self,
2742 certificate: &VerifiedExecutableTransaction,
2743 effects: &TransactionEffects,
2744 inner_temporary_store: &InnerTemporaryStore,
2745 epoch_store: &Arc<AuthorityPerEpochStore>,
2746 ) -> IotaResult {
2747 if self.indexes.is_none() {
2748 return Ok(());
2749 }
2750
2751 let tx_digest = certificate.digest();
2752 let timestamp_ms = Self::unixtime_now_ms();
2753 let events = &inner_temporary_store.events;
2754 let written = &inner_temporary_store.written;
2755 let tx_coins =
2756 self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2757
2758 if let Some(indexes) = &self.indexes {
2760 let _ = self
2761 .index_tx(
2762 indexes.as_ref(),
2763 tx_digest,
2764 certificate,
2765 effects,
2766 events,
2767 timestamp_ms,
2768 tx_coins,
2769 written,
2770 inner_temporary_store,
2771 )
2772 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2773 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2774 .expect("Indexing tx should not fail");
2775
2776 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2777 let events = self.make_transaction_block_events(
2778 events.clone(),
2779 *tx_digest,
2780 timestamp_ms,
2781 epoch_store,
2782 inner_temporary_store,
2783 )?;
2784 self.subscription_handler
2786 .process_tx(certificate.data().transaction_data(), &effects, &events)
2787 .tap_ok(|_| {
2788 self.metrics
2789 .post_processing_total_tx_had_event_processed
2790 .inc()
2791 })
2792 .tap_err(|e| {
2793 warn!(
2794 ?tx_digest,
2795 "Post processing - Couldn't process events for tx: {}", e
2796 )
2797 })?;
2798
2799 self.metrics
2800 .post_processing_total_events_emitted
2801 .inc_by(events.data.len() as u64);
2802 };
2803 Ok(())
2804 }
2805
2806 fn make_transaction_block_events(
2807 &self,
2808 transaction_events: TransactionEvents,
2809 digest: TransactionDigest,
2810 timestamp_ms: u64,
2811 epoch_store: &Arc<AuthorityPerEpochStore>,
2812 inner_temporary_store: &InnerTemporaryStore,
2813 ) -> IotaResult<IotaTransactionBlockEvents> {
2814 let mut layout_resolver =
2815 epoch_store
2816 .executor()
2817 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2818 inner_temporary_store,
2819 self.get_backing_package_store(),
2820 )));
2821 IotaTransactionBlockEvents::try_from(
2822 transaction_events,
2823 digest,
2824 Some(timestamp_ms),
2825 layout_resolver.as_mut(),
2826 )
2827 }
2828
2829 pub fn unixtime_now_ms() -> u64 {
2830 let now = SystemTime::now()
2831 .duration_since(UNIX_EPOCH)
2832 .expect("Time went backwards")
2833 .as_millis();
2834 u64::try_from(now).expect("Travelling in time machine")
2835 }
2836
2837 #[instrument(level = "trace", skip_all)]
2838 pub async fn handle_transaction_info_request(
2839 &self,
2840 request: TransactionInfoRequest,
2841 ) -> IotaResult<TransactionInfoResponse> {
2842 let epoch_store = self.load_epoch_store_one_call_per_task();
2843 let (transaction, status) = self
2844 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2845 .ok_or(IotaError::TransactionNotFound {
2846 digest: request.transaction_digest,
2847 })?;
2848 Ok(TransactionInfoResponse {
2849 transaction,
2850 status,
2851 })
2852 }
2853
2854 #[instrument(level = "trace", skip_all)]
2855 pub async fn handle_object_info_request(
2856 &self,
2857 request: ObjectInfoRequest,
2858 ) -> IotaResult<ObjectInfoResponse> {
2859 let epoch_store = self.load_epoch_store_one_call_per_task();
2860
2861 let requested_object_seq = match request.request_kind {
2862 ObjectInfoRequestKind::LatestObjectInfo => {
2863 let (_, seq, _) = self
2864 .try_get_object_or_tombstone(request.object_id)
2865 .await?
2866 .ok_or_else(|| {
2867 IotaError::from(UserInputError::ObjectNotFound {
2868 object_id: request.object_id,
2869 version: None,
2870 })
2871 })?;
2872 seq
2873 }
2874 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2875 };
2876
2877 let object = self
2878 .get_object_store()
2879 .try_get_object_by_key(&request.object_id, requested_object_seq)?
2880 .ok_or_else(|| {
2881 IotaError::from(UserInputError::ObjectNotFound {
2882 object_id: request.object_id,
2883 version: Some(requested_object_seq),
2884 })
2885 })?;
2886
2887 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2888 (request.generate_layout, object.data.try_as_move())
2889 {
2890 Some(into_struct_layout(
2891 epoch_store
2892 .executor()
2893 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2894 .get_annotated_layout(&move_obj.type_().clone().into())?,
2895 )?)
2896 } else {
2897 None
2898 };
2899
2900 let lock = if !object.is_address_owned() {
2901 None
2903 } else {
2904 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2905 .await?
2906 .map(|s| s.into_inner())
2907 };
2908
2909 Ok(ObjectInfoResponse {
2910 object,
2911 layout,
2912 lock_for_debugging: lock,
2913 })
2914 }
2915
2916 #[instrument(level = "trace", skip_all)]
2917 pub fn handle_checkpoint_request(
2918 &self,
2919 request: &CheckpointRequest,
2920 ) -> IotaResult<CheckpointResponse> {
2921 let summary = if request.certified {
2922 let summary = match request.sequence_number {
2923 Some(seq) => self
2924 .checkpoint_store
2925 .get_checkpoint_by_sequence_number(seq)?,
2926 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2927 }
2928 .map(|v| v.into_inner());
2929 summary.map(CheckpointSummaryResponse::Certified)
2930 } else {
2931 let summary = match request.sequence_number {
2932 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2933 None => self
2934 .checkpoint_store
2935 .get_latest_locally_computed_checkpoint()?,
2936 };
2937 summary.map(CheckpointSummaryResponse::Pending)
2938 };
2939 let contents = match &summary {
2940 Some(s) => self
2941 .checkpoint_store
2942 .get_checkpoint_contents(&s.content_digest())?,
2943 None => None,
2944 };
2945 Ok(CheckpointResponse {
2946 checkpoint: summary,
2947 contents,
2948 })
2949 }
2950
2951 fn check_protocol_version(
2952 supported_protocol_versions: SupportedProtocolVersions,
2953 current_version: ProtocolVersion,
2954 ) {
2955 info!("current protocol version is now {:?}", current_version);
2956 info!("supported versions are: {:?}", supported_protocol_versions);
2957 if !supported_protocol_versions.is_version_supported(current_version) {
2958 let msg = format!(
2959 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2960 );
2961
2962 error!("{}", msg);
2963 eprintln!("{msg}");
2964
2965 #[cfg(not(msim))]
2966 std::process::exit(1);
2967
2968 #[cfg(msim)]
2969 iota_simulator::task::shutdown_current_node();
2970 }
2971 }
2972
2973 #[expect(clippy::disallowed_methods)] pub async fn new(
2975 name: AuthorityName,
2976 secret: StableSyncAuthoritySigner,
2977 supported_protocol_versions: SupportedProtocolVersions,
2978 store: Arc<AuthorityStore>,
2979 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
2980 epoch_store: Arc<AuthorityPerEpochStore>,
2981 committee_store: Arc<CommitteeStore>,
2982 indexes: Option<Arc<IndexStore>>,
2983 rest_index: Option<Arc<RestIndexStore>>,
2984 checkpoint_store: Arc<CheckpointStore>,
2985 prometheus_registry: &Registry,
2986 genesis_objects: &[Object],
2987 db_checkpoint_config: &DBCheckpointConfig,
2988 config: NodeConfig,
2989 archive_readers: ArchiveReaderBalancer,
2990 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
2991 chain_identifier: ChainIdentifier,
2992 pruner_db: Option<Arc<AuthorityPrunerTables>>,
2993 ) -> Arc<Self> {
2994 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
2995
2996 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
2997 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
2998 let transaction_manager = Arc::new(TransactionManager::new(
2999 execution_cache_trait_pointers.object_cache_reader.clone(),
3000 execution_cache_trait_pointers
3001 .transaction_cache_reader
3002 .clone(),
3003 &epoch_store,
3004 tx_ready_certificates,
3005 metrics.clone(),
3006 ));
3007 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3008
3009 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3010 epoch_store.get_parent_path(),
3011 &config.authority_store_pruning_config,
3012 );
3013 let _pruner = AuthorityStorePruner::new(
3014 store.perpetual_tables.clone(),
3015 checkpoint_store.clone(),
3016 rest_index.clone(),
3017 indexes.clone(),
3018 config.authority_store_pruning_config.clone(),
3019 epoch_store.committee().authority_exists(&name),
3020 epoch_store.epoch_start_state().epoch_duration_ms(),
3021 prometheus_registry,
3022 archive_readers,
3023 pruner_db,
3024 );
3025 let input_loader =
3026 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3027 let epoch = epoch_store.epoch();
3028 let rgp = epoch_store.reference_gas_price();
3029 let state = Arc::new(AuthorityState {
3030 name,
3031 secret,
3032 execution_lock: RwLock::new(epoch),
3033 epoch_store: ArcSwap::new(epoch_store.clone()),
3034 input_loader,
3035 execution_cache_trait_pointers,
3036 indexes,
3037 rest_index,
3038 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3039 checkpoint_store,
3040 committee_store,
3041 transaction_manager,
3042 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3043 metrics,
3044 _pruner,
3045 _authority_per_epoch_pruner,
3046 db_checkpoint_config: db_checkpoint_config.clone(),
3047 config,
3048 overload_info: AuthorityOverloadInfo::default(),
3049 validator_tx_finalizer,
3050 chain_identifier,
3051 congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
3052 });
3053
3054 let authority_state = Arc::downgrade(&state);
3056 spawn_monitored_task!(execution_process(
3057 authority_state,
3058 rx_ready_certificates,
3059 rx_execution_shutdown,
3060 ));
3061
3062 state
3064 .create_owner_index_if_empty(genesis_objects, &epoch_store)
3065 .expect("Error indexing genesis objects.");
3066
3067 state
3068 }
3069
3070 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3072 &self.execution_cache_trait_pointers.object_cache_reader
3073 }
3074
3075 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3076 &self.execution_cache_trait_pointers.transaction_cache_reader
3077 }
3078
3079 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3080 &self.execution_cache_trait_pointers.cache_writer
3081 }
3082
3083 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3084 &self.execution_cache_trait_pointers.backing_store
3085 }
3086
3087 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3088 &self.execution_cache_trait_pointers.backing_package_store
3089 }
3090
3091 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3092 &self.execution_cache_trait_pointers.object_store
3093 }
3094
3095 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3096 &self.execution_cache_trait_pointers.reconfig_api
3097 }
3098
3099 pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
3100 &self.execution_cache_trait_pointers.accumulator_store
3101 }
3102
3103 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3104 &self.execution_cache_trait_pointers.checkpoint_cache
3105 }
3106
3107 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3108 &self.execution_cache_trait_pointers.state_sync_store
3109 }
3110
3111 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3112 &self.execution_cache_trait_pointers.cache_commit
3113 }
3114
3115 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3116 self.execution_cache_trait_pointers
3117 .testing_api
3118 .database_for_testing()
3119 }
3120
3121 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3122 &self,
3123 config: NodeConfig,
3124 metrics: Arc<AuthorityStorePruningMetrics>,
3125 ) -> anyhow::Result<()> {
3126 let archive_readers =
3127 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3128 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3129 &self.database_for_testing().perpetual_tables,
3130 &self.checkpoint_store,
3131 self.rest_index.as_deref(),
3132 None,
3133 config.authority_store_pruning_config,
3134 metrics,
3135 archive_readers,
3136 EPOCH_DURATION_MS_FOR_TESTING,
3137 )
3138 .await
3139 }
3140
3141 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3142 &self.transaction_manager
3143 }
3144
3145 pub fn enqueue_transactions_for_execution(
3148 &self,
3149 txns: Vec<VerifiedExecutableTransaction>,
3150 epoch_store: &Arc<AuthorityPerEpochStore>,
3151 ) {
3152 self.transaction_manager.enqueue(txns, epoch_store)
3153 }
3154
3155 pub fn enqueue_certificates_for_execution(
3157 &self,
3158 certs: Vec<VerifiedCertificate>,
3159 epoch_store: &Arc<AuthorityPerEpochStore>,
3160 ) {
3161 self.transaction_manager
3162 .enqueue_certificates(certs, epoch_store)
3163 }
3164
3165 pub fn enqueue_with_expected_effects_digest(
3166 &self,
3167 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3168 epoch_store: &AuthorityPerEpochStore,
3169 ) {
3170 self.transaction_manager
3171 .enqueue_with_expected_effects_digest(certs, epoch_store)
3172 }
3173
3174 fn create_owner_index_if_empty(
3175 &self,
3176 genesis_objects: &[Object],
3177 epoch_store: &Arc<AuthorityPerEpochStore>,
3178 ) -> IotaResult {
3179 let Some(index_store) = &self.indexes else {
3180 return Ok(());
3181 };
3182 if !index_store.is_empty() {
3183 return Ok(());
3184 }
3185
3186 let mut new_owners = vec![];
3187 let mut new_dynamic_fields = vec![];
3188 let mut layout_resolver = epoch_store
3189 .executor()
3190 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3191 for o in genesis_objects.iter() {
3192 match o.owner {
3193 Owner::AddressOwner(addr) => new_owners.push((
3194 (addr, o.id()),
3195 ObjectInfo::new(&o.compute_object_reference(), o),
3196 )),
3197 Owner::ObjectOwner(object_id) => {
3198 let id = o.id();
3199 let info = match self.try_create_dynamic_field_info(
3200 o,
3201 &BTreeMap::new(),
3202 layout_resolver.as_mut(),
3203 ) {
3204 Ok(Some(info)) => info,
3205 Ok(None) => continue,
3206 Err(IotaError::UserInput {
3207 error:
3208 UserInputError::ObjectNotFound {
3209 object_id: not_found_id,
3210 version,
3211 },
3212 }) => {
3213 warn!(
3214 ?not_found_id,
3215 ?version,
3216 object_owner=?object_id,
3217 field=?id,
3218 "Skipping dynamic field: referenced genesis object not found"
3219 );
3220 continue;
3221 }
3222 Err(e) => return Err(e),
3223 };
3224 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3225 }
3226 _ => {}
3227 }
3228 }
3229
3230 index_store.insert_genesis_objects(ObjectIndexChanges {
3231 deleted_owners: vec![],
3232 deleted_dynamic_fields: vec![],
3233 new_owners,
3234 new_dynamic_fields,
3235 })
3236 }
3237
3238 pub fn execution_lock_for_executable_transaction(
3242 &self,
3243 transaction: &VerifiedExecutableTransaction,
3244 ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3245 let lock = self
3246 .execution_lock
3247 .try_read()
3248 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3249 if *lock == transaction.auth_sig().epoch() {
3250 Ok(lock)
3251 } else {
3252 Err(IotaError::WrongEpoch {
3253 expected_epoch: *lock,
3254 actual_epoch: transaction.auth_sig().epoch(),
3255 })
3256 }
3257 }
3258
3259 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3265 self.execution_lock
3266 .try_read()
3267 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3268 }
3269
3270 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3271 self.execution_lock.write().await
3272 }
3273
3274 #[instrument(level = "error", skip_all)]
3275 pub async fn reconfigure(
3276 &self,
3277 cur_epoch_store: &AuthorityPerEpochStore,
3278 supported_protocol_versions: SupportedProtocolVersions,
3279 new_committee: Committee,
3280 epoch_start_configuration: EpochStartConfiguration,
3281 accumulator: Arc<StateAccumulator>,
3282 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3283 epoch_supply_change: i64,
3284 epoch_last_checkpoint: CheckpointSequenceNumber,
3285 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3286 Self::check_protocol_version(
3287 supported_protocol_versions,
3288 epoch_start_configuration
3289 .epoch_start_state()
3290 .protocol_version(),
3291 );
3292 self.metrics.reset_on_reconfigure();
3293 self.committee_store.insert_new_committee(&new_committee)?;
3294
3295 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3297
3298 cur_epoch_store.epoch_terminated().await;
3300
3301 let highest_locally_built_checkpoint_seq = self
3302 .checkpoint_store
3303 .get_latest_locally_computed_checkpoint()?
3304 .map(|c| *c.sequence_number())
3305 .unwrap_or(0);
3306
3307 assert!(
3308 epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3309 "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3310 );
3311 if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
3312 let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3316 if num_shared_version_assignments > 1 {
3319 debug_fatal!(
3321 "all shared_version_assignments should have been removed \
3322 (num_shared_version_assignments: {num_shared_version_assignments})"
3323 );
3324 }
3325 }
3326
3327 self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3333 .await?;
3334 self.get_reconfig_api()
3335 .clear_state_end_of_epoch(&execution_lock);
3336 self.check_system_consistency(
3337 cur_epoch_store,
3338 accumulator,
3339 expensive_safety_check_config,
3340 epoch_supply_change,
3341 )?;
3342 self.get_reconfig_api()
3343 .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3344 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3345 if self
3346 .db_checkpoint_config
3347 .perform_db_checkpoints_at_epoch_end
3348 {
3349 let checkpoint_indexes = self
3350 .db_checkpoint_config
3351 .perform_index_db_checkpoints_at_epoch_end
3352 .unwrap_or(false);
3353 let current_epoch = cur_epoch_store.epoch();
3354 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3355 self.checkpoint_all_dbs(
3356 &epoch_checkpoint_path,
3357 cur_epoch_store,
3358 checkpoint_indexes,
3359 )?;
3360 }
3361 }
3362
3363 self.get_reconfig_api()
3364 .reconfigure_cache(&epoch_start_configuration)
3365 .await;
3366
3367 let new_epoch = new_committee.epoch;
3368 let new_epoch_store = self
3369 .reopen_epoch_db(
3370 cur_epoch_store,
3371 new_committee,
3372 epoch_start_configuration,
3373 expensive_safety_check_config,
3374 epoch_last_checkpoint,
3375 )
3376 .await?;
3377 assert_eq!(new_epoch_store.epoch(), new_epoch);
3378 self.transaction_manager.reconfigure(new_epoch);
3379 *execution_lock = new_epoch;
3380 Ok(new_epoch_store)
3384 }
3385
3386 pub async fn reconfigure_for_testing(&self) {
3391 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3392 let epoch_store = self.epoch_store_for_testing().clone();
3393 let protocol_config = epoch_store.protocol_config().clone();
3394 let _guard =
3402 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3403 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3404 self.get_backing_package_store().clone(),
3405 self.get_object_store().clone(),
3406 &self.config.expensive_safety_check_config,
3407 self.checkpoint_store
3408 .get_epoch_last_checkpoint(epoch_store.epoch())
3409 .unwrap()
3410 .map(|c| *c.sequence_number())
3411 .unwrap_or_default(),
3412 );
3413 let new_epoch = new_epoch_store.epoch();
3414 self.transaction_manager.reconfigure(new_epoch);
3415 self.epoch_store.store(new_epoch_store);
3416 epoch_store.epoch_terminated().await;
3417 *execution_lock = new_epoch;
3418 }
3419
3420 #[instrument(level = "error", skip_all)]
3421 fn check_system_consistency(
3422 &self,
3423 cur_epoch_store: &AuthorityPerEpochStore,
3424 accumulator: Arc<StateAccumulator>,
3425 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3426 epoch_supply_change: i64,
3427 ) -> IotaResult<()> {
3428 info!(
3429 "Performing iota conservation consistency check for epoch {}",
3430 cur_epoch_store.epoch()
3431 );
3432
3433 if cfg!(debug_assertions) {
3434 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3435 }
3436
3437 self.get_reconfig_api()
3438 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3439
3440 if expensive_safety_check_config.enable_state_consistency_check() {
3442 info!(
3443 "Performing state consistency check for epoch {}",
3444 cur_epoch_store.epoch()
3445 );
3446 self.expensive_check_is_consistent_state(
3447 accumulator,
3448 cur_epoch_store,
3449 cfg!(debug_assertions), );
3451 }
3452
3453 if expensive_safety_check_config.enable_secondary_index_checks() {
3454 if let Some(indexes) = self.indexes.clone() {
3455 verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3456 .expect("secondary indexes are inconsistent");
3457 }
3458 }
3459
3460 Ok(())
3461 }
3462
3463 fn expensive_check_is_consistent_state(
3464 &self,
3465 accumulator: Arc<StateAccumulator>,
3466 cur_epoch_store: &AuthorityPerEpochStore,
3467 panic: bool,
3468 ) {
3469 let live_object_set_hash = accumulator.digest_live_object_set();
3470
3471 let root_state_hash: ECMHLiveObjectSetDigest = self
3472 .get_accumulator_store()
3473 .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3474 .expect("Retrieving root state hash cannot fail")
3475 .expect("Root state hash for epoch must exist")
3476 .1
3477 .digest()
3478 .into();
3479
3480 let is_inconsistent = root_state_hash != live_object_set_hash;
3481 if is_inconsistent {
3482 if panic {
3483 panic!(
3484 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3485 );
3486 } else {
3487 error!(
3488 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3489 root_state_hash, live_object_set_hash
3490 );
3491 }
3492 } else {
3493 info!("State consistency check passed");
3494 }
3495
3496 if !panic {
3497 accumulator.set_inconsistent_state(is_inconsistent);
3498 }
3499 }
3500
3501 pub fn current_epoch_for_testing(&self) -> EpochId {
3502 self.epoch_store_for_testing().epoch()
3503 }
3504
3505 #[instrument(level = "error", skip_all)]
3506 pub fn checkpoint_all_dbs(
3507 &self,
3508 checkpoint_path: &Path,
3509 cur_epoch_store: &AuthorityPerEpochStore,
3510 checkpoint_indexes: bool,
3511 ) -> IotaResult {
3512 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3513 let current_epoch = cur_epoch_store.epoch();
3514
3515 if checkpoint_path.exists() {
3516 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3517 return Ok(());
3518 }
3519
3520 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3521 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3522
3523 if checkpoint_path_tmp.exists() {
3524 fs::remove_dir_all(&checkpoint_path_tmp)
3525 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3526 }
3527
3528 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3529 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3530
3531 self.checkpoint_store
3534 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3535
3536 self.get_reconfig_api()
3537 .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3538
3539 self.committee_store
3540 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3541
3542 if checkpoint_indexes {
3543 if let Some(indexes) = self.indexes.as_ref() {
3544 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3545 }
3546 if let Some(rest_index) = self.rest_index.as_ref() {
3547 rest_index.checkpoint_db(&checkpoint_path_tmp.join("grpc_indexes"))?;
3548 }
3549 }
3550
3551 fs::rename(checkpoint_path_tmp, checkpoint_path)
3552 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3553 Ok(())
3554 }
3555
3556 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3563 self.epoch_store.load()
3564 }
3565
3566 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3568 self.load_epoch_store_one_call_per_task()
3569 }
3570
3571 pub fn clone_committee_for_testing(&self) -> Committee {
3572 Committee::clone(self.epoch_store_for_testing().committee())
3573 }
3574
3575 #[instrument(level = "trace", skip_all)]
3576 pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3577 self.get_object_store()
3578 .try_get_object(object_id)
3579 .map_err(Into::into)
3580 }
3581
3582 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3584 self.try_get_object(object_id)
3585 .await
3586 .expect("storage access failed")
3587 }
3588
3589 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3590 Ok(self
3591 .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3592 .await?
3593 .expect("framework object should always exist")
3594 .compute_object_reference())
3595 }
3596
3597 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3599 self.get_object_cache_reader()
3600 .try_get_iota_system_state_object_unsafe()
3601 }
3602
3603 #[instrument(level = "trace", skip_all)]
3604 pub fn get_checkpoint_by_sequence_number(
3605 &self,
3606 sequence_number: CheckpointSequenceNumber,
3607 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3608 Ok(self
3609 .checkpoint_store
3610 .get_checkpoint_by_sequence_number(sequence_number)?)
3611 }
3612
3613 #[instrument(level = "trace", skip_all)]
3614 pub fn get_transaction_checkpoint_for_tests(
3615 &self,
3616 digest: &TransactionDigest,
3617 epoch_store: &AuthorityPerEpochStore,
3618 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3619 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3620 let Some(checkpoint) = checkpoint else {
3621 return Ok(None);
3622 };
3623 let checkpoint = self
3624 .checkpoint_store
3625 .get_checkpoint_by_sequence_number(checkpoint)?;
3626 Ok(checkpoint)
3627 }
3628
3629 #[instrument(level = "trace", skip_all)]
3630 pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3631 Ok(
3632 match self
3633 .get_object_cache_reader()
3634 .try_get_latest_object_or_tombstone(*object_id)?
3635 {
3636 Some((_, ObjectOrTombstone::Object(object))) => {
3637 let layout = self.get_object_layout(&object)?;
3638 ObjectRead::Exists(object.compute_object_reference(), object, layout)
3639 }
3640 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3641 None => ObjectRead::NotExists(*object_id),
3642 },
3643 )
3644 }
3645
3646 pub fn get_chain_identifier(&self) -> ChainIdentifier {
3648 self.chain_identifier
3649 }
3650
3651 #[instrument(level = "trace", skip_all)]
3652 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3653 where
3654 T: DeserializeOwned,
3655 {
3656 let o = self.get_object_read(object_id)?.into_object()?;
3657 if let Some(move_object) = o.data.try_as_move() {
3658 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3659 IotaError::ObjectDeserialization {
3660 error: format!("{e}"),
3661 }
3662 })?)
3663 } else {
3664 Err(IotaError::ObjectDeserialization {
3665 error: format!("Provided object : [{object_id}] is not a Move object."),
3666 })
3667 }
3668 }
3669
3670 #[instrument(level = "trace", skip_all)]
3676 pub fn get_past_object_read(
3677 &self,
3678 object_id: &ObjectID,
3679 version: SequenceNumber,
3680 ) -> IotaResult<PastObjectRead> {
3681 let Some(obj_ref) = self
3683 .get_object_cache_reader()
3684 .try_get_latest_object_ref_or_tombstone(*object_id)?
3685 else {
3686 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3687 };
3688
3689 if version > obj_ref.1 {
3690 return Ok(PastObjectRead::VersionTooHigh {
3691 object_id: *object_id,
3692 asked_version: version,
3693 latest_version: obj_ref.1,
3694 });
3695 }
3696
3697 if version < obj_ref.1 {
3698 return Ok(match self.read_object_at_version(object_id, version)? {
3700 Some((object, layout)) => {
3701 let obj_ref = object.compute_object_reference();
3702 PastObjectRead::VersionFound(obj_ref, object, layout)
3703 }
3704
3705 None => PastObjectRead::VersionNotFound(*object_id, version),
3706 });
3707 }
3708
3709 if !obj_ref.2.is_alive() {
3710 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3711 }
3712
3713 match self.read_object_at_version(object_id, obj_ref.1)? {
3714 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3715 None => {
3716 error!(
3717 "Object with in parent_entry is missing from object store, datastore is \
3718 inconsistent",
3719 );
3720 Err(UserInputError::ObjectNotFound {
3721 object_id: *object_id,
3722 version: Some(obj_ref.1),
3723 }
3724 .into())
3725 }
3726 }
3727 }
3728
3729 #[instrument(level = "trace", skip_all)]
3730 fn read_object_at_version(
3731 &self,
3732 object_id: &ObjectID,
3733 version: SequenceNumber,
3734 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3735 let Some(object) = self
3736 .get_object_cache_reader()
3737 .try_get_object_by_key(object_id, version)?
3738 else {
3739 return Ok(None);
3740 };
3741
3742 let layout = self.get_object_layout(&object)?;
3743 Ok(Some((object, layout)))
3744 }
3745
3746 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3747 let layout = object
3748 .data
3749 .try_as_move()
3750 .map(|object| {
3751 into_struct_layout(
3752 self.load_epoch_store_one_call_per_task()
3753 .executor()
3754 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3756 .get_annotated_layout(&object.type_().clone().into())?,
3757 )
3758 })
3759 .transpose()?;
3760 Ok(layout)
3761 }
3762
3763 fn get_owner_at_version(
3764 &self,
3765 object_id: &ObjectID,
3766 version: SequenceNumber,
3767 ) -> IotaResult<Owner> {
3768 self.get_object_store()
3769 .try_get_object_by_key(object_id, version)?
3770 .ok_or_else(|| {
3771 IotaError::from(UserInputError::ObjectNotFound {
3772 object_id: *object_id,
3773 version: Some(version),
3774 })
3775 })
3776 .map(|o| o.owner)
3777 }
3778
3779 #[instrument(level = "trace", skip_all)]
3780 pub fn get_owner_objects(
3781 &self,
3782 owner: IotaAddress,
3783 cursor: Option<ObjectID>,
3785 limit: usize,
3786 filter: Option<IotaObjectDataFilter>,
3787 ) -> IotaResult<Vec<ObjectInfo>> {
3788 if let Some(indexes) = &self.indexes {
3789 indexes.get_owner_objects(owner, cursor, limit, filter)
3790 } else {
3791 Err(IotaError::IndexStoreNotAvailable)
3792 }
3793 }
3794
3795 #[instrument(level = "trace", skip_all)]
3796 pub fn get_owned_coins_iterator_with_cursor(
3797 &self,
3798 owner: IotaAddress,
3799 cursor: (String, ObjectID),
3801 limit: usize,
3802 one_coin_type_only: bool,
3803 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3804 if let Some(indexes) = &self.indexes {
3805 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3806 } else {
3807 Err(IotaError::IndexStoreNotAvailable)
3808 }
3809 }
3810
3811 #[instrument(level = "trace", skip_all)]
3812 pub fn get_owner_objects_iterator(
3813 &self,
3814 owner: IotaAddress,
3815 cursor: Option<ObjectID>,
3817 filter: Option<IotaObjectDataFilter>,
3818 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3819 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3820 if let Some(indexes) = &self.indexes {
3821 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3822 } else {
3823 Err(IotaError::IndexStoreNotAvailable)
3824 }
3825 }
3826
3827 #[instrument(level = "trace", skip_all)]
3828 pub async fn get_move_objects<T>(
3829 &self,
3830 owner: IotaAddress,
3831 type_: MoveObjectType,
3832 ) -> IotaResult<Vec<T>>
3833 where
3834 T: DeserializeOwned,
3835 {
3836 let object_ids = self
3837 .get_owner_objects_iterator(owner, None, None)?
3838 .filter(|o| match &o.type_ {
3839 ObjectType::Struct(s) => &type_ == s,
3840 ObjectType::Package => false,
3841 })
3842 .map(|info| ObjectKey(info.object_id, info.version))
3843 .collect::<Vec<_>>();
3844 let mut move_objects = vec![];
3845
3846 let objects = self
3847 .get_object_store()
3848 .try_multi_get_objects_by_key(&object_ids)?;
3849
3850 for (o, id) in objects.into_iter().zip(object_ids) {
3851 let object = o.ok_or_else(|| {
3852 IotaError::from(UserInputError::ObjectNotFound {
3853 object_id: id.0,
3854 version: Some(id.1),
3855 })
3856 })?;
3857 let move_object = object.data.try_as_move().ok_or_else(|| {
3858 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3859 })?;
3860 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3861 IotaError::ObjectDeserialization {
3862 error: format!("{e}"),
3863 }
3864 })?);
3865 }
3866 Ok(move_objects)
3867 }
3868
3869 #[instrument(level = "trace", skip_all)]
3870 pub fn get_dynamic_fields(
3871 &self,
3872 owner: ObjectID,
3873 cursor: Option<ObjectID>,
3875 limit: usize,
3876 ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3877 Ok(self
3878 .get_dynamic_fields_iterator(owner, cursor)?
3879 .take(limit)
3880 .collect::<Result<Vec<_>, _>>()?)
3881 }
3882
3883 fn get_dynamic_fields_iterator(
3884 &self,
3885 owner: ObjectID,
3886 cursor: Option<ObjectID>,
3888 ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3889 {
3890 if let Some(indexes) = &self.indexes {
3891 indexes.get_dynamic_fields_iterator(owner, cursor)
3892 } else {
3893 Err(IotaError::IndexStoreNotAvailable)
3894 }
3895 }
3896
3897 #[instrument(level = "trace", skip_all)]
3898 pub fn get_dynamic_field_object_id(
3899 &self,
3900 owner: ObjectID,
3901 name_type: TypeTag,
3902 name_bcs_bytes: &[u8],
3903 ) -> IotaResult<Option<ObjectID>> {
3904 if let Some(indexes) = &self.indexes {
3905 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3906 } else {
3907 Err(IotaError::IndexStoreNotAvailable)
3908 }
3909 }
3910
3911 #[instrument(level = "trace", skip_all)]
3912 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3913 Ok(self.get_indexes()?.next_sequence_number())
3914 }
3915
3916 #[instrument(level = "trace", skip_all)]
3917 pub async fn get_executed_transaction_and_effects(
3918 &self,
3919 digest: TransactionDigest,
3920 kv_store: Arc<TransactionKeyValueStore>,
3921 ) -> IotaResult<(Transaction, TransactionEffects)> {
3922 let transaction = kv_store.get_tx(digest).await?;
3923 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3924 Ok((transaction, effects))
3925 }
3926
3927 #[instrument(level = "trace", skip_all)]
3928 pub fn multi_get_checkpoint_by_sequence_number(
3929 &self,
3930 sequence_numbers: &[CheckpointSequenceNumber],
3931 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3932 Ok(self
3933 .checkpoint_store
3934 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3935 }
3936
3937 #[instrument(level = "trace", skip_all)]
3938 pub fn get_transaction_events(
3939 &self,
3940 digest: &TransactionEventsDigest,
3941 ) -> IotaResult<TransactionEvents> {
3942 self.get_transaction_cache_reader()
3943 .try_get_events(digest)?
3944 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3945 }
3946
3947 pub fn get_transaction_input_objects(
3948 &self,
3949 effects: &TransactionEffects,
3950 ) -> anyhow::Result<Vec<Object>> {
3951 iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
3952 .map_err(Into::into)
3953 }
3954
3955 pub fn get_transaction_output_objects(
3956 &self,
3957 effects: &TransactionEffects,
3958 ) -> anyhow::Result<Vec<Object>> {
3959 iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
3960 .map_err(Into::into)
3961 }
3962
3963 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3964 match &self.indexes {
3965 Some(i) => Ok(i.clone()),
3966 None => Err(IotaError::UnsupportedFeature {
3967 error: "extended object indexing is not enabled on this server".into(),
3968 }),
3969 }
3970 }
3971
3972 pub async fn get_transactions_for_tests(
3973 self: &Arc<Self>,
3974 filter: Option<TransactionFilter>,
3975 cursor: Option<TransactionDigest>,
3976 limit: Option<usize>,
3977 reverse: bool,
3978 ) -> IotaResult<Vec<TransactionDigest>> {
3979 let metrics = KeyValueStoreMetrics::new_for_tests();
3980 let kv_store = Arc::new(TransactionKeyValueStore::new(
3981 "rocksdb",
3982 metrics,
3983 self.clone(),
3984 ));
3985 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3986 .await
3987 }
3988
3989 #[instrument(level = "trace", skip_all)]
3990 pub async fn get_transactions(
3991 &self,
3992 kv_store: &Arc<TransactionKeyValueStore>,
3993 filter: Option<TransactionFilter>,
3994 cursor: Option<TransactionDigest>,
3996 limit: Option<usize>,
3997 reverse: bool,
3998 ) -> IotaResult<Vec<TransactionDigest>> {
3999 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4000 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4001 let iter = checkpoint_contents.iter().map(|c| c.transaction);
4002 if reverse {
4003 let iter = iter
4004 .rev()
4005 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4006 .skip(usize::from(cursor.is_some()));
4007 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4008 } else {
4009 let iter = iter
4010 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4011 .skip(usize::from(cursor.is_some()));
4012 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4013 }
4014 }
4015 self.get_indexes()?
4016 .get_transactions(filter, cursor, limit, reverse)
4017 }
4018
4019 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4020 &self.checkpoint_store
4021 }
4022
4023 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4024 self.get_checkpoint_store()
4025 .get_highest_executed_checkpoint_seq_number()?
4026 .ok_or(IotaError::UserInput {
4027 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4028 })
4029 }
4030
4031 #[cfg(msim)]
4032 pub fn get_highest_pruned_checkpoint_for_testing(
4033 &self,
4034 ) -> IotaResult<CheckpointSequenceNumber> {
4035 self.database_for_testing()
4036 .perpetual_tables
4037 .get_highest_pruned_checkpoint()
4038 .map(|c| c.unwrap_or(0))
4039 .map_err(Into::into)
4040 }
4041
4042 #[instrument(level = "trace", skip_all)]
4043 pub fn get_checkpoint_summary_by_sequence_number(
4044 &self,
4045 sequence_number: CheckpointSequenceNumber,
4046 ) -> IotaResult<CheckpointSummary> {
4047 let verified_checkpoint = self
4048 .get_checkpoint_store()
4049 .get_checkpoint_by_sequence_number(sequence_number)?;
4050 match verified_checkpoint {
4051 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4052 None => Err(IotaError::UserInput {
4053 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4054 }),
4055 }
4056 }
4057
4058 #[instrument(level = "trace", skip_all)]
4059 pub fn get_checkpoint_summary_by_digest(
4060 &self,
4061 digest: CheckpointDigest,
4062 ) -> IotaResult<CheckpointSummary> {
4063 let verified_checkpoint = self
4064 .get_checkpoint_store()
4065 .get_checkpoint_by_digest(&digest)?;
4066 match verified_checkpoint {
4067 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4068 None => Err(IotaError::UserInput {
4069 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4070 }),
4071 }
4072 }
4073
4074 #[instrument(level = "trace", skip_all)]
4075 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
4076 if is_system_package(package_id) {
4077 return self.find_genesis_txn_digest();
4078 }
4079 Ok(self
4080 .get_object_read(&package_id)?
4081 .into_object()?
4082 .previous_transaction)
4083 }
4084
4085 #[instrument(level = "trace", skip_all)]
4086 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4087 let summary = self
4088 .get_verified_checkpoint_by_sequence_number(0)?
4089 .into_message();
4090 let content = self.get_checkpoint_contents(summary.content_digest)?;
4091 let genesis_transaction = content.enumerate_transactions(&summary).next();
4092 Ok(genesis_transaction
4093 .ok_or(IotaError::UserInput {
4094 error: UserInputError::GenesisTransactionNotFound,
4095 })?
4096 .1
4097 .transaction)
4098 }
4099
4100 #[instrument(level = "trace", skip_all)]
4101 pub fn get_verified_checkpoint_by_sequence_number(
4102 &self,
4103 sequence_number: CheckpointSequenceNumber,
4104 ) -> IotaResult<VerifiedCheckpoint> {
4105 let verified_checkpoint = self
4106 .get_checkpoint_store()
4107 .get_checkpoint_by_sequence_number(sequence_number)?;
4108 match verified_checkpoint {
4109 Some(verified_checkpoint) => Ok(verified_checkpoint),
4110 None => Err(IotaError::UserInput {
4111 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4112 }),
4113 }
4114 }
4115
4116 #[instrument(level = "trace", skip_all)]
4117 pub fn get_verified_checkpoint_summary_by_digest(
4118 &self,
4119 digest: CheckpointDigest,
4120 ) -> IotaResult<VerifiedCheckpoint> {
4121 let verified_checkpoint = self
4122 .get_checkpoint_store()
4123 .get_checkpoint_by_digest(&digest)?;
4124 match verified_checkpoint {
4125 Some(verified_checkpoint) => Ok(verified_checkpoint),
4126 None => Err(IotaError::UserInput {
4127 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4128 }),
4129 }
4130 }
4131
4132 #[instrument(level = "trace", skip_all)]
4133 pub fn get_checkpoint_contents(
4134 &self,
4135 digest: CheckpointContentsDigest,
4136 ) -> IotaResult<CheckpointContents> {
4137 self.get_checkpoint_store()
4138 .get_checkpoint_contents(&digest)?
4139 .ok_or(IotaError::UserInput {
4140 error: UserInputError::CheckpointContentsNotFound(digest),
4141 })
4142 }
4143
4144 #[instrument(level = "trace", skip_all)]
4145 pub fn get_checkpoint_contents_by_sequence_number(
4146 &self,
4147 sequence_number: CheckpointSequenceNumber,
4148 ) -> IotaResult<CheckpointContents> {
4149 let verified_checkpoint = self
4150 .get_checkpoint_store()
4151 .get_checkpoint_by_sequence_number(sequence_number)?;
4152 match verified_checkpoint {
4153 Some(verified_checkpoint) => {
4154 let content_digest = verified_checkpoint.into_inner().content_digest;
4155 self.get_checkpoint_contents(content_digest)
4156 }
4157 None => Err(IotaError::UserInput {
4158 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4159 }),
4160 }
4161 }
4162
4163 #[instrument(level = "trace", skip_all)]
4164 pub async fn query_events(
4165 &self,
4166 kv_store: &Arc<TransactionKeyValueStore>,
4167 query: EventFilter,
4168 cursor: Option<EventID>,
4170 limit: usize,
4171 descending: bool,
4172 ) -> IotaResult<Vec<IotaEvent>> {
4173 let index_store = self.get_indexes()?;
4174
4175 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4177 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4178 IotaError::TransactionNotFound {
4179 digest: cursor.tx_digest,
4180 },
4181 )?;
4182 (tx_seq, cursor.event_seq as usize)
4183 } else if descending {
4184 (u64::MAX, usize::MAX)
4185 } else {
4186 (0, 0)
4187 };
4188
4189 let limit = limit + 1;
4190 let mut event_keys = match query {
4191 EventFilter::All(filters) => {
4192 if filters.is_empty() {
4193 index_store.all_events(tx_num, event_num, limit, descending)?
4194 } else {
4195 return Err(IotaError::UserInput {
4196 error: UserInputError::Unsupported(
4197 "This query type does not currently support filter combinations"
4198 .to_string(),
4199 ),
4200 });
4201 }
4202 }
4203 EventFilter::Transaction(digest) => {
4204 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4205 }
4206 EventFilter::MoveModule { package, module } => {
4207 let module_id = ModuleId::new(package.into(), module);
4208 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4209 }
4210 EventFilter::MoveEventType(struct_name) => index_store
4211 .events_by_move_event_struct_name(
4212 &struct_name,
4213 tx_num,
4214 event_num,
4215 limit,
4216 descending,
4217 )?,
4218 EventFilter::Sender(sender) => {
4219 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4220 }
4221 EventFilter::TimeRange {
4222 start_time,
4223 end_time,
4224 } => index_store
4225 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4226 EventFilter::MoveEventModule { package, module } => index_store
4227 .events_by_move_event_module(
4228 &ModuleId::new(package.into(), module),
4229 tx_num,
4230 event_num,
4231 limit,
4232 descending,
4233 )?,
4234 EventFilter::Package(_)
4236 | EventFilter::MoveEventField { .. }
4237 | EventFilter::Any(_)
4238 | EventFilter::And(_, _)
4239 | EventFilter::Or(_, _) => {
4240 return Err(IotaError::UserInput {
4241 error: UserInputError::Unsupported(
4242 "This query type is not supported by the full node.".to_string(),
4243 ),
4244 });
4245 }
4246 };
4247
4248 if cursor.is_some() {
4251 if !event_keys.is_empty() {
4252 event_keys.remove(0);
4253 }
4254 } else {
4255 event_keys.truncate(limit - 1);
4256 }
4257
4258 let transaction_digests = event_keys
4260 .iter()
4261 .map(|(_, digest, _, _)| *digest)
4262 .collect::<HashSet<_>>()
4263 .into_iter()
4264 .collect::<Vec<_>>();
4265
4266 let events = kv_store
4267 .multi_get_events_by_tx_digests(&transaction_digests)
4268 .await?;
4269
4270 let events_map: HashMap<_, _> =
4271 transaction_digests.iter().zip(events.into_iter()).collect();
4272
4273 let stored_events = event_keys
4274 .into_iter()
4275 .map(|k| {
4276 (
4277 k,
4278 events_map
4279 .get(&k.1)
4280 .expect("fetched digest is missing")
4281 .clone()
4282 .and_then(|e| e.data.get(k.2).cloned()),
4283 )
4284 })
4285 .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4286 event
4287 .map(|e| (e, tx_digest, event_seq, timestamp))
4288 .ok_or(IotaError::TransactionEventsNotFound { digest })
4289 })
4290 .collect::<Result<Vec<_>, _>>()?;
4291
4292 let epoch_store = self.load_epoch_store_one_call_per_task();
4293 let backing_store = self.get_backing_package_store().as_ref();
4294 let mut layout_resolver = epoch_store
4295 .executor()
4296 .type_layout_resolver(Box::new(backing_store));
4297 let mut events = vec![];
4298 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4299 events.push(IotaEvent::try_from(
4300 e.clone(),
4301 tx_digest,
4302 event_seq as u64,
4303 Some(timestamp),
4304 layout_resolver.get_annotated_layout(&e.type_)?,
4305 )?)
4306 }
4307 Ok(events)
4308 }
4309
4310 pub async fn insert_genesis_object(&self, object: Object) {
4311 self.get_reconfig_api()
4312 .try_insert_genesis_object(object)
4313 .expect("Cannot insert genesis object")
4314 }
4315
4316 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4317 futures::future::join_all(
4318 objects
4319 .iter()
4320 .map(|o| self.insert_genesis_object(o.clone())),
4321 )
4322 .await;
4323 }
4324
4325 #[instrument(level = "trace", skip_all)]
4327 pub fn get_transaction_status(
4328 &self,
4329 transaction_digest: &TransactionDigest,
4330 epoch_store: &Arc<AuthorityPerEpochStore>,
4331 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4332 if let Some(effects) =
4334 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4335 {
4336 if let Some(transaction) = self
4337 .get_transaction_cache_reader()
4338 .try_get_transaction_block(transaction_digest)?
4339 {
4340 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4341 let events = if let Some(digest) = effects.events_digest() {
4342 self.get_transaction_events(digest)?
4343 } else {
4344 TransactionEvents::default()
4345 };
4346 return Ok(Some((
4347 (*transaction).clone().into_message(),
4348 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4349 )));
4350 } else {
4351 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4356 }
4357 }
4358 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4359 self.metrics.tx_already_processed.inc();
4360 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4361 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4362 } else {
4363 Ok(None)
4364 }
4365 }
4366
4367 #[instrument(level = "trace", skip_all)]
4371 pub fn get_signed_effects_and_maybe_resign(
4372 &self,
4373 transaction_digest: &TransactionDigest,
4374 epoch_store: &Arc<AuthorityPerEpochStore>,
4375 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4376 let effects = self
4377 .get_transaction_cache_reader()
4378 .try_get_executed_effects(transaction_digest)?;
4379 match effects {
4380 Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4381 None => Ok(None),
4382 }
4383 }
4384
4385 #[instrument(level = "trace", skip_all)]
4386 pub(crate) fn sign_effects(
4387 &self,
4388 effects: TransactionEffects,
4389 epoch_store: &Arc<AuthorityPerEpochStore>,
4390 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4391 let tx_digest = *effects.transaction_digest();
4392 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4393 Some(sig) if sig.epoch == epoch_store.epoch() => {
4394 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4395 }
4396 _ => {
4397 debug!(
4420 ?tx_digest,
4421 epoch=?epoch_store.epoch(),
4422 "Re-signing the effects with the current epoch"
4423 );
4424
4425 let sig = AuthoritySignInfo::new(
4426 epoch_store.epoch(),
4427 &effects,
4428 Intent::iota_app(IntentScope::TransactionEffects),
4429 self.name,
4430 &*self.secret,
4431 );
4432
4433 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4434
4435 epoch_store.insert_effects_digest_and_signature(
4436 &tx_digest,
4437 effects.digest(),
4438 &sig,
4439 )?;
4440
4441 effects
4442 }
4443 };
4444
4445 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4446 signed_effects,
4447 ))
4448 }
4449
4450 #[instrument(level = "trace", skip_all)]
4452 fn fullnode_only_get_tx_coins_for_indexing(
4453 &self,
4454 inner_temporary_store: &InnerTemporaryStore,
4455 epoch_store: &Arc<AuthorityPerEpochStore>,
4456 ) -> Option<TxCoins> {
4457 if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4458 return None;
4459 }
4460 let written_coin_objects = inner_temporary_store
4461 .written
4462 .iter()
4463 .filter_map(|(k, v)| {
4464 if v.is_coin() {
4465 Some((*k, v.clone()))
4466 } else {
4467 None
4468 }
4469 })
4470 .collect();
4471 let input_coin_objects = inner_temporary_store
4472 .input_objects
4473 .iter()
4474 .filter_map(|(k, v)| {
4475 if v.is_coin() {
4476 Some((*k, v.clone()))
4477 } else {
4478 None
4479 }
4480 })
4481 .collect::<ObjectMap>();
4482 Some((input_coin_objects, written_coin_objects))
4483 }
4484
4485 #[instrument(level = "trace", skip_all)]
4497 pub async fn get_transaction_lock(
4498 &self,
4499 object_ref: &ObjectRef,
4500 epoch_store: &AuthorityPerEpochStore,
4501 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4502 let lock_info = self
4503 .get_object_cache_reader()
4504 .try_get_lock(*object_ref, epoch_store)?;
4505 let lock_info = match lock_info {
4506 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4507 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4508 provided_obj_ref: *object_ref,
4509 current_version: locked_ref.1,
4510 }
4511 .into());
4512 }
4513 ObjectLockStatus::Initialized => {
4514 return Ok(None);
4515 }
4516 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4517 };
4518
4519 epoch_store.get_signed_transaction(&lock_info)
4520 }
4521
4522 pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4523 self.get_object_cache_reader().try_get_objects(objects)
4524 }
4525
4526 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4528 self.try_get_objects(objects)
4529 .await
4530 .expect("storage access failed")
4531 }
4532
4533 pub async fn try_get_object_or_tombstone(
4534 &self,
4535 object_id: ObjectID,
4536 ) -> IotaResult<Option<ObjectRef>> {
4537 self.get_object_cache_reader()
4538 .try_get_latest_object_ref_or_tombstone(object_id)
4539 }
4540
4541 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4543 self.try_get_object_or_tombstone(object_id)
4544 .await
4545 .expect("storage access failed")
4546 }
4547
4548 pub fn set_override_protocol_upgrade_buffer_stake(
4558 &self,
4559 expected_epoch: EpochId,
4560 buffer_stake_bps: u64,
4561 ) -> IotaResult {
4562 let epoch_store = self.load_epoch_store_one_call_per_task();
4563 let actual_epoch = epoch_store.epoch();
4564 if actual_epoch != expected_epoch {
4565 return Err(IotaError::WrongEpoch {
4566 expected_epoch,
4567 actual_epoch,
4568 });
4569 }
4570
4571 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4572 }
4573
4574 pub fn clear_override_protocol_upgrade_buffer_stake(
4575 &self,
4576 expected_epoch: EpochId,
4577 ) -> IotaResult {
4578 let epoch_store = self.load_epoch_store_one_call_per_task();
4579 let actual_epoch = epoch_store.epoch();
4580 if actual_epoch != expected_epoch {
4581 return Err(IotaError::WrongEpoch {
4582 expected_epoch,
4583 actual_epoch,
4584 });
4585 }
4586
4587 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4588 }
4589
4590 pub async fn get_available_system_packages(
4594 &self,
4595 binary_config: &BinaryConfig,
4596 ) -> Vec<ObjectRef> {
4597 let mut results = vec![];
4598
4599 let system_packages = BuiltInFramework::iter_system_packages();
4600
4601 #[cfg(msim)]
4603 let extra_packages = framework_injection::get_extra_packages(self.name);
4604 #[cfg(msim)]
4605 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4606
4607 for system_package in system_packages {
4608 let modules = system_package.modules().to_vec();
4609 #[cfg(msim)]
4611 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4612 .unwrap_or(modules);
4613
4614 let Some(obj_ref) = iota_framework::compare_system_package(
4615 &self.get_object_store(),
4616 &system_package.id,
4617 &modules,
4618 system_package.dependencies.to_vec(),
4619 binary_config,
4620 )
4621 .await
4622 else {
4623 return vec![];
4624 };
4625 results.push(obj_ref);
4626 }
4627
4628 results
4629 }
4630
4631 async fn get_system_package_bytes(
4648 &self,
4649 system_packages: Vec<ObjectRef>,
4650 binary_config: &BinaryConfig,
4651 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4652 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4653 let objects = self.get_objects(&ids).await;
4654
4655 let mut res = Vec::with_capacity(system_packages.len());
4656 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4657 let prev_transaction = match object {
4658 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4659 info!("Framework {} does not need updating", system_package_ref.0);
4661 continue;
4662 }
4663
4664 Some(cur_object) => cur_object.previous_transaction,
4665 None => TransactionDigest::genesis_marker(),
4666 };
4667
4668 #[cfg(msim)]
4669 let SystemPackage {
4670 id: _,
4671 bytes,
4672 dependencies,
4673 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4674 .unwrap_or_else(|| {
4675 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4676 });
4677
4678 #[cfg(not(msim))]
4679 let SystemPackage {
4680 id: _,
4681 bytes,
4682 dependencies,
4683 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4684
4685 let modules: Vec<_> = bytes
4686 .iter()
4687 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4688 .collect();
4689
4690 let new_object = Object::new_system_package(
4691 &modules,
4692 system_package_ref.1,
4693 dependencies.clone(),
4694 prev_transaction,
4695 );
4696
4697 let new_ref = new_object.compute_object_reference();
4698 if new_ref != system_package_ref {
4699 error!(
4700 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
4701 );
4702 return None;
4703 }
4704
4705 res.push((system_package_ref.1, bytes, dependencies));
4706 }
4707
4708 Some(res)
4709 }
4710
4711 fn is_protocol_version_supported_v1(
4715 proposed_protocol_version: ProtocolVersion,
4716 committee: &Committee,
4717 capabilities: Vec<AuthorityCapabilitiesV1>,
4718 mut buffer_stake_bps: u64,
4719 ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4720 if buffer_stake_bps > 10000 {
4721 warn!("clamping buffer_stake_bps to 10000");
4722 buffer_stake_bps = 10000;
4723 }
4724
4725 let mut desired_upgrades: Vec<_> = capabilities
4728 .into_iter()
4729 .filter_map(|mut cap| {
4730 if cap.available_system_packages.is_empty() {
4732 return None;
4733 }
4734
4735 cap.available_system_packages.sort();
4736
4737 info!(
4738 "validator {:?} supports {:?} with system packages: {:?}",
4739 cap.authority.concise(),
4740 cap.supported_protocol_versions,
4741 cap.available_system_packages,
4742 );
4743
4744 cap.supported_protocol_versions
4748 .get_version_digest(proposed_protocol_version)
4749 .map(|digest| (digest, cap.available_system_packages, cap.authority))
4750 })
4751 .collect();
4752
4753 desired_upgrades.sort();
4756 desired_upgrades
4757 .into_iter()
4758 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4759 .into_iter()
4760 .find_map(|((digest, packages), group)| {
4761 assert!(!packages.is_empty());
4763
4764 let mut stake_aggregator: StakeAggregator<(), true> =
4765 StakeAggregator::new(Arc::new(committee.clone()));
4766
4767 for (_, _, authority) in group {
4768 stake_aggregator.insert_generic(authority, ());
4769 }
4770
4771 let total_votes = stake_aggregator.total_votes();
4772 let quorum_threshold = committee.quorum_threshold();
4773 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4774
4775 info!(
4776 protocol_config_digest = ?digest,
4777 ?total_votes,
4778 ?quorum_threshold,
4779 ?buffer_stake_bps,
4780 ?effective_threshold,
4781 ?proposed_protocol_version,
4782 ?packages,
4783 "support for upgrade"
4784 );
4785
4786 let has_support = total_votes >= effective_threshold;
4787 has_support.then_some((proposed_protocol_version, digest, packages))
4788 })
4789 }
4790
4791 fn choose_protocol_version_and_system_packages_v1(
4795 current_protocol_version: ProtocolVersion,
4796 current_protocol_digest: Digest,
4797 committee: &Committee,
4798 capabilities: Vec<AuthorityCapabilitiesV1>,
4799 buffer_stake_bps: u64,
4800 ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4801 let mut next_protocol_version = current_protocol_version;
4802 let mut system_packages = vec![];
4803 let mut protocol_version_digest = current_protocol_digest;
4804
4805 while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4809 next_protocol_version + 1,
4810 committee,
4811 capabilities.clone(),
4812 buffer_stake_bps,
4813 ) {
4814 next_protocol_version = version;
4815 protocol_version_digest = digest;
4816 system_packages = packages;
4817 }
4818
4819 (
4820 next_protocol_version,
4821 protocol_version_digest,
4822 system_packages,
4823 )
4824 }
4825
4826 fn get_validators_supporting_protocol_version(
4831 target_protocol_version: ProtocolVersion,
4832 target_digest: Digest,
4833 active_validators: &[AuthorityPublicKey],
4834 capabilities: &[AuthorityCapabilitiesV1],
4835 ) -> Vec<u64> {
4836 let mut eligible_validators = Vec::new();
4837
4838 for capability in capabilities {
4839 if let Some(digest) = capability
4841 .supported_protocol_versions
4842 .get_version_digest(target_protocol_version)
4843 {
4844 if digest == target_digest {
4845 if let Some(index) = active_validators
4847 .iter()
4848 .position(|name| AuthorityName::from(name) == capability.authority)
4849 {
4850 eligible_validators.push(index as u64);
4851 }
4852 }
4853 }
4854 }
4855
4856 eligible_validators.sort();
4858 eligible_validators
4859 }
4860
4861 fn calculate_eligible_validators_weight(
4866 eligible_validator_indices: &[u64],
4867 active_validators: &[AuthorityPublicKey],
4868 committee: &Committee,
4869 ) -> u64 {
4870 let mut total_weight = 0u64;
4871
4872 for &index in eligible_validator_indices {
4873 let authority_pubkey = &active_validators[index as usize];
4874 if let Some((_, weight)) = committee
4876 .members()
4877 .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4878 {
4879 total_weight += weight;
4880 }
4881 }
4882
4883 total_weight
4884 }
4885
4886 #[instrument(level = "debug", skip_all)]
4887 fn create_authenticator_state_tx(
4888 &self,
4889 epoch_store: &Arc<AuthorityPerEpochStore>,
4890 ) -> Option<EndOfEpochTransactionKind> {
4891 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4892 info!("authenticator state transactions not enabled");
4893 return None;
4894 }
4895
4896 let authenticator_state_exists = epoch_store.authenticator_state_exists();
4897 let tx = if authenticator_state_exists {
4898 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4899 let min_epoch =
4900 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4901 let authenticator_obj_initial_shared_version = epoch_store
4902 .epoch_start_config()
4903 .authenticator_obj_initial_shared_version()
4904 .expect("initial version must exist");
4905
4906 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4907 min_epoch,
4908 authenticator_obj_initial_shared_version,
4909 );
4910
4911 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4912
4913 tx
4914 } else {
4915 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4916 info!("Creating AuthenticatorStateCreate tx");
4917 tx
4918 };
4919 Some(tx)
4920 }
4921
4922 #[instrument(level = "error", skip_all)]
4935 pub async fn create_and_execute_advance_epoch_tx(
4936 &self,
4937 epoch_store: &Arc<AuthorityPerEpochStore>,
4938 gas_cost_summary: &GasCostSummary,
4939 checkpoint: CheckpointSequenceNumber,
4940 epoch_start_timestamp_ms: CheckpointTimestamp,
4941 scores: Vec<u64>,
4942 ) -> anyhow::Result<(
4943 IotaSystemState,
4944 Option<SystemEpochInfoEvent>,
4945 TransactionEffects,
4946 )> {
4947 let mut txns = Vec::new();
4948
4949 if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
4950 txns.push(tx);
4951 }
4952
4953 let next_epoch = epoch_store.epoch() + 1;
4954
4955 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
4956 let authority_capabilities = epoch_store
4957 .get_capabilities_v1()
4958 .expect("read capabilities from db cannot fail");
4959 let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
4960 Self::choose_protocol_version_and_system_packages_v1(
4961 epoch_store.protocol_version(),
4962 SupportedProtocolVersionsWithHashes::protocol_config_digest(
4963 epoch_store.protocol_config(),
4964 ),
4965 epoch_store.committee(),
4966 authority_capabilities.clone(),
4967 buffer_stake_bps,
4968 );
4969
4970 let config = epoch_store.protocol_config();
4974 let binary_config = to_binary_config(config);
4975 let Some(next_epoch_system_package_bytes) = self
4976 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
4977 .await
4978 else {
4979 error!(
4980 "upgraded system packages {:?} are not locally available, cannot create \
4981 ChangeEpochTx. validator binary must be upgraded to the correct version!",
4982 next_epoch_system_packages
4983 );
4984 bail!("missing system packages: cannot form ChangeEpochTx");
4994 };
4995
4996 if config.select_committee_from_eligible_validators() {
4999 let active_validators = epoch_store.epoch_start_state().get_active_validators();
5001
5002 let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
5003
5004 if config.select_committee_supporting_next_epoch_version() {
5008 eligible_active_validators = Self::get_validators_supporting_protocol_version(
5009 next_epoch_protocol_version,
5010 next_epoch_protocol_digest,
5011 &active_validators,
5012 &authority_capabilities,
5013 );
5014
5015 let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5017 &eligible_active_validators,
5018 &active_validators,
5019 epoch_store.committee(),
5020 );
5021
5022 let committee = epoch_store.committee();
5026 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5027
5028 if eligible_validators_weight < effective_threshold {
5029 error!(
5030 "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5031 This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5032 );
5033 eligible_active_validators = (0..active_validators.len() as u64).collect();
5036 }
5037 }
5038
5039 if config.pass_validator_scores_to_advance_epoch() {
5042 txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5043 next_epoch,
5044 next_epoch_protocol_version,
5045 gas_cost_summary.storage_cost,
5046 gas_cost_summary.computation_cost,
5047 gas_cost_summary.computation_cost_burned,
5048 gas_cost_summary.storage_rebate,
5049 gas_cost_summary.non_refundable_storage_fee,
5050 epoch_start_timestamp_ms,
5051 next_epoch_system_package_bytes,
5052 eligible_active_validators,
5053 scores,
5054 config.adjust_rewards_by_score(),
5055 ));
5056 } else {
5057 txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5058 next_epoch,
5059 next_epoch_protocol_version,
5060 gas_cost_summary.storage_cost,
5061 gas_cost_summary.computation_cost,
5062 gas_cost_summary.computation_cost_burned,
5063 gas_cost_summary.storage_rebate,
5064 gas_cost_summary.non_refundable_storage_fee,
5065 epoch_start_timestamp_ms,
5066 next_epoch_system_package_bytes,
5067 eligible_active_validators,
5068 ));
5069 }
5070 } else if config.protocol_defined_base_fee()
5071 && config.max_committee_members_count_as_option().is_some()
5072 {
5073 txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5074 next_epoch,
5075 next_epoch_protocol_version,
5076 gas_cost_summary.storage_cost,
5077 gas_cost_summary.computation_cost,
5078 gas_cost_summary.computation_cost_burned,
5079 gas_cost_summary.storage_rebate,
5080 gas_cost_summary.non_refundable_storage_fee,
5081 epoch_start_timestamp_ms,
5082 next_epoch_system_package_bytes,
5083 ));
5084 } else {
5085 txns.push(EndOfEpochTransactionKind::new_change_epoch(
5086 next_epoch,
5087 next_epoch_protocol_version,
5088 gas_cost_summary.storage_cost,
5089 gas_cost_summary.computation_cost,
5090 gas_cost_summary.storage_rebate,
5091 gas_cost_summary.non_refundable_storage_fee,
5092 epoch_start_timestamp_ms,
5093 next_epoch_system_package_bytes,
5094 ));
5095 }
5096
5097 let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5098
5099 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5100 tx.clone(),
5101 epoch_store.epoch(),
5102 checkpoint,
5103 );
5104
5105 let tx_digest = executable_tx.digest();
5106
5107 info!(
5108 ?next_epoch,
5109 ?next_epoch_protocol_version,
5110 ?next_epoch_system_packages,
5111 computation_cost=?gas_cost_summary.computation_cost,
5112 computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5113 storage_cost=?gas_cost_summary.storage_cost,
5114 storage_rebate=?gas_cost_summary.storage_rebate,
5115 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5116 ?tx_digest,
5117 "Creating advance epoch transaction"
5118 );
5119
5120 fail_point_async!("change_epoch_tx_delay");
5121 let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5122
5123 if self
5127 .get_transaction_cache_reader()
5128 .try_is_tx_already_executed(tx_digest)?
5129 {
5130 warn!("change epoch tx has already been executed via state sync");
5131 bail!("change epoch tx has already been executed via state sync",);
5132 }
5133
5134 let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5135
5136 epoch_store.assign_shared_object_versions_idempotent(
5140 self.get_object_cache_reader().as_ref(),
5141 std::slice::from_ref(&executable_tx),
5142 )?;
5143
5144 let (input_objects, _, _) =
5145 self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5146
5147 let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
5148 &execution_guard,
5149 &executable_tx,
5150 input_objects,
5151 None,
5152 None,
5153 epoch_store,
5154 )?;
5155 let system_obj = get_iota_system_state(&temporary_store.written)
5156 .expect("change epoch tx must write to system object");
5157 let system_epoch_info_event = temporary_store
5159 .events
5160 .data
5161 .into_iter()
5162 .find(|event| event.is_system_epoch_info_event())
5163 .map(SystemEpochInfoEvent::from);
5164 assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5167
5168 self.get_state_sync_store()
5172 .try_insert_transaction_and_effects(&tx, &effects)
5173 .map_err(|err| {
5174 let err: anyhow::Error = err.into();
5175 err
5176 })?;
5177
5178 info!(
5179 "Effects summary of the change epoch transaction: {:?}",
5180 effects.summary_for_debug()
5181 );
5182 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5183 assert!(effects.status().is_ok());
5185 Ok((system_obj, system_epoch_info_event, effects))
5186 }
5187
5188 #[instrument(level = "error", skip_all)]
5192 async fn revert_uncommitted_epoch_transactions(
5193 &self,
5194 epoch_store: &AuthorityPerEpochStore,
5195 ) -> IotaResult {
5196 {
5197 let state = epoch_store.get_reconfig_state_write_lock_guard();
5198 if state.should_accept_user_certs() {
5199 epoch_store.close_user_certs(state);
5208 }
5209 }
5211 let pending_certificates = epoch_store.pending_consensus_certificates();
5212 info!(
5213 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5214 pending_certificates.len(),
5215 pending_certificates,
5216 );
5217 for digest in pending_certificates {
5218 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5219 info!(
5220 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5221 digest
5222 );
5223 continue;
5224 }
5225 info!("Reverting {:?} at the end of epoch", digest);
5226 epoch_store.revert_executed_transaction(&digest)?;
5227 self.get_reconfig_api().try_revert_state_update(&digest)?;
5228 }
5229 info!("All uncommitted local transactions reverted");
5230 Ok(())
5231 }
5232
5233 #[instrument(level = "error", skip_all)]
5234 async fn reopen_epoch_db(
5235 &self,
5236 cur_epoch_store: &AuthorityPerEpochStore,
5237 new_committee: Committee,
5238 epoch_start_configuration: EpochStartConfiguration,
5239 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5240 epoch_last_checkpoint: CheckpointSequenceNumber,
5241 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5242 let new_epoch = new_committee.epoch;
5243 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5244 assert_eq!(
5245 epoch_start_configuration.epoch_start_state().epoch(),
5246 new_committee.epoch
5247 );
5248 fail_point!("before-open-new-epoch-store");
5249 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5250 self.name,
5251 new_committee,
5252 epoch_start_configuration,
5253 self.get_backing_package_store().clone(),
5254 self.get_object_store().clone(),
5255 expensive_safety_check_config,
5256 epoch_last_checkpoint,
5257 )?;
5258 self.epoch_store.store(new_epoch_store.clone());
5259 Ok(new_epoch_store)
5260 }
5261
5262 fn check_move_account(
5265 &self,
5266 auth_account_object_id: ObjectID,
5267 auth_account_object_seq_number: Option<SequenceNumber>,
5268 auth_account_object_digest: Option<ObjectDigest>,
5269 account_object: ObjectReadResult,
5270 signer: &IotaAddress,
5271 ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5272 let account_object = match account_object.object {
5273 ObjectReadResultKind::Object(object) => Ok(object),
5274 ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5275 Err(UserInputError::AccountObjectDeleted {
5276 account_id: account_object.id(),
5277 account_version: version,
5278 transaction_digest: digest,
5279 })
5280 }
5281 ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5284 Err(UserInputError::AccountObjectInCanceledTransaction {
5285 account_id: account_object.id(),
5286 account_version: version,
5287 })
5288 }
5289 }?;
5290
5291 let account_object_addr = IotaAddress::from(auth_account_object_id);
5292
5293 fp_ensure!(
5294 signer == &account_object_addr,
5295 UserInputError::IncorrectUserSignature {
5296 error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5297 }
5298 .into()
5299 );
5300
5301 fp_ensure!(
5302 account_object.is_shared() || account_object.is_immutable(),
5303 UserInputError::AccountObjectNotSupported {
5304 object_id: auth_account_object_id
5305 }
5306 .into()
5307 );
5308
5309 let auth_account_object_seq_number =
5310 if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5311 let account_object_version = account_object.version();
5312
5313 fp_ensure!(
5314 account_object_version == auth_account_object_seq_number,
5315 UserInputError::AccountObjectVersionMismatch {
5316 object_id: auth_account_object_id,
5317 expected_version: auth_account_object_seq_number,
5318 actual_version: account_object_version,
5319 }
5320 .into()
5321 );
5322
5323 auth_account_object_seq_number
5324 } else {
5325 account_object.version()
5326 };
5327
5328 if let Some(auth_account_object_digest) = auth_account_object_digest {
5329 let expected_digest = account_object.digest();
5330 fp_ensure!(
5331 expected_digest == auth_account_object_digest,
5332 UserInputError::InvalidAccountObjectDigest {
5333 object_id: auth_account_object_id,
5334 expected_digest,
5335 actual_digest: auth_account_object_digest,
5336 }
5337 .into()
5338 );
5339 }
5340
5341 let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5342 auth_account_object_id,
5343 &AuthenticatorFunctionRefV1Key::tag().into(),
5344 &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5345 )
5346 .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5347 account_object_id: auth_account_object_id,
5348 })?;
5349
5350 let authenticator_function_ref_field = self
5351 .get_object_cache_reader()
5352 .try_find_object_lt_or_eq_version(
5353 authenticator_function_ref_field_id,
5354 auth_account_object_seq_number,
5355 )?;
5356
5357 if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5358 let field_move_object = authenticator_function_ref_field_obj
5359 .data
5360 .try_as_move()
5361 .expect("dynamic field should never be a package object");
5362
5363 let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5364 field_move_object.to_rust().ok_or(
5365 UserInputError::InvalidAuthenticatorFunctionRefField {
5366 account_object_id: auth_account_object_id,
5367 },
5368 )?;
5369
5370 Ok(AuthenticatorFunctionRefForExecution::new_v1(
5371 field.value,
5372 authenticator_function_ref_field_obj.compute_object_reference(),
5373 authenticator_function_ref_field_obj.owner,
5374 authenticator_function_ref_field_obj.storage_rebate,
5375 authenticator_function_ref_field_obj.previous_transaction,
5376 ))
5377 } else {
5378 Err(UserInputError::MoveAuthenticatorNotFound {
5379 authenticator_function_ref_id: authenticator_function_ref_field_id,
5380 account_object_id: auth_account_object_id,
5381 account_object_version: auth_account_object_seq_number,
5382 }
5383 .into())
5384 }
5385 }
5386
5387 fn read_objects_for_signing(
5388 &self,
5389 transaction: &VerifiedTransaction,
5390 epoch: u64,
5391 ) -> IotaResult<(
5392 InputObjects,
5393 ReceivingObjects,
5394 Option<InputObjects>,
5395 Option<ObjectReadResult>,
5396 )> {
5397 let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5398 Some(transaction.digest()),
5399 &transaction.collect_all_input_object_kind_for_reading()?,
5400 &transaction.data().transaction_data().receiving_objects(),
5401 epoch,
5402 )?;
5403
5404 transaction
5405 .split_input_objects_into_groups_for_reading(input_objects)
5406 .map(|(tx_input_objects, auth_input_objects, account_object)| {
5407 (
5408 tx_input_objects,
5409 tx_receiving_objects,
5410 auth_input_objects,
5411 account_object,
5412 )
5413 })
5414 }
5415
5416 fn check_transaction_inputs_for_signing(
5417 &self,
5418 protocol_config: &ProtocolConfig,
5419 reference_gas_price: u64,
5420 tx_data: &TransactionData,
5421 tx_input_objects: InputObjects,
5422 tx_receiving_objects: &ReceivingObjects,
5423 move_authenticator: Option<&MoveAuthenticator>,
5424 auth_input_objects: Option<InputObjects>,
5425 account_object: Option<ObjectReadResult>,
5426 ) -> IotaResult<(
5427 IotaGasStatus,
5428 CheckedInputObjects,
5429 Option<CheckedInputObjects>,
5430 Option<AuthenticatorFunctionRef>,
5431 )> {
5432 let (
5433 auth_checked_input_objects_union,
5434 authenticator_function_ref,
5435 authenticator_gas_budget,
5436 ) = if let Some(move_authenticator) = move_authenticator {
5437 let auth_input_objects =
5438 auth_input_objects.expect("MoveAuthenticator input objects must be provided");
5439 let account_object = account_object.expect("Move account object must be provided");
5440
5441 let (
5443 auth_account_object_id,
5444 auth_account_object_seq_number,
5445 auth_account_object_digest,
5446 ) = move_authenticator.object_to_authenticate_components()?;
5447
5448 let AuthenticatorFunctionRefForExecution {
5450 authenticator_function_ref,
5451 ..
5452 } = self.check_move_account(
5453 auth_account_object_id,
5454 auth_account_object_seq_number,
5455 auth_account_object_digest,
5456 account_object,
5457 &tx_data.sender(),
5458 )?;
5459
5460 let auth_checked_input_objects =
5462 iota_transaction_checks::check_move_authenticator_input_for_signing(
5463 auth_input_objects,
5464 )?;
5465
5466 let authenticator_gas_budget = protocol_config.max_auth_gas();
5469
5470 (
5471 Some(auth_checked_input_objects),
5472 Some(authenticator_function_ref),
5473 authenticator_gas_budget,
5474 )
5475 } else {
5476 (None, None, 0)
5477 };
5478
5479 let (gas_status, tx_checked_input_objects) =
5481 iota_transaction_checks::check_transaction_input(
5482 protocol_config,
5483 reference_gas_price,
5484 tx_data,
5485 tx_input_objects,
5486 tx_receiving_objects,
5487 &self.metrics.bytecode_verifier_metrics,
5488 &self.config.verifier_signing_config,
5489 authenticator_gas_budget,
5490 )?;
5491
5492 Ok((
5493 gas_status,
5494 tx_checked_input_objects,
5495 auth_checked_input_objects_union,
5496 authenticator_function_ref,
5497 ))
5498 }
5499
5500 #[cfg(test)]
5501 pub(crate) fn iter_live_object_set_for_testing(
5502 &self,
5503 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5504 self.get_accumulator_store()
5505 .iter_cached_live_object_set_for_testing()
5506 }
5507
5508 #[cfg(test)]
5509 pub(crate) fn shutdown_execution_for_test(&self) {
5510 self.tx_execution_shutdown
5511 .lock()
5512 .take()
5513 .unwrap()
5514 .send(())
5515 .unwrap();
5516 }
5517
5518 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5521 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5522 self.get_object_cache_reader()
5523 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5524 self.get_reconfig_api()
5525 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5526 }
5527}
5528
5529pub struct RandomnessRoundReceiver {
5530 authority_state: Arc<AuthorityState>,
5531 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5532}
5533
5534impl RandomnessRoundReceiver {
5535 pub fn spawn(
5536 authority_state: Arc<AuthorityState>,
5537 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5538 ) -> JoinHandle<()> {
5539 let rrr = RandomnessRoundReceiver {
5540 authority_state,
5541 randomness_rx,
5542 };
5543 spawn_monitored_task!(rrr.run())
5544 }
5545
5546 async fn run(mut self) {
5547 info!("RandomnessRoundReceiver event loop started");
5548
5549 loop {
5550 tokio::select! {
5551 maybe_recv = self.randomness_rx.recv() => {
5552 if let Some((epoch, round, bytes)) = maybe_recv {
5553 self.handle_new_randomness(epoch, round, bytes);
5554 } else {
5555 break;
5556 }
5557 },
5558 }
5559 }
5560
5561 info!("RandomnessRoundReceiver event loop ended");
5562 }
5563
5564 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5565 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5566 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5567 if epoch_store.epoch() != epoch {
5568 warn!(
5569 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5570 epoch_store.epoch()
5571 );
5572 return;
5573 }
5574 let transaction = VerifiedTransaction::new_randomness_state_update(
5575 epoch,
5576 round,
5577 bytes,
5578 epoch_store
5579 .epoch_start_config()
5580 .randomness_obj_initial_shared_version(),
5581 );
5582 debug!(
5583 "created randomness state update transaction with digest: {:?}",
5584 transaction.digest()
5585 );
5586 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5587 let digest = *transaction.digest();
5588
5589 self.authority_state
5594 .get_cache_commit()
5595 .persist_transaction(&transaction);
5596
5597 self.authority_state
5599 .transaction_manager()
5600 .enqueue(vec![transaction], &epoch_store);
5601
5602 let authority_state = self.authority_state.clone();
5603 spawn_monitored_task!(async move {
5604 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5613 let result = tokio::time::timeout(
5614 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5615 authority_state
5616 .get_transaction_cache_reader()
5617 .try_notify_read_executed_effects(&[digest]),
5618 )
5619 .await;
5620 let result = match result {
5621 Ok(result) => result,
5622 Err(_) => {
5623 if cfg!(debug_assertions) {
5624 panic!(
5626 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5627 );
5628 }
5629 warn!(
5630 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5631 );
5632 authority_state
5634 .get_transaction_cache_reader()
5635 .try_notify_read_executed_effects(&[digest])
5636 .await
5637 }
5638 };
5639
5640 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5641 let effects = effects.pop().expect("should return effects");
5642 if *effects.status() != ExecutionStatus::Success {
5643 fatal!(
5644 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5645 );
5646 }
5647 debug!(
5648 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5649 );
5650 });
5651 }
5652}
5653
5654#[async_trait]
5655impl TransactionKeyValueStoreTrait for AuthorityState {
5656 async fn multi_get(
5657 &self,
5658 transaction_keys: &[TransactionDigest],
5659 effects_keys: &[TransactionDigest],
5660 ) -> IotaResult<KVStoreTransactionData> {
5661 let txns = if !transaction_keys.is_empty() {
5662 self.get_transaction_cache_reader()
5663 .try_multi_get_transaction_blocks(transaction_keys)?
5664 .into_iter()
5665 .map(|t| t.map(|t| (*t).clone().into_inner()))
5666 .collect()
5667 } else {
5668 vec![]
5669 };
5670
5671 let fx = if !effects_keys.is_empty() {
5672 self.get_transaction_cache_reader()
5673 .try_multi_get_executed_effects(effects_keys)?
5674 } else {
5675 vec![]
5676 };
5677
5678 Ok((txns, fx))
5679 }
5680
5681 async fn multi_get_checkpoints(
5682 &self,
5683 checkpoint_summaries: &[CheckpointSequenceNumber],
5684 checkpoint_contents: &[CheckpointSequenceNumber],
5685 checkpoint_summaries_by_digest: &[CheckpointDigest],
5686 ) -> IotaResult<(
5687 Vec<Option<CertifiedCheckpointSummary>>,
5688 Vec<Option<CheckpointContents>>,
5689 Vec<Option<CertifiedCheckpointSummary>>,
5690 )> {
5691 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5693 let store = self.get_checkpoint_store();
5694 for seq in checkpoint_summaries {
5695 let checkpoint = store
5696 .get_checkpoint_by_sequence_number(*seq)?
5697 .map(|c| c.into_inner());
5698
5699 summaries.push(checkpoint);
5700 }
5701
5702 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5703 for seq in checkpoint_contents {
5704 let checkpoint = store
5705 .get_checkpoint_by_sequence_number(*seq)?
5706 .and_then(|summary| {
5707 store
5708 .get_checkpoint_contents(&summary.content_digest)
5709 .expect("db read cannot fail")
5710 });
5711 contents.push(checkpoint);
5712 }
5713
5714 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5715 for digest in checkpoint_summaries_by_digest {
5716 let checkpoint = store
5717 .get_checkpoint_by_digest(digest)?
5718 .map(|c| c.into_inner());
5719 summaries_by_digest.push(checkpoint);
5720 }
5721
5722 Ok((summaries, contents, summaries_by_digest))
5723 }
5724
5725 async fn get_transaction_perpetual_checkpoint(
5726 &self,
5727 digest: TransactionDigest,
5728 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5729 self.get_checkpoint_cache()
5730 .try_get_transaction_perpetual_checkpoint(&digest)
5731 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5732 }
5733
5734 async fn get_object(
5735 &self,
5736 object_id: ObjectID,
5737 version: VersionNumber,
5738 ) -> IotaResult<Option<Object>> {
5739 self.get_object_cache_reader()
5740 .try_get_object_by_key(&object_id, version)
5741 }
5742
5743 async fn multi_get_transactions_perpetual_checkpoints(
5744 &self,
5745 digests: &[TransactionDigest],
5746 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5747 let res = self
5748 .get_checkpoint_cache()
5749 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5750
5751 Ok(res
5752 .into_iter()
5753 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5754 .collect())
5755 }
5756
5757 #[instrument(skip(self))]
5758 async fn multi_get_events_by_tx_digests(
5759 &self,
5760 digests: &[TransactionDigest],
5761 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5762 if digests.is_empty() {
5763 return Ok(vec![]);
5764 }
5765 let events_digests: Vec<_> = self
5766 .get_transaction_cache_reader()
5767 .try_multi_get_executed_effects(digests)?
5768 .into_iter()
5769 .map(|t| t.and_then(|t| t.events_digest().cloned()))
5770 .collect();
5771 let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5772 let mut events = self
5773 .get_transaction_cache_reader()
5774 .try_multi_get_events(&non_empty_events)?
5775 .into_iter();
5776 Ok(events_digests
5777 .into_iter()
5778 .map(|ev| ev.and_then(|_| events.next()?))
5779 .collect())
5780 }
5781}
5782
5783#[cfg(msim)]
5784pub mod framework_injection {
5785 use std::{
5786 cell::RefCell,
5787 collections::{BTreeMap, BTreeSet},
5788 };
5789
5790 use iota_framework::{BuiltInFramework, SystemPackage};
5791 use iota_types::{
5792 base_types::{AuthorityName, ObjectID},
5793 is_system_package,
5794 };
5795 use move_binary_format::CompiledModule;
5796
5797 type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
5798
5799 thread_local! {
5801 static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
5802 }
5803
5804 type Framework = Vec<CompiledModule>;
5805
5806 pub type PackageUpgradeCallback =
5807 Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
5808
5809 enum PackageOverrideConfig {
5810 Global(Framework),
5811 PerValidator(PackageUpgradeCallback),
5812 }
5813
5814 fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
5815 modules
5816 .iter()
5817 .map(|m| {
5818 let mut buf = Vec::new();
5819 m.serialize_with_version(m.version, &mut buf).unwrap();
5820 buf
5821 })
5822 .collect()
5823 }
5824
5825 pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
5826 OVERRIDE.with(|bs| {
5827 bs.borrow_mut()
5828 .insert(package_id, PackageOverrideConfig::Global(modules))
5829 });
5830 }
5831
5832 pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
5833 OVERRIDE.with(|bs| {
5834 bs.borrow_mut()
5835 .insert(package_id, PackageOverrideConfig::PerValidator(func))
5836 });
5837 }
5838
5839 pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
5840 OVERRIDE.with(|cfg| {
5841 cfg.borrow().get(package_id).and_then(|entry| match entry {
5842 PackageOverrideConfig::Global(framework) => {
5843 Some(compiled_modules_to_bytes(framework))
5844 }
5845 PackageOverrideConfig::PerValidator(func) => {
5846 func(name).map(|fw| compiled_modules_to_bytes(&fw))
5847 }
5848 })
5849 })
5850 }
5851
5852 pub fn get_override_modules(
5853 package_id: &ObjectID,
5854 name: AuthorityName,
5855 ) -> Option<Vec<CompiledModule>> {
5856 OVERRIDE.with(|cfg| {
5857 cfg.borrow().get(package_id).and_then(|entry| match entry {
5858 PackageOverrideConfig::Global(framework) => Some(framework.clone()),
5859 PackageOverrideConfig::PerValidator(func) => func(name),
5860 })
5861 })
5862 }
5863
5864 pub fn get_override_system_package(
5865 package_id: &ObjectID,
5866 name: AuthorityName,
5867 ) -> Option<SystemPackage> {
5868 let bytes = get_override_bytes(package_id, name)?;
5869 let dependencies = if is_system_package(*package_id) {
5870 BuiltInFramework::get_package_by_id(package_id)
5871 .dependencies
5872 .to_vec()
5873 } else {
5874 BuiltInFramework::all_package_ids()
5877 };
5878 Some(SystemPackage {
5879 id: *package_id,
5880 bytes,
5881 dependencies,
5882 })
5883 }
5884
5885 pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
5886 let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
5887 let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
5888 cfg.borrow()
5889 .keys()
5890 .filter_map(|package| (!built_in.contains(package)).then_some(*package))
5891 .collect()
5892 });
5893
5894 extra
5895 .into_iter()
5896 .map(|package| SystemPackage {
5897 id: package,
5898 bytes: get_override_bytes(&package, name).unwrap(),
5899 dependencies: BuiltInFramework::all_package_ids(),
5900 })
5901 .collect()
5902 }
5903}
5904
5905#[derive(Debug, Serialize, Deserialize, Clone)]
5906pub struct ObjDumpFormat {
5907 pub id: ObjectID,
5908 pub version: VersionNumber,
5909 pub digest: ObjectDigest,
5910 pub object: Object,
5911}
5912
5913impl ObjDumpFormat {
5914 fn new(object: Object) -> Self {
5915 let oref = object.compute_object_reference();
5916 Self {
5917 id: oref.0,
5918 version: oref.1,
5919 digest: oref.2,
5920 object,
5921 }
5922 }
5923}
5924
5925#[derive(Debug, Serialize, Deserialize, Clone)]
5926pub struct NodeStateDump {
5927 pub tx_digest: TransactionDigest,
5928 pub sender_signed_data: SenderSignedData,
5929 pub executed_epoch: u64,
5930 pub reference_gas_price: u64,
5931 pub protocol_version: u64,
5932 pub epoch_start_timestamp_ms: u64,
5933 pub computed_effects: TransactionEffects,
5934 pub expected_effects_digest: TransactionEffectsDigest,
5935 pub relevant_system_packages: Vec<ObjDumpFormat>,
5936 pub shared_objects: Vec<ObjDumpFormat>,
5937 pub loaded_child_objects: Vec<ObjDumpFormat>,
5938 pub modified_at_versions: Vec<ObjDumpFormat>,
5939 pub runtime_reads: Vec<ObjDumpFormat>,
5940 pub input_objects: Vec<ObjDumpFormat>,
5941}
5942
5943impl NodeStateDump {
5944 pub fn new(
5945 tx_digest: &TransactionDigest,
5946 effects: &TransactionEffects,
5947 expected_effects_digest: TransactionEffectsDigest,
5948 object_store: &dyn ObjectStore,
5949 epoch_store: &Arc<AuthorityPerEpochStore>,
5950 inner_temporary_store: &InnerTemporaryStore,
5951 certificate: &VerifiedExecutableTransaction,
5952 ) -> IotaResult<Self> {
5953 let executed_epoch = epoch_store.epoch();
5955 let reference_gas_price = epoch_store.reference_gas_price();
5956 let epoch_start_config = epoch_store.epoch_start_config();
5957 let protocol_version = epoch_store.protocol_version().as_u64();
5958 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5959
5960 let mut relevant_system_packages = Vec::new();
5962 for sys_package_id in BuiltInFramework::all_package_ids() {
5963 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5964 relevant_system_packages.push(ObjDumpFormat::new(w))
5965 }
5966 }
5967
5968 let mut shared_objects = Vec::new();
5970 for kind in effects.input_shared_objects() {
5971 match kind {
5972 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5973 if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5974 shared_objects.push(ObjDumpFormat::new(w))
5975 }
5976 }
5977 InputSharedObject::ReadDeleted(..)
5978 | InputSharedObject::MutateDeleted(..)
5979 | InputSharedObject::Cancelled(..) => (), }
5982 }
5983
5984 let mut loaded_child_objects = Vec::new();
5987 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5988 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5989 loaded_child_objects.push(ObjDumpFormat::new(w))
5990 }
5991 }
5992
5993 let mut modified_at_versions = Vec::new();
5995 for (id, ver) in effects.modified_at_versions() {
5996 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5997 modified_at_versions.push(ObjDumpFormat::new(w))
5998 }
5999 }
6000
6001 let mut runtime_reads = Vec::new();
6005 for obj in inner_temporary_store
6006 .runtime_packages_loaded_from_db
6007 .values()
6008 {
6009 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6010 }
6011
6012 Ok(Self {
6015 tx_digest: *tx_digest,
6016 executed_epoch,
6017 reference_gas_price,
6018 epoch_start_timestamp_ms,
6019 protocol_version,
6020 relevant_system_packages,
6021 shared_objects,
6022 loaded_child_objects,
6023 modified_at_versions,
6024 runtime_reads,
6025 sender_signed_data: certificate.clone().into_message(),
6026 input_objects: inner_temporary_store
6027 .input_objects
6028 .values()
6029 .map(|o| ObjDumpFormat::new(o.clone()))
6030 .collect(),
6031 computed_effects: effects.clone(),
6032 expected_effects_digest,
6033 })
6034 }
6035
6036 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6037 let mut objects = Vec::new();
6038 objects.extend(self.relevant_system_packages.clone());
6039 objects.extend(self.shared_objects.clone());
6040 objects.extend(self.loaded_child_objects.clone());
6041 objects.extend(self.modified_at_versions.clone());
6042 objects.extend(self.runtime_reads.clone());
6043 objects.extend(self.input_objects.clone());
6044 objects
6045 }
6046
6047 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6048 let file_name = format!(
6049 "{}_{}_NODE_DUMP.json",
6050 self.tx_digest,
6051 AuthorityState::unixtime_now_ms()
6052 );
6053 let mut path = path.to_path_buf();
6054 path.push(&file_name);
6055 let mut file = File::create(path.clone())?;
6056 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6057 Ok(path)
6058 }
6059
6060 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6061 let file = File::open(path)?;
6062 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6063 }
6064}