1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49 key_value_store::{
50 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51 },
52 key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57 IOTA_SYSTEM_ADDRESS, TypeTag,
58 account_abstraction::{
59 account::AuthenticatorFunctionRefV1Key,
60 authenticator_function::{
61 AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
62 AuthenticatorFunctionRefV1,
63 },
64 },
65 authenticator_state::get_authenticator_state,
66 base_types::*,
67 committee::{Committee, EpochId, ProtocolVersion},
68 crypto::{
69 AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
70 default_hash,
71 },
72 deny_list_v1::check_coin_deny_list_v1_during_signing,
73 digests::{ChainIdentifier, Digest},
74 dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
75 effects::{
76 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
77 TransactionEvents, VerifiedSignedTransactionEffects,
78 },
79 error::{ExecutionError, IotaError, IotaResult, UserInputError},
80 event::{Event, EventID, SystemEpochInfoEvent},
81 executable_transaction::VerifiedExecutableTransaction,
82 execution_config_utils::to_binary_config,
83 execution_status::ExecutionStatus,
84 fp_ensure,
85 gas::{GasCostSummary, IotaGasStatus},
86 gas_coin::NANOS_PER_IOTA,
87 inner_temporary_store::{
88 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
89 WrittenObjects,
90 },
91 iota_system_state::{
92 IotaSystemState, IotaSystemStateTrait,
93 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
94 },
95 is_system_package,
96 layout_resolver::{LayoutResolver, into_struct_layout},
97 message_envelope::Message,
98 messages_checkpoint::{
99 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
100 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
101 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
102 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
103 },
104 messages_consensus::AuthorityCapabilitiesV1,
105 messages_grpc::{
106 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
107 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
108 TransactionStatus,
109 },
110 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
111 move_authenticator::MoveAuthenticator,
112 object::{
113 MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
114 bounded_visitor::BoundedVisitor,
115 },
116 storage::{
117 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
118 },
119 supported_protocol_versions::{
120 ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
121 },
122 transaction::*,
123 transaction_executor::{SimulateTransactionResult, VmChecks},
124};
125use itertools::Itertools;
126use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
127use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
128use parking_lot::Mutex;
129use prometheus::{
130 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
131 register_histogram_vec_with_registry, register_histogram_with_registry,
132 register_int_counter_vec_with_registry, register_int_counter_with_registry,
133 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
134};
135use serde::{Deserialize, Serialize, de::DeserializeOwned};
136use tap::TapFallible;
137use tokio::{
138 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
139 task::JoinHandle,
140};
141use tracing::{debug, error, info, instrument, trace, warn};
142use typed_store::TypedStoreError;
143
144use self::{
145 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
146};
147#[cfg(msim)]
148pub use crate::checkpoints::checkpoint_executor::utils::{
149 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
150};
151use crate::{
152 authority::{
153 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
154 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
155 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
156 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
157 authority_store_tables::AuthorityPrunerTables,
158 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
159 },
160 authority_client::NetworkAuthorityClient,
161 checkpoints::CheckpointStore,
162 congestion_tracker::CongestionTracker,
163 consensus_adapter::ConsensusAdapter,
164 epoch::committee_store::CommitteeStore,
165 execution_cache::{
166 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
167 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
168 TransactionCacheRead,
169 },
170 execution_driver::execution_process,
171 grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
172 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
173 metrics::{LatencyObserver, RateTracker},
174 module_cache_metrics::ResolverMetrics,
175 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
176 stake_aggregator::StakeAggregator,
177 state_accumulator::{AccumulatorStore, StateAccumulator},
178 subscription_handler::SubscriptionHandler,
179 transaction_input_loader::TransactionInputLoader,
180 transaction_manager::TransactionManager,
181 transaction_outputs::TransactionOutputs,
182 validator_tx_finalizer::ValidatorTxFinalizer,
183 verify_indexes::verify_indexes,
184};
185
186#[cfg(test)]
187#[path = "unit_tests/authority_tests.rs"]
188pub mod authority_tests;
189
190#[cfg(test)]
191#[path = "unit_tests/transaction_tests.rs"]
192pub mod transaction_tests;
193
194#[cfg(test)]
195#[path = "unit_tests/batch_transaction_tests.rs"]
196mod batch_transaction_tests;
197
198#[cfg(test)]
199#[path = "unit_tests/move_integration_tests.rs"]
200pub mod move_integration_tests;
201
202#[cfg(test)]
203#[path = "unit_tests/gas_tests.rs"]
204mod gas_tests;
205
206#[cfg(test)]
207#[path = "unit_tests/batch_verification_tests.rs"]
208mod batch_verification_tests;
209
210#[cfg(test)]
211#[path = "unit_tests/coin_deny_list_tests.rs"]
212mod coin_deny_list_tests;
213
214#[cfg(test)]
215#[path = "unit_tests/auth_unit_test_utils.rs"]
216pub mod auth_unit_test_utils;
217
218pub mod authority_test_utils;
219
220pub mod authority_per_epoch_store;
221pub mod authority_per_epoch_store_pruner;
222
223mod authority_store_migrations;
224pub mod authority_store_pruner;
225pub mod authority_store_tables;
226pub mod authority_store_types;
227pub mod epoch_start_configuration;
228pub mod shared_object_congestion_tracker;
229pub mod shared_object_version_manager;
230pub mod suggested_gas_price_calculator;
231pub mod test_authority_builder;
232pub mod transaction_deferral;
233
234pub(crate) mod authority_store;
235pub mod backpressure;
236
/// Prometheus metrics recorded by the authority.
///
/// Every field is created in [`AuthorityMetrics::new`]; the help string
/// registered there is the authoritative description of each metric.
pub struct AuthorityMetrics {
    // Transaction / certificate counters.
    tx_orders: IntCounter,
    total_certs: IntCounter,
    total_cert_attempts: IntCounter,
    total_effects: IntCounter,
    pub shared_obj_tx: IntCounter,
    sponsored_tx: IntCounter,
    tx_already_processed: IntCounter,
    // Per-transaction size distributions (bucketed with `POSITIVE_INT_BUCKETS`).
    num_input_objs: Histogram,
    num_shared_objects: Histogram,
    batch_size: Histogram,

    authority_state_handle_transaction_latency: Histogram,

    // Both are label values ("tx_type") of the single
    // `authority_state_execute_certificate_latency` histogram vector.
    execute_certificate_latency_single_writer: Histogram,
    execute_certificate_latency_shared_object: Histogram,

    // Latencies of the individual execution stages.
    internal_execution_latency: Histogram,
    execution_load_input_objects_latency: Histogram,
    prepare_certificate_latency: Histogram,
    commit_certificate_latency: Histogram,
    db_checkpoint_latency: Histogram,

    // TransactionManager metrics (queue sizes and availability caches).
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    pub(crate) transaction_manager_num_missing_objects: IntGauge,
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    pub(crate) transaction_manager_num_ready: IntGauge,
    pub(crate) transaction_manager_object_cache_size: IntGauge,
    pub(crate) transaction_manager_object_cache_hits: IntCounter,
    pub(crate) transaction_manager_object_cache_misses: IntCounter,
    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
    pub(crate) transaction_manager_package_cache_size: IntGauge,
    pub(crate) transaction_manager_package_cache_hits: IntCounter,
    pub(crate) transaction_manager_package_cache_misses: IntCounter,
    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    // Execution driver metrics.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    pub(crate) execution_queueing_delay_s: Histogram,
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    pub(crate) execution_gas_latency_ratio: Histogram,

    // Consensus transactions that were skipped.
    pub(crate) skipped_consensus_txns: IntCounter,
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,

    // Overload / load-shedding state.
    pub(crate) authority_overload_status: IntGauge,
    pub(crate) authority_load_shedding_percentage: IntGauge,

    /// Overload occurrences, labelled by "source".
    pub(crate) transaction_overload_sources: IntCounterVec,

    // Post-processing counters.
    post_processing_total_events_emitted: IntCounter,
    post_processing_total_tx_indexed: IntCounter,
    post_processing_total_tx_had_event_processed: IntCounter,
    post_processing_total_failures: IntCounter,

    // Consensus handler metrics. The `*Vec` metrics are labelled by "class",
    // "authority" or "commit_type" as registered in `new`.
    pub consensus_handler_processed: IntCounterVec,
    pub consensus_handler_transaction_sizes: HistogramVec,
    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    pub consensus_handler_scores: IntGaugeVec,
    pub consensus_handler_deferred_transactions: IntCounter,
    pub consensus_handler_congested_transactions: IntCounter,
    pub consensus_handler_cancelled_transactions: IntCounter,
    pub consensus_handler_max_object_costs: IntGaugeVec,
    pub consensus_committed_subdags: IntCounterVec,
    pub consensus_committed_messages: IntGaugeVec,
    pub consensus_committed_user_transactions: IntGaugeVec,
    pub consensus_handler_leader_round: IntGauge,
    pub consensus_calculated_throughput: IntGauge,
    pub consensus_calculated_throughput_profile: IntGauge,

    /// Shared `LimitsMetrics`, built from the same registry in `new`.
    pub limits_metrics: Arc<LimitsMetrics>,

    /// Shared `BytecodeVerifierMetrics`, built from the same registry in `new`.
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zkLogin signatures.
    pub zklogin_sig_count: IntCounter,
    /// Counter registered under the name `multisig_sig_count`.
    pub multisig_sig_count: IntCounter,

    /// Not registered with the Prometheus registry; created with
    /// `LatencyObserver::new()`.
    // NOTE(review): presumably observes execution queueing latency (per the
    // field name) — confirm against `LatencyObserver`.
    pub execution_queueing_latency: LatencyObserver,

    /// `RateTracker` constructed with a 10-second `Duration` in `new`.
    // NOTE(review): presumably the rate at which transactions become ready —
    // confirm against the call sites.
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    /// `RateTracker` constructed with a 10-second `Duration` in `new`.
    // NOTE(review): presumably the rate of transaction execution — confirm
    // against the call sites.
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
338
/// Histogram buckets for positive counts and sizes: a 1-2-5-7 progression per
/// decade, from 1 up to 10,000,000.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
    7000000., 10000000.,
];

/// Histogram buckets for latencies, ranging from 0.0005 to 90 (seconds, per
/// the constant's name).
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];

/// Finer-grained latency buckets starting at 0.00001 and reaching 100
/// (seconds, per the constant's name), for operations expected to be fast.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];

/// Histogram buckets for the gas-to-latency ratio metrics, ranging from 10 to
/// 1,000,000.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];
361
362pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
366 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
367 let execute_certificate_latency = register_histogram_vec_with_registry!(
368 "authority_state_execute_certificate_latency",
369 "Latency of executing certificates, including waiting for inputs",
370 &["tx_type"],
371 LATENCY_SEC_BUCKETS.to_vec(),
372 registry,
373 )
374 .unwrap();
375
376 let execute_certificate_latency_single_writer =
377 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
378 let execute_certificate_latency_shared_object =
379 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
380
381 Self {
382 tx_orders: register_int_counter_with_registry!(
383 "total_transaction_orders",
384 "Total number of transaction orders",
385 registry,
386 )
387 .unwrap(),
388 total_certs: register_int_counter_with_registry!(
389 "total_transaction_certificates",
390 "Total number of transaction certificates handled",
391 registry,
392 )
393 .unwrap(),
394 total_cert_attempts: register_int_counter_with_registry!(
395 "total_handle_certificate_attempts",
396 "Number of calls to handle_certificate",
397 registry,
398 )
399 .unwrap(),
400 total_effects: register_int_counter_with_registry!(
402 "total_transaction_effects",
403 "Total number of transaction effects produced",
404 registry,
405 )
406 .unwrap(),
407
408 shared_obj_tx: register_int_counter_with_registry!(
409 "num_shared_obj_tx",
410 "Number of transactions involving shared objects",
411 registry,
412 )
413 .unwrap(),
414
415 sponsored_tx: register_int_counter_with_registry!(
416 "num_sponsored_tx",
417 "Number of sponsored transactions",
418 registry,
419 )
420 .unwrap(),
421
422 tx_already_processed: register_int_counter_with_registry!(
423 "num_tx_already_processed",
424 "Number of transaction orders already processed previously",
425 registry,
426 )
427 .unwrap(),
428 num_input_objs: register_histogram_with_registry!(
429 "num_input_objects",
430 "Distribution of number of input TX objects per TX",
431 POSITIVE_INT_BUCKETS.to_vec(),
432 registry,
433 )
434 .unwrap(),
435 num_shared_objects: register_histogram_with_registry!(
436 "num_shared_objects",
437 "Number of shared input objects per TX",
438 POSITIVE_INT_BUCKETS.to_vec(),
439 registry,
440 )
441 .unwrap(),
442 batch_size: register_histogram_with_registry!(
443 "batch_size",
444 "Distribution of size of transaction batch",
445 POSITIVE_INT_BUCKETS.to_vec(),
446 registry,
447 )
448 .unwrap(),
449 authority_state_handle_transaction_latency: register_histogram_with_registry!(
450 "authority_state_handle_transaction_latency",
451 "Latency of handling transactions",
452 LATENCY_SEC_BUCKETS.to_vec(),
453 registry,
454 )
455 .unwrap(),
456 execute_certificate_latency_single_writer,
457 execute_certificate_latency_shared_object,
458 internal_execution_latency: register_histogram_with_registry!(
459 "authority_state_internal_execution_latency",
460 "Latency of actual certificate executions",
461 LATENCY_SEC_BUCKETS.to_vec(),
462 registry,
463 )
464 .unwrap(),
465 execution_load_input_objects_latency: register_histogram_with_registry!(
466 "authority_state_execution_load_input_objects_latency",
467 "Latency of loading input objects for execution",
468 LOW_LATENCY_SEC_BUCKETS.to_vec(),
469 registry,
470 )
471 .unwrap(),
472 prepare_certificate_latency: register_histogram_with_registry!(
473 "authority_state_prepare_certificate_latency",
474 "Latency of executing certificates, before committing the results",
475 LATENCY_SEC_BUCKETS.to_vec(),
476 registry,
477 )
478 .unwrap(),
479 commit_certificate_latency: register_histogram_with_registry!(
480 "authority_state_commit_certificate_latency",
481 "Latency of committing certificate execution results",
482 LATENCY_SEC_BUCKETS.to_vec(),
483 registry,
484 )
485 .unwrap(),
486 db_checkpoint_latency: register_histogram_with_registry!(
487 "db_checkpoint_latency",
488 "Latency of checkpointing dbs",
489 LATENCY_SEC_BUCKETS.to_vec(),
490 registry,
491 ).unwrap(),
492 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
493 "transaction_manager_num_enqueued_certificates",
494 "Current number of certificates enqueued to TransactionManager",
495 &["result"],
496 registry,
497 )
498 .unwrap(),
499 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
500 "transaction_manager_num_missing_objects",
501 "Current number of missing objects in TransactionManager",
502 registry,
503 )
504 .unwrap(),
505 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
506 "transaction_manager_num_pending_certificates",
507 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
508 registry,
509 )
510 .unwrap(),
511 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
512 "transaction_manager_num_executing_certificates",
513 "Number of executing certificates, including queued and actually running certificates",
514 registry,
515 )
516 .unwrap(),
517 transaction_manager_num_ready: register_int_gauge_with_registry!(
518 "transaction_manager_num_ready",
519 "Number of ready transactions in TransactionManager",
520 registry,
521 )
522 .unwrap(),
523 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
524 "transaction_manager_object_cache_size",
525 "Current size of object-availability cache in TransactionManager",
526 registry,
527 )
528 .unwrap(),
529 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
530 "transaction_manager_object_cache_hits",
531 "Number of object-availability cache hits in TransactionManager",
532 registry,
533 )
534 .unwrap(),
535 authority_overload_status: register_int_gauge_with_registry!(
536 "authority_overload_status",
537 "Whether authority is current experiencing overload and enters load shedding mode.",
538 registry)
539 .unwrap(),
540 authority_load_shedding_percentage: register_int_gauge_with_registry!(
541 "authority_load_shedding_percentage",
542 "The percentage of transactions is shed when the authority is in load shedding mode.",
543 registry)
544 .unwrap(),
545 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
546 "transaction_manager_object_cache_misses",
547 "Number of object-availability cache misses in TransactionManager",
548 registry,
549 )
550 .unwrap(),
551 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
552 "transaction_manager_object_cache_evictions",
553 "Number of object-availability cache evictions in TransactionManager",
554 registry,
555 )
556 .unwrap(),
557 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
558 "transaction_manager_package_cache_size",
559 "Current size of package-availability cache in TransactionManager",
560 registry,
561 )
562 .unwrap(),
563 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
564 "transaction_manager_package_cache_hits",
565 "Number of package-availability cache hits in TransactionManager",
566 registry,
567 )
568 .unwrap(),
569 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
570 "transaction_manager_package_cache_misses",
571 "Number of package-availability cache misses in TransactionManager",
572 registry,
573 )
574 .unwrap(),
575 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
576 "transaction_manager_package_cache_evictions",
577 "Number of package-availability cache evictions in TransactionManager",
578 registry,
579 )
580 .unwrap(),
581 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
582 "transaction_manager_transaction_queue_age_s",
583 "Time spent in waiting for transaction in the queue",
584 LATENCY_SEC_BUCKETS.to_vec(),
585 registry,
586 )
587 .unwrap(),
588 transaction_overload_sources: register_int_counter_vec_with_registry!(
589 "transaction_overload_sources",
590 "Number of times each source indicates transaction overload.",
591 &["source"],
592 registry)
593 .unwrap(),
594 execution_driver_executed_transactions: register_int_counter_with_registry!(
595 "execution_driver_executed_transactions",
596 "Cumulative number of transaction executed by execution driver",
597 registry,
598 )
599 .unwrap(),
600 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
601 "execution_driver_dispatch_queue",
602 "Number of transaction pending in execution driver dispatch queue",
603 registry,
604 )
605 .unwrap(),
606 execution_queueing_delay_s: register_histogram_with_registry!(
607 "execution_queueing_delay_s",
608 "Queueing delay between a transaction is ready for execution until it starts executing.",
609 LATENCY_SEC_BUCKETS.to_vec(),
610 registry
611 )
612 .unwrap(),
613 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
614 "prepare_cert_gas_latency_ratio",
615 "The ratio of computation gas divided by VM execution latency.",
616 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
617 registry
618 )
619 .unwrap(),
620 execution_gas_latency_ratio: register_histogram_with_registry!(
621 "execution_gas_latency_ratio",
622 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
623 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
624 registry
625 )
626 .unwrap(),
627 skipped_consensus_txns: register_int_counter_with_registry!(
628 "skipped_consensus_txns",
629 "Total number of consensus transactions skipped",
630 registry,
631 )
632 .unwrap(),
633 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
634 "skipped_consensus_txns_cache_hit",
635 "Total number of consensus transactions skipped because of local cache hit",
636 registry,
637 )
638 .unwrap(),
639 post_processing_total_events_emitted: register_int_counter_with_registry!(
640 "post_processing_total_events_emitted",
641 "Total number of events emitted in post processing",
642 registry,
643 )
644 .unwrap(),
645 post_processing_total_tx_indexed: register_int_counter_with_registry!(
646 "post_processing_total_tx_indexed",
647 "Total number of txes indexed in post processing",
648 registry,
649 )
650 .unwrap(),
651 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
652 "post_processing_total_tx_had_event_processed",
653 "Total number of txes finished event processing in post processing",
654 registry,
655 )
656 .unwrap(),
657 post_processing_total_failures: register_int_counter_with_registry!(
658 "post_processing_total_failures",
659 "Total number of failure in post processing",
660 registry,
661 )
662 .unwrap(),
663 consensus_handler_processed: register_int_counter_vec_with_registry!(
664 "consensus_handler_processed",
665 "Number of transactions processed by consensus handler",
666 &["class"],
667 registry
668 ).unwrap(),
669 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
670 "consensus_handler_transaction_sizes",
671 "Sizes of each type of transactions processed by consensus handler",
672 &["class"],
673 POSITIVE_INT_BUCKETS.to_vec(),
674 registry
675 ).unwrap(),
676 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
677 "consensus_handler_num_low_scoring_authorities",
678 "Number of low scoring authorities based on reputation scores from consensus",
679 registry
680 ).unwrap(),
681 consensus_handler_scores: register_int_gauge_vec_with_registry!(
682 "consensus_handler_scores",
683 "scores from consensus for each authority",
684 &["authority"],
685 registry,
686 ).unwrap(),
687 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
688 "consensus_handler_deferred_transactions",
689 "Number of transactions deferred by consensus handler",
690 registry,
691 ).unwrap(),
692 consensus_handler_congested_transactions: register_int_counter_with_registry!(
693 "consensus_handler_congested_transactions",
694 "Number of transactions deferred by consensus handler due to congestion",
695 registry,
696 ).unwrap(),
697 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
698 "consensus_handler_cancelled_transactions",
699 "Number of transactions cancelled by consensus handler",
700 registry,
701 ).unwrap(),
702 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
703 "consensus_handler_max_congestion_control_object_costs",
704 "Max object costs for congestion control in the current consensus commit",
705 &["commit_type"],
706 registry,
707 ).unwrap(),
708 consensus_committed_subdags: register_int_counter_vec_with_registry!(
709 "consensus_committed_subdags",
710 "Number of committed subdags, sliced by leader",
711 &["authority"],
712 registry,
713 ).unwrap(),
714 consensus_committed_messages: register_int_gauge_vec_with_registry!(
715 "consensus_committed_messages",
716 "Total number of committed consensus messages, sliced by author",
717 &["authority"],
718 registry,
719 ).unwrap(),
720 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
721 "consensus_committed_user_transactions",
722 "Number of committed user transactions, sliced by submitter",
723 &["authority"],
724 registry,
725 ).unwrap(),
726 consensus_handler_leader_round: register_int_gauge_with_registry!(
727 "consensus_handler_leader_round",
728 "The leader round of the current consensus output being processed in the consensus handler",
729 registry,
730 ).unwrap(),
731 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
732 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
733 zklogin_sig_count: register_int_counter_with_registry!(
734 "zklogin_sig_count",
735 "Count of zkLogin signatures",
736 registry,
737 )
738 .unwrap(),
739 multisig_sig_count: register_int_counter_with_registry!(
740 "multisig_sig_count",
741 "Count of zkLogin signatures",
742 registry,
743 )
744 .unwrap(),
745 consensus_calculated_throughput: register_int_gauge_with_registry!(
746 "consensus_calculated_throughput",
747 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
748 registry,
749 ).unwrap(),
750 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
751 "consensus_calculated_throughput_profile",
752 "The current active calculated throughput profile",
753 registry
754 ).unwrap(),
755 execution_queueing_latency: LatencyObserver::new(),
756 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
757 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
758 }
759 }
760
761 pub fn reset_on_reconfigure(&self) {
765 self.consensus_committed_messages.reset();
766 self.consensus_handler_scores.reset();
767 self.consensus_committed_user_transactions.reset();
768 }
769}
770
/// A pinned, reference-counted signer trait object that produces
/// [`AuthoritySignature`]s and can be shared across threads (`Send + Sync`).
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
778
/// State held by an authority: its identity and signing key, stores, caches,
/// the transaction manager, metrics and node configuration.
pub struct AuthorityState {
    /// The name of this authority; compared against committee members and
    /// active validators by the `is_*_validator` helpers.
    pub name: AuthorityName,
    /// Signer used together with `name` when producing signed transactions.
    pub secret: StableSyncAuthoritySigner,

    // NOTE(review): presumably loads transaction input objects — confirm
    // against `TransactionInputLoader`.
    input_loader: TransactionInputLoader,
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// Per-epoch store, held in an `ArcSwap`.
    // NOTE(review): presumably swapped for a new store on reconfiguration — confirm.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// Async read-write lock wrapping an `EpochId`.
    // NOTE(review): presumably held for reading while signing/executing and
    // for writing during reconfiguration — confirm against the lock helpers.
    execution_lock: RwLock<EpochId>,

    // Optional index stores.
    pub indexes: Option<Arc<IndexStore>>,
    pub grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,

    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Checkpoint store; also serves `get_epoch_state_commitments`.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Exposed through `committee_store()` / `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    transaction_manager: Arc<TransactionManager>,

    /// Optional one-shot sender behind a mutex.
    // Outside of test builds the field is expected to be unused, hence the
    // conditional `expect(unused)` lint expectation.
    // NOTE(review): presumably used to shut down the execution task — confirm.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    pub metrics: Arc<AuthorityMetrics>,
    // Underscore-prefixed: stored but not read by name.
    // NOTE(review): presumably kept only so the pruners live as long as the
    // state — confirm.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    db_checkpoint_config: DBCheckpointConfig,

    /// Node configuration; `overload_config()` borrows
    /// `authority_overload_config` from it.
    pub config: NodeConfig,

    pub overload_info: AuthorityOverloadInfo,

    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    chain_identifier: ChainIdentifier,

    pub(crate) congestion_tracker: Arc<CongestionTracker>,
}
834
835impl AuthorityState {
844 pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
845 epoch_store.committee().authority_exists(&self.name)
846 }
847
848 pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
849 epoch_store
850 .active_validators()
851 .iter()
852 .any(|a| AuthorityName::from(a) == self.name)
853 }
854
855 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
856 !self.is_committee_validator(epoch_store)
857 }
858
859 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
860 &self.committee_store
861 }
862
863 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
864 self.committee_store.clone()
865 }
866
867 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
868 &self.config.authority_overload_config
869 }
870
871 pub fn get_epoch_state_commitments(
872 &self,
873 epoch: EpochId,
874 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
875 self.checkpoint_store.get_epoch_state_commitments(epoch)
876 }
877
878 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
882 async fn handle_transaction_impl(
883 &self,
884 transaction: VerifiedTransaction,
885 epoch_store: &Arc<AuthorityPerEpochStore>,
886 ) -> IotaResult<VerifiedSignedTransaction> {
887 let _execution_lock = self.execution_lock_for_signing()?;
889
890 let protocol_config = epoch_store.protocol_config();
891 let reference_gas_price = epoch_store.reference_gas_price();
892
893 let epoch = epoch_store.epoch();
894
895 let tx_data = transaction.data().transaction_data();
896
897 iota_transaction_checks::deny::check_transaction_for_signing(
901 tx_data,
902 transaction.tx_signatures(),
903 &transaction.input_objects()?,
904 &tx_data.receiving_objects(),
905 &self.config.transaction_deny_config,
906 self.get_backing_package_store().as_ref(),
907 )?;
908
909 let (tx_input_objects, tx_receiving_objects, per_authenticator_inputs) =
913 self.read_objects_for_signing(&transaction, epoch)?;
914
915 let move_authenticators = transaction.move_authenticators();
917
918 let (gas_status, tx_checked_input_objects, per_authenticator_checked_inputs) = self
924 .check_transaction_inputs_for_signing(
925 protocol_config,
926 reference_gas_price,
927 tx_data,
928 tx_input_objects,
929 &tx_receiving_objects,
930 &move_authenticators,
931 per_authenticator_inputs,
932 )?;
933
934 let per_authenticator_checked_input_objects = per_authenticator_checked_inputs
937 .iter()
938 .map(|i| &i.0)
939 .collect();
940
941 check_coin_deny_list_v1_during_signing(
945 tx_data.sender(),
946 &tx_checked_input_objects,
947 &tx_receiving_objects,
948 &per_authenticator_checked_input_objects,
949 &self.get_object_store(),
950 )?;
951
952 if !move_authenticators.is_empty() {
955 let aggregated_authenticator_input_objects =
956 iota_transaction_checks::aggregate_authenticator_input_objects(
957 &per_authenticator_checked_input_objects,
958 )?;
959
960 debug_assert_eq!(
961 move_authenticators.len(),
962 per_authenticator_checked_inputs.len(),
963 "Move authenticators amount must match the number of checked authenticator inputs"
964 );
965
966 let move_authenticators = move_authenticators
967 .into_iter()
968 .zip(per_authenticator_checked_inputs)
969 .map(
970 |(
971 move_authenticator,
972 (authenticator_checked_input_objects, authenticator_function_ref),
973 )| {
974 (
975 move_authenticator.to_owned(),
976 authenticator_function_ref,
977 authenticator_checked_input_objects,
978 )
979 },
980 )
981 .collect();
982
983 let tx_data_bytes =
988 bcs::to_bytes(&tx_data).expect("TransactionData serialization cannot fail");
989
990 let (kind, signer, gas_data) = tx_data.execution_parts();
991
992 let validation_result = epoch_store.executor().authenticate_transaction(
994 self.get_backing_store().as_ref(),
995 protocol_config,
996 self.metrics.limits_metrics.clone(),
997 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
998 epoch_store
999 .epoch_start_config()
1000 .epoch_data()
1001 .epoch_start_timestamp(),
1002 gas_data,
1003 gas_status,
1004 move_authenticators,
1005 aggregated_authenticator_input_objects,
1006 kind,
1007 signer,
1008 transaction.digest().to_owned(),
1009 tx_data_bytes,
1010 &mut None,
1011 );
1012
1013 if let Err(validation_error) = validation_result {
1014 return Err(IotaError::MoveAuthenticatorExecutionFailure {
1015 error: validation_error.to_string(),
1016 });
1017 }
1018 }
1019
1020 let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();
1021
1022 let signed_transaction =
1023 VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);
1024
1025 self.get_cache_writer().try_acquire_transaction_locks(
1030 epoch_store,
1031 &owned_objects,
1032 signed_transaction.clone(),
1033 )?;
1034
1035 Ok(signed_transaction)
1036 }
1037
1038 #[instrument(name = "handle_transaction", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = transaction.data().transaction_data().gas_owner().to_string()))]
1040 pub async fn handle_transaction(
1041 &self,
1042 epoch_store: &Arc<AuthorityPerEpochStore>,
1043 transaction: VerifiedTransaction,
1044 ) -> IotaResult<HandleTransactionResponse> {
1045 let tx_digest = *transaction.digest();
1046 debug!("handle_transaction");
1047
1048 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1050 return Ok(HandleTransactionResponse { status });
1051 }
1052
1053 let _metrics_guard = self
1054 .metrics
1055 .authority_state_handle_transaction_latency
1056 .start_timer();
1057 self.metrics.tx_orders.inc();
1058
1059 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1060 match signed {
1061 Ok(s) => {
1062 if self.is_committee_validator(epoch_store) {
1063 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1064 let tx = s.clone();
1065 let validator_tx_finalizer = validator_tx_finalizer.clone();
1066 let cache_reader = self.get_transaction_cache_reader().clone();
1067 let epoch_store = epoch_store.clone();
1068 spawn_monitored_task!(epoch_store.within_alive_epoch(
1069 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1070 ));
1071 }
1072 }
1073 Ok(HandleTransactionResponse {
1074 status: TransactionStatus::Signed(s.into_inner().into_sig()),
1075 })
1076 }
1077 Err(err) => Ok(HandleTransactionResponse {
1081 status: self
1082 .get_transaction_status(&tx_digest, epoch_store)?
1083 .ok_or(err)?
1084 .1,
1085 }),
1086 }
1087 }
1088
1089 pub fn check_system_overload_at_signing(&self) -> bool {
1090 self.config
1091 .authority_overload_config
1092 .check_system_overload_at_signing
1093 }
1094
1095 pub fn check_system_overload_at_execution(&self) -> bool {
1096 self.config
1097 .authority_overload_config
1098 .check_system_overload_at_execution
1099 }
1100
1101 pub(crate) fn check_system_overload(
1102 &self,
1103 consensus_adapter: &Arc<ConsensusAdapter>,
1104 tx_data: &SenderSignedData,
1105 do_authority_overload_check: bool,
1106 ) -> IotaResult {
1107 if do_authority_overload_check {
1108 self.check_authority_overload(tx_data).tap_err(|_| {
1109 self.update_overload_metrics("execution_queue");
1110 })?;
1111 }
1112 self.transaction_manager
1113 .check_execution_overload(self.overload_config(), tx_data)
1114 .tap_err(|_| {
1115 self.update_overload_metrics("execution_pending");
1116 })?;
1117 consensus_adapter.check_consensus_overload().tap_err(|_| {
1118 self.update_overload_metrics("consensus");
1119 })?;
1120
1121 let pending_tx_count = self
1122 .get_cache_commit()
1123 .approximate_pending_transaction_count();
1124 if pending_tx_count
1125 > self
1126 .config
1127 .execution_cache_config
1128 .writeback_cache
1129 .backpressure_threshold_for_rpc()
1130 {
1131 return Err(IotaError::ValidatorOverloadedRetryAfter {
1132 retry_after_secs: 10,
1133 });
1134 }
1135
1136 Ok(())
1137 }
1138
1139 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1140 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1141 return Ok(());
1142 }
1143
1144 let load_shedding_percentage = self
1145 .overload_info
1146 .load_shedding_percentage
1147 .load(Ordering::Relaxed);
1148 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1149 }
1150
1151 fn update_overload_metrics(&self, source: &str) {
1152 self.metrics
1153 .transaction_overload_sources
1154 .with_label_values(&[source])
1155 .inc();
1156 }
1157
1158 #[instrument(level = "trace", skip_all)]
1160 pub async fn execute_certificate(
1161 &self,
1162 certificate: &VerifiedCertificate,
1163 epoch_store: &Arc<AuthorityPerEpochStore>,
1164 ) -> IotaResult<TransactionEffects> {
1165 let _metrics_guard = if certificate.contains_shared_object() {
1166 self.metrics
1167 .execute_certificate_latency_shared_object
1168 .start_timer()
1169 } else {
1170 self.metrics
1171 .execute_certificate_latency_single_writer
1172 .start_timer()
1173 };
1174 trace!("execute_certificate");
1175
1176 self.metrics.total_cert_attempts.inc();
1177
1178 if !certificate.contains_shared_object() {
1179 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1184 }
1185
1186 epoch_store
1189 .within_alive_epoch(self.notify_read_effects(certificate))
1190 .await
1191 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1192 .and_then(|r| r)
1193 }
1194
/// Executes `certificate` synchronously, unless its effects are already
/// available.
///
/// The method acquires the per-transaction guard, returns early if executed
/// effects already exist for the digest, and otherwise loads the input
/// objects and hands over to `process_certificate`.
///
/// `expected_effects_digest`, when given, is the digest the resulting
/// effects must have. If it is `None`, the digest returned by
/// `get_signed_effects_digest` (if any) is used instead.
///
/// # Panics
///
/// Panics if effects already exist for the transaction and their digest
/// differs from the provided `expected_effects_digest`.
#[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
pub fn try_execute_immediately(
    &self,
    certificate: &VerifiedExecutableTransaction,
    expected_effects_digest: Option<TransactionEffectsDigest>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
    let _scope = monitored_scope("Execution::try_execute_immediately");
    let _metrics_guard = self.metrics.internal_execution_latency.start_timer();

    let tx_digest = certificate.digest();

    // The guard is taken before checking for existing effects.
    let tx_guard = epoch_store.acquire_tx_guard(certificate)?;

    // Already executed: validate the digest if one was expected, release the
    // guard and return the stored effects (no execution error is available).
    if let Some(effects) = self
        .get_transaction_cache_reader()
        .try_get_executed_effects(tx_digest)?
    {
        if let Some(expected_effects_digest_inner) = expected_effects_digest {
            assert_eq!(
                effects.digest(),
                expected_effects_digest_inner,
                "Unexpected effects digest for transaction {tx_digest:?}"
            );
        }
        tx_guard.release();
        return Ok((effects, None));
    }

    let (tx_input_objects, per_authenticator_inputs) =
        self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;

    // NOTE(review): the argument of `or` is evaluated eagerly, so the signed
    // effects digest is looked up (and its error propagated) even when the
    // caller supplied an expected digest.
    let expected_effects_digest =
        expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);

    // Ownership of `tx_guard` moves into `process_certificate`.
    self.process_certificate(
        tx_guard,
        certificate,
        tx_input_objects,
        per_authenticator_inputs,
        expected_effects_digest,
        epoch_store,
    )
    .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
    .tap_ok(
        |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
    )
}
1265
1266 pub fn read_objects_for_execution(
1267 &self,
1268 tx_lock: &CertLockGuard,
1269 certificate: &VerifiedExecutableTransaction,
1270 epoch_store: &Arc<AuthorityPerEpochStore>,
1271 ) -> IotaResult<(InputObjects, Vec<(InputObjects, ObjectReadResult)>)> {
1272 let _scope = monitored_scope("Execution::load_input_objects");
1273 let _metrics_guard = self
1274 .metrics
1275 .execution_load_input_objects_latency
1276 .start_timer();
1277
1278 let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1279
1280 let input_objects = self.input_loader.read_objects_for_execution(
1281 epoch_store,
1282 &certificate.key(),
1283 tx_lock,
1284 &input_objects,
1285 epoch_store.epoch(),
1286 )?;
1287
1288 certificate.split_input_objects_into_groups_for_reading(input_objects)
1289 }
1290
1291 pub fn try_execute_for_test(
1295 &self,
1296 certificate: &VerifiedCertificate,
1297 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1298 let epoch_store = self.epoch_store_for_testing();
1299 let (effects, execution_error_opt) = self.try_execute_immediately(
1300 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1301 None,
1302 &epoch_store,
1303 )?;
1304 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1305 Ok((signed_effects, execution_error_opt))
1306 }
1307
1308 pub fn execute_for_test(
1310 &self,
1311 certificate: &VerifiedCertificate,
1312 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1313 self.try_execute_for_test(certificate)
1314 .expect("try_execute_for_test should not fail")
1315 }
1316
1317 pub async fn notify_read_effects(
1318 &self,
1319 certificate: &VerifiedCertificate,
1320 ) -> IotaResult<TransactionEffects> {
1321 self.get_transaction_cache_reader()
1322 .try_notify_read_executed_effects(&[*certificate.digest()])
1323 .await
1324 .map(|mut r| r.pop().expect("must return correct number of effects"))
1325 }
1326
1327 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1328 self.get_object_cache_reader()
1329 .try_check_owned_objects_are_live(owned_object_refs)
1330 }
1331
1332 pub(crate) fn debug_dump_transaction_state(
1337 &self,
1338 tx_digest: &TransactionDigest,
1339 effects: &TransactionEffects,
1340 expected_effects_digest: TransactionEffectsDigest,
1341 inner_temporary_store: &InnerTemporaryStore,
1342 certificate: &VerifiedExecutableTransaction,
1343 debug_dump_config: &StateDebugDumpConfig,
1344 ) -> IotaResult<PathBuf> {
1345 let dump_dir = debug_dump_config
1348 .dump_file_directory
1349 .as_ref()
1350 .cloned()
1351 .unwrap_or(std::env::temp_dir());
1352 let epoch_store = self.load_epoch_store_one_call_per_task();
1353
1354 NodeStateDump::new(
1355 tx_digest,
1356 effects,
1357 expected_effects_digest,
1358 self.get_object_store().as_ref(),
1359 &epoch_store,
1360 inner_temporary_store,
1361 certificate,
1362 )?
1363 .write_to_file(&dump_dir)
1364 .map_err(|e| IotaError::FileIO(e.to_string()))
1365 }
1366
1367 #[instrument(name = "process_certificate", level = "trace", skip_all, fields(tx_digest = ?certificate.digest(), sender = ?certificate.data().transaction_data().gas_owner().to_string()))]
1368 pub(crate) fn process_certificate(
1369 &self,
1370 tx_guard: CertTxGuard,
1371 certificate: &VerifiedExecutableTransaction,
1372 tx_input_objects: InputObjects,
1373 per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1374 expected_effects_digest: Option<TransactionEffectsDigest>,
1375 epoch_store: &Arc<AuthorityPerEpochStore>,
1376 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1377 let process_certificate_start_time = tokio::time::Instant::now();
1378 let digest = *certificate.digest();
1379
1380 let _scope = monitored_scope("Execution::process_certificate");
1381
1382 fail_point_if!("correlated-crash-process-certificate", || {
1383 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1384 iota_simulator::task::kill_current_node(None);
1385 }
1386 });
1387
1388 let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1389 let execution_guard = match execution_guard {
1395 Ok(execution_guard) => execution_guard,
1396 Err(err) => {
1397 tx_guard.release();
1398 return Err(err);
1399 }
1400 };
1401 if *execution_guard != epoch_store.epoch() {
1405 tx_guard.release();
1406 info!("The epoch of the execution_guard doesn't match the epoch store");
1407 return Err(IotaError::WrongEpoch {
1408 expected_epoch: epoch_store.epoch(),
1409 actual_epoch: *execution_guard,
1410 });
1411 }
1412
1413 let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1419 &execution_guard,
1420 certificate,
1421 tx_input_objects,
1422 per_authenticator_inputs,
1423 epoch_store,
1424 ) {
1425 Err(e) => {
1426 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1427 tx_guard.release();
1428 return Err(e);
1429 }
1430 Ok(res) => res,
1431 };
1432
1433 if let Some(expected_effects_digest) = expected_effects_digest {
1434 if effects.digest() != expected_effects_digest {
1435 match self.debug_dump_transaction_state(
1437 &digest,
1438 &effects,
1439 expected_effects_digest,
1440 &inner_temporary_store,
1441 certificate,
1442 &self.config.state_debug_dump_config,
1443 ) {
1444 Ok(out_path) => {
1445 info!(
1446 "Dumped node state for transaction {} to {}",
1447 digest,
1448 out_path.as_path().display().to_string()
1449 );
1450 }
1451 Err(e) => {
1452 error!("Error dumping state for transaction {}: {e}", digest);
1453 }
1454 }
1455 error!(
1456 tx_digest = ?digest,
1457 ?expected_effects_digest,
1458 actual_effects = ?effects,
1459 "fork detected!"
1460 );
1461 panic!(
1462 "Transaction {} is expected to have effects digest {}, but got {}!",
1463 digest,
1464 expected_effects_digest,
1465 effects.digest(),
1466 );
1467 }
1468 }
1469
1470 fail_point!("crash");
1471
1472 self.commit_certificate(
1473 certificate,
1474 inner_temporary_store,
1475 &effects,
1476 tx_guard,
1477 execution_guard,
1478 epoch_store,
1479 )?;
1480
1481 if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1482 certificate.data().transaction_data().kind()
1483 {
1484 if let Some(err) = &execution_error_opt {
1485 debug_fatal!("Authenticator state update failed: {:?}", err);
1486 }
1487 epoch_store.update_authenticator_state(auth_state);
1488
1489 if cfg!(debug_assertions) {
1492 let authenticator_state = get_authenticator_state(self.get_object_store())
1493 .expect("Read cannot fail")
1494 .expect("Authenticator state must exist");
1495
1496 let mut sys_jwks: Vec<_> = authenticator_state
1497 .active_jwks
1498 .into_iter()
1499 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1500 .collect();
1501 let mut active_jwks: Vec<_> = epoch_store
1502 .signature_verifier
1503 .get_jwks()
1504 .into_iter()
1505 .collect();
1506 sys_jwks.sort();
1507 active_jwks.sort();
1508
1509 assert_eq!(sys_jwks, active_jwks);
1510 }
1511 }
1512
1513 let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1514 if elapsed > 0.0 {
1515 self.metrics
1516 .execution_gas_latency_ratio
1517 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1518 };
1519 Ok((effects, execution_error_opt))
1520 }
1521
/// Commits the outcome of an executed certificate.
///
/// In order: runs post-processing (failures are logged and counted but do
/// not abort the commit), records the transaction key/digest mapping in the
/// epoch store, writes the transaction outputs through the cache writer,
/// reloads system packages after an end-of-epoch transaction, marks the
/// transaction guard as committed, notifies the transaction manager and
/// updates the metrics.
///
/// `_execution_guard` is not read; it is taken by value and so stays alive
/// until this function returns.
///
/// # Errors
///
/// Propagates errors from `insert_tx_key_and_digest` and
/// `try_write_transaction_outputs`.
#[instrument(level = "trace", skip_all)]
fn commit_certificate(
    &self,
    certificate: &VerifiedExecutableTransaction,
    inner_temporary_store: InnerTemporaryStore,
    effects: &TransactionEffects,
    tx_guard: CertTxGuard,
    _execution_guard: ExecutionLockReadGuard<'_>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> IotaResult {
    let _scope: Option<iota_metrics::MonitoredScopeGuard> =
        monitored_scope("Execution::commit_certificate");
    let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

    // Gather everything needed from `inner_temporary_store` before it is
    // moved into the transaction outputs below.
    let tx_key = certificate.key();
    let tx_digest = certificate.digest();
    let input_object_count = inner_temporary_store.input_objects.len();
    let shared_object_count = effects.input_shared_objects().len();

    let output_keys = inner_temporary_store.get_output_keys(effects);

    // Best effort: a post-processing failure is logged and counted, but the
    // commit continues.
    let _ = self
        .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
        .tap_err(|e| {
            self.metrics.post_processing_total_failures.inc();
            error!(?tx_digest, "tx post processing failed: {e}");
        });

    epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;

    fail_point!("crash");

    let transaction_outputs = TransactionOutputs::build_transaction_outputs(
        certificate.clone().into_unsigned(),
        effects.clone(),
        inner_temporary_store,
    );
    self.get_cache_writer()
        .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;

    // After an end-of-epoch transaction, force a reload of all built-in
    // framework packages in the object cache.
    if certificate.transaction_data().is_end_of_epoch_tx() {
        self.get_object_cache_reader()
            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
    }

    tx_guard.commit_tx();

    // Notify the transaction manager about the committed transaction and its
    // output keys.
    self.transaction_manager
        .notify_commit(tx_digest, output_keys, epoch_store);

    self.update_metrics(certificate, input_object_count, shared_object_count);

    Ok(())
}
1587
1588 fn update_metrics(
1589 &self,
1590 certificate: &VerifiedExecutableTransaction,
1591 input_object_count: usize,
1592 shared_object_count: usize,
1593 ) {
1594 if certificate.has_zklogin_sig() {
1596 self.metrics.zklogin_sig_count.inc();
1597 } else if certificate.has_upgraded_multisig() {
1598 self.metrics.multisig_sig_count.inc();
1599 }
1600
1601 self.metrics.total_effects.inc();
1602 self.metrics.total_certs.inc();
1603
1604 if shared_object_count > 0 {
1605 self.metrics.shared_obj_tx.inc();
1606 }
1607
1608 if certificate.is_sponsored_tx() {
1609 self.metrics.sponsored_tx.inc();
1610 }
1611
1612 self.metrics
1613 .num_input_objs
1614 .observe(input_object_count as f64);
1615 self.metrics
1616 .num_shared_objects
1617 .observe(shared_object_count as f64);
1618 self.metrics.batch_size.observe(
1619 certificate
1620 .data()
1621 .intent_message()
1622 .value
1623 .kind()
1624 .num_commands() as f64,
1625 );
1626 }
1627
/// Executes `certificate` and returns the resulting temporary store, effects
/// and optional execution error. Nothing is committed by this function.
///
/// Two paths exist:
/// - without Move authenticators, the certificate inputs are checked and the
///   transaction is run via `execute_transaction_to_effects`;
/// - with Move authenticators, each account is checked via
///   `check_move_account`, the combined inputs are checked, and the
///   transaction is run via
///   `authenticate_then_execute_transaction_to_effects`.
///
/// In both paths the owned input objects are verified with
/// `check_owned_locks` before execution. `_execution_guard` is not read
/// inside this function.
///
/// # Errors
///
/// Propagates failures of the transaction validity check, the input checks,
/// the Move account checks and the owned-lock check.
#[instrument(level = "trace", skip_all)]
fn prepare_certificate(
    &self,
    _execution_guard: &ExecutionLockReadGuard<'_>,
    certificate: &VerifiedExecutableTransaction,
    tx_input_objects: InputObjects,
    per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> IotaResult<(
    InnerTemporaryStore,
    TransactionEffects,
    Option<ExecutionError>,
)> {
    let _scope = monitored_scope("Execution::prepare_certificate");
    let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
    let prepare_certificate_start_time = tokio::time::Instant::now();

    let protocol_config = epoch_store.protocol_config();

    let reference_gas_price = epoch_store.reference_gas_price();

    let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
    let epoch_start_timestamp = epoch_store
        .epoch_start_config()
        .epoch_data()
        .epoch_start_timestamp();

    let backing_store = self.get_backing_store().as_ref();

    let tx_digest = *certificate.digest();

    let tx_data = certificate.data().transaction_data();
    // Validate the transaction data against the current protocol config.
    tx_data.validity_check(protocol_config)?;

    let (kind, signer, gas_data) = tx_data.execution_parts();

    let move_authenticators = certificate.move_authenticators();

    // `effects` is only mutated by the fail point below, hence the
    // conditional `expect(unused_mut)`.
    #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
    let (inner_temp_store, _, mut effects, execution_error_opt) = if move_authenticators
        .is_empty()
    {
        // Path 1: no Move authenticators.
        let (tx_gas_status, tx_checked_input_objects) =
            iota_transaction_checks::check_certificate_input(
                certificate,
                tx_input_objects,
                protocol_config,
                reference_gas_price,
            )?;

        let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
        self.check_owned_locks(&owned_object_refs)?;
        epoch_store.executor().execute_transaction_to_effects(
            backing_store,
            protocol_config,
            self.metrics.limits_metrics.clone(),
            self.config
                .expensive_safety_check_config
                .enable_deep_per_tx_iota_conservation_check(),
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_id,
            epoch_start_timestamp,
            tx_checked_input_objects,
            gas_data,
            tx_gas_status,
            kind,
            signer,
            tx_digest,
            &mut None,
        )
    } else {
        // Path 2: authenticate with the Move authenticators, then execute.
        debug_assert_eq!(
            move_authenticators.len(),
            per_authenticator_inputs.len(),
            "Move authenticators amount must match the number of authenticator inputs"
        );

        // For every authenticator: check its account object and obtain the
        // authenticator function ref used for execution.
        let per_authenticator_inputs = move_authenticators
            .iter()
            .zip(per_authenticator_inputs)
            .map(
                |(move_authenticator, (authenticator_input_objects, account_object))| {
                    let (
                        auth_account_object_id,
                        auth_account_object_seq_number,
                        auth_account_object_digest,
                    ) = move_authenticator.object_to_authenticate_components()?;

                    let signer = move_authenticator.address()?;

                    let authenticator_function_ref_for_execution = self.check_move_account(
                        auth_account_object_id,
                        auth_account_object_seq_number,
                        auth_account_object_digest,
                        account_object,
                        &signer,
                    )?;

                    Ok((
                        authenticator_input_objects,
                        authenticator_function_ref_for_execution,
                    ))
                },
            )
            .collect::<IotaResult<Vec<_>>>()?;

        // Cloned because `per_authenticator_inputs` is consumed again below.
        let per_authenticator_input_objects = per_authenticator_inputs
            .iter()
            .map(|(authenticator_input_objects, _)| authenticator_input_objects.clone())
            .collect::<Vec<_>>();

        let tx_data_bytes =
            bcs::to_bytes(tx_data).expect("TransactionData serialization cannot fail");

        // Gas budget for authentication, taken from the protocol config.
        let authenticator_gas_budget = protocol_config.max_auth_gas();
        let (
            gas_status,
            per_authenticator_checked_input_objects,
            authenticator_and_tx_checked_input_objects,
        ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
            certificate,
            tx_input_objects,
            per_authenticator_input_objects,
            authenticator_gas_budget,
            protocol_config,
            reference_gas_price,
        )?;

        debug_assert_eq!(
            move_authenticators.len(),
            per_authenticator_checked_input_objects.len(),
            "Move authenticators amount must match the number of checked authenticator inputs"
        );

        // Assemble (authenticator, function ref, checked inputs) triples.
        let move_authenticators = move_authenticators
            .into_iter()
            .zip(per_authenticator_inputs)
            .zip(per_authenticator_checked_input_objects)
            .map(
                |(
                    (move_authenticator, (_, authenticator_function_ref_for_execution)),
                    authenticator_checked_input_objects,
                )| {
                    (
                        move_authenticator.to_owned(),
                        authenticator_function_ref_for_execution,
                        authenticator_checked_input_objects,
                    )
                },
            )
            .collect::<Vec<_>>();

        let owned_object_refs = authenticator_and_tx_checked_input_objects
            .inner()
            .filter_owned_objects();
        self.check_owned_locks(&owned_object_refs)?;

        epoch_store
            .executor()
            .authenticate_then_execute_transaction_to_effects(
                backing_store,
                protocol_config,
                self.metrics.limits_metrics.clone(),
                self.config
                    .expensive_safety_check_config
                    .enable_deep_per_tx_iota_conservation_check(),
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_id,
                epoch_start_timestamp,
                gas_data,
                gas_status,
                move_authenticators,
                authenticator_and_tx_checked_input_objects,
                kind,
                signer,
                tx_digest,
                tx_data_bytes,
                &mut None,
            )
    };

    // Fail point used under `msim` to alter the effects
    // (see `create_fail_state`).
    fail_point_if!("cp_execution_nondeterminism", || {
        #[cfg(msim)]
        self.create_fail_state(certificate, epoch_store, &mut effects);
    });

    // Guard against a zero elapsed time before dividing.
    let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
    if elapsed > 0.0 {
        self.metrics
            .prepare_cert_gas_latency_ratio
            .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
    }

    // `execution_error_opt` is a `Result`; only its error part is returned.
    Ok((inner_temp_store, effects, execution_error_opt.err()))
}
1854
1855 pub fn prepare_certificate_for_benchmark(
1856 &self,
1857 certificate: &VerifiedExecutableTransaction,
1858 input_objects: InputObjects,
1859 epoch_store: &Arc<AuthorityPerEpochStore>,
1860 ) -> IotaResult<(
1861 InnerTemporaryStore,
1862 TransactionEffects,
1863 Option<ExecutionError>,
1864 )> {
1865 let lock = RwLock::new(epoch_store.epoch());
1866 let execution_guard = lock.try_read().unwrap();
1867
1868 self.prepare_certificate(
1869 &execution_guard,
1870 certificate,
1871 input_objects,
1872 vec![],
1873 epoch_store,
1874 )
1875 }
1876
1877 #[instrument("dry_exec_tx", level = "trace", skip_all)]
1880 #[allow(clippy::type_complexity)]
1881 pub fn dry_exec_transaction(
1882 &self,
1883 transaction: TransactionData,
1884 transaction_digest: TransactionDigest,
1885 ) -> IotaResult<(
1886 DryRunTransactionBlockResponse,
1887 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1888 TransactionEffects,
1889 Option<ObjectID>,
1890 )> {
1891 let epoch_store = self.load_epoch_store_one_call_per_task();
1892 if !self.is_fullnode(&epoch_store) {
1893 return Err(IotaError::UnsupportedFeature {
1894 error: "dry-exec is only supported on fullnodes".to_string(),
1895 });
1896 }
1897
1898 if transaction.kind().is_system_tx() {
1899 return Err(IotaError::UnsupportedFeature {
1900 error: "dry-exec does not support system transactions".to_string(),
1901 });
1902 }
1903
1904 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1905 }
1906
1907 #[allow(clippy::type_complexity)]
1908 pub fn dry_exec_transaction_for_benchmark(
1909 &self,
1910 transaction: TransactionData,
1911 transaction_digest: TransactionDigest,
1912 ) -> IotaResult<(
1913 DryRunTransactionBlockResponse,
1914 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1915 TransactionEffects,
1916 Option<ObjectID>,
1917 )> {
1918 let epoch_store = self.load_epoch_store_one_call_per_task();
1919 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1920 }
1921
/// Shared implementation of the dry-run entry points.
///
/// Validates the transaction (without the gas check), applies the deny
/// checks, loads and checks the inputs, executes on a freshly created
/// executor and assembles the response.
///
/// If the transaction carries no gas payment, a mock gas coin with a random
/// object ID and value `SIMULATION_GAS_COIN_VALUE`, owned by the gas owner,
/// is created and used as payment.
///
/// Returns a tuple of:
/// - the `DryRunTransactionBlockResponse` (object and balance changes are
///   left empty here);
/// - the written objects keyed by ID, each tagged with its `WriteKind`
///   (created, unwrapped or mutated);
/// - the raw transaction effects;
/// - the ID of the mock gas object, if one was created.
///
/// # Panics
///
/// Panics if the executor cannot be created, or if an object listed in the
/// effects as created/unwrapped/mutated is missing from the temporary
/// store's written set.
#[instrument(level = "trace", skip_all)]
#[allow(clippy::type_complexity)]
fn dry_exec_transaction_impl(
    &self,
    epoch_store: &AuthorityPerEpochStore,
    transaction: TransactionData,
    transaction_digest: TransactionDigest,
) -> IotaResult<(
    DryRunTransactionBlockResponse,
    BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
    TransactionEffects,
    Option<ObjectID>,
)> {
    // The gas check is skipped here because gas may be mocked below.
    transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

    let input_object_kinds = transaction.input_objects()?;
    let receiving_object_refs = transaction.receiving_objects();

    // An empty signature list is passed to the deny check.
    iota_transaction_checks::deny::check_transaction_for_signing(
        &transaction,
        &[],
        &input_object_kinds,
        &receiving_object_refs,
        &self.config.transaction_deny_config,
        self.get_backing_package_store().as_ref(),
    )?;

    let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
        None,
        &input_object_kinds,
        &receiving_object_refs,
        epoch_store.epoch(),
    )?;

    // Rebound as mutable so the gas payment can be replaced by the mock coin.
    let mut transaction = transaction;
    let reference_gas_price = epoch_store.reference_gas_price();
    let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
        // No gas payment supplied: fabricate a gas coin owned by the gas owner.
        let sender = transaction.gas_owner();
        let gas_object_id = ObjectID::random();
        let gas_object = Object::new_move(
            MoveObject::new_gas_coin(
                OBJECT_START_VERSION,
                gas_object_id,
                SIMULATION_GAS_COIN_VALUE,
            ),
            Owner::AddressOwner(sender),
            TransactionDigest::genesis_marker(),
        );
        let gas_object_ref = gas_object.compute_object_reference();
        transaction.gas_data_mut().payment = vec![gas_object_ref];
        (
            iota_transaction_checks::check_transaction_input_with_given_gas(
                epoch_store.protocol_config(),
                reference_gas_price,
                &transaction,
                input_objects,
                receiving_objects,
                gas_object,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?,
            Some(gas_object_id),
        )
    } else {
        // No authenticator gas budget is passed on this path.
        let authenticator_gas_budget = 0;

        (
            iota_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                reference_gas_price,
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
                authenticator_gas_budget,
            )?,
            None,
        )
    };

    let protocol_config = epoch_store.protocol_config();
    let (kind, signer, gas_data) = transaction.execution_parts();

    // A dedicated executor is created for the dry run instead of using the
    // epoch store's executor.
    let silent = true;
    let executor = iota_execution::executor(protocol_config, silent, None)
        .expect("Creating an executor should not fail here");

    let expensive_checks = false;
    let (inner_temp_store, _, effects, execution_error) = executor
        .execute_transaction_to_effects(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            expensive_checks,
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_data,
            gas_status,
            kind,
            signer,
            transaction_digest,
            &mut None,
        );
    let tx_digest = *effects.transaction_digest();

    // Resolve modules from the temporary store together with the epoch
    // store's module cache.
    let module_cache =
        TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());

    let mut layout_resolver =
        epoch_store
            .executor()
            .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                &inner_temp_store,
                self.get_backing_package_store(),
            )));
    // Object and balance changes are returned empty by this function.
    let object_changes = Vec::new();

    let balance_changes = Vec::new();

    // Tag every created/unwrapped/mutated object ref with its write kind and
    // attach the written object from the temporary store.
    let written_with_kind = effects
        .created()
        .into_iter()
        .map(|(oref, _)| (oref, WriteKind::Create))
        .chain(
            effects
                .unwrapped()
                .into_iter()
                .map(|(oref, _)| (oref, WriteKind::Unwrap)),
        )
        .chain(
            effects
                .mutated()
                .into_iter()
                .map(|(oref, _)| (oref, WriteKind::Mutate)),
        )
        .map(|(oref, kind)| {
            let obj = inner_temp_store.written.get(&oref.0).unwrap();
            (oref.0, (oref, obj.clone(), kind))
        })
        .collect();

    // Stringified source of the execution error, if there is one.
    let execution_error_source = execution_error
        .as_ref()
        .err()
        .and_then(|e| e.source().as_ref().map(|e| e.to_string()));

    Ok((
        DryRunTransactionBlockResponse {
            suggested_gas_price: self
                .congestion_tracker
                .get_prediction_suggested_gas_price(&transaction),
            input: IotaTransactionBlockData::try_from_with_module_cache(
                transaction,
                &module_cache,
                tx_digest,
            )
            .map_err(|e| IotaError::TransactionSerialization {
                error: format!(
                    "Failed to convert transaction to IotaTransactionBlockData: {e}",
                ),
            })?,
            effects: effects.clone().try_into()?,
            events: IotaTransactionBlockEvents::try_from(
                inner_temp_store.events.clone(),
                tx_digest,
                None,
                layout_resolver.as_mut(),
            )?,
            object_changes,
            balance_changes,
            execution_error_source,
        },
        written_with_kind,
        effects,
        mock_gas,
    ))
}
2115
2116 pub fn simulate_transaction(
2117 &self,
2118 mut transaction: TransactionData,
2119 checks: VmChecks,
2120 ) -> IotaResult<SimulateTransactionResult> {
2121 if transaction.kind().is_system_tx() {
2122 return Err(IotaError::UnsupportedFeature {
2123 error: "simulate does not support system transactions".to_string(),
2124 });
2125 }
2126
2127 let epoch_store = self.load_epoch_store_one_call_per_task();
2128 if !self.is_fullnode(&epoch_store) {
2129 return Err(IotaError::UnsupportedFeature {
2130 error: "simulate is only supported on fullnodes".to_string(),
2131 });
2132 }
2133
2134 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2138
2139 let input_object_kinds = transaction.input_objects()?;
2140 let receiving_object_refs = transaction.receiving_objects();
2141
2142 iota_transaction_checks::deny::check_transaction_for_signing(
2145 &transaction,
2146 &[],
2147 &input_object_kinds,
2148 &receiving_object_refs,
2149 &self.config.transaction_deny_config,
2150 self.get_backing_package_store().as_ref(),
2151 )?;
2152
2153 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2155 None,
2157 &input_object_kinds,
2158 &receiving_object_refs,
2159 epoch_store.epoch(),
2160 )?;
2161
2162 let mock_gas_id = if transaction.gas().is_empty() {
2164 let mock_gas_object = Object::new_move(
2165 MoveObject::new_gas_coin(
2166 OBJECT_START_VERSION,
2167 ObjectID::MAX,
2168 SIMULATION_GAS_COIN_VALUE,
2169 ),
2170 Owner::AddressOwner(transaction.gas_data().owner),
2171 TransactionDigest::genesis_marker(),
2172 );
2173 let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2174 transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
2175 input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2176 Some(mock_gas_object.id())
2177 } else {
2178 None
2179 };
2180
2181 let protocol_config = epoch_store.protocol_config();
2182
2183 let authenticator_gas_budget = 0;
2186
2187 let (gas_status, checked_input_objects) = if checks.enabled() {
2190 iota_transaction_checks::check_transaction_input(
2191 protocol_config,
2192 epoch_store.reference_gas_price(),
2193 &transaction,
2194 input_objects,
2195 &receiving_objects,
2196 &self.metrics.bytecode_verifier_metrics,
2197 &self.config.verifier_signing_config,
2198 authenticator_gas_budget,
2199 )?
2200 } else {
2201 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2202 protocol_config,
2203 transaction.kind(),
2204 input_objects,
2205 receiving_objects,
2206 )?;
2207 let gas_status = IotaGasStatus::new(
2208 transaction.gas_budget(),
2209 transaction.gas_price(),
2210 epoch_store.reference_gas_price(),
2211 protocol_config,
2212 )?;
2213
2214 (gas_status, checked_input_objects)
2215 };
2216
2217 let executor = iota_execution::executor(
2219 protocol_config,
2220 true, None,
2222 )
2223 .expect("Creating an executor should not fail here");
2224
2225 let (kind, signer, gas_data) = transaction.execution_parts();
2227 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2228 self.get_backing_store().as_ref(),
2229 protocol_config,
2230 self.metrics.limits_metrics.clone(),
2231 false, self.config.certificate_deny_config.certificate_deny_set(),
2233 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2234 epoch_store
2235 .epoch_start_config()
2236 .epoch_data()
2237 .epoch_start_timestamp(),
2238 checked_input_objects,
2239 gas_data,
2240 gas_status,
2241 kind,
2242 signer,
2243 transaction.digest(),
2244 checks.disabled(),
2245 );
2246
2247 Ok(SimulateTransactionResult {
2250 input_objects: inner_temp_store.input_objects,
2251 output_objects: inner_temp_store.written,
2252 events: effects.events_digest().map(|_| inner_temp_store.events),
2253 effects,
2254 execution_result,
2255 suggested_gas_price: self
2256 .congestion_tracker
2257 .get_prediction_suggested_gas_price(&transaction),
2258 mock_gas_id,
2259 })
2260 }
2261
2262 #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2267 pub async fn dev_inspect_transaction_block(
2268 &self,
2269 sender: IotaAddress,
2270 transaction_kind: TransactionKind,
2271 gas_price: Option<u64>,
2272 gas_budget: Option<u64>,
2273 gas_sponsor: Option<IotaAddress>,
2274 gas_objects: Option<Vec<ObjectRef>>,
2275 show_raw_txn_data_and_effects: Option<bool>,
2276 skip_checks: Option<bool>,
2277 ) -> IotaResult<DevInspectResults> {
2278 let epoch_store = self.load_epoch_store_one_call_per_task();
2279
2280 if !self.is_fullnode(&epoch_store) {
2281 return Err(IotaError::UnsupportedFeature {
2282 error: "dev-inspect is only supported on fullnodes".to_string(),
2283 });
2284 }
2285
2286 if transaction_kind.is_system_tx() {
2287 return Err(IotaError::UnsupportedFeature {
2288 error: "system transactions are not supported".to_string(),
2289 });
2290 }
2291
2292 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2293 let skip_checks = skip_checks.unwrap_or(true);
2294 let reference_gas_price = epoch_store.reference_gas_price();
2295 let protocol_config = epoch_store.protocol_config();
2296 let max_tx_gas = protocol_config.max_tx_gas();
2297
2298 let price = gas_price.unwrap_or(reference_gas_price);
2299 let budget = gas_budget.unwrap_or(max_tx_gas);
2300 let owner = gas_sponsor.unwrap_or(sender);
2301 let payment = gas_objects.unwrap_or_default();
2304 let mut transaction = TransactionData::V1(TransactionDataV1 {
2305 kind: transaction_kind.clone(),
2306 sender,
2307 gas_data: GasData {
2308 payment,
2309 owner,
2310 price,
2311 budget,
2312 },
2313 expiration: TransactionExpiration::None,
2314 });
2315
2316 let raw_txn_data = if show_raw_txn_data_and_effects {
2317 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2318 error: "Failed to serialize transaction during dev inspect".to_string(),
2319 })?
2320 } else {
2321 vec![]
2322 };
2323
2324 transaction.validity_check_no_gas_check(protocol_config)?;
2325
2326 let input_object_kinds = transaction.input_objects()?;
2327 let receiving_object_refs = transaction.receiving_objects();
2328
2329 iota_transaction_checks::deny::check_transaction_for_signing(
2330 &transaction,
2331 &[],
2332 &input_object_kinds,
2333 &receiving_object_refs,
2334 &self.config.transaction_deny_config,
2335 self.get_backing_package_store().as_ref(),
2336 )?;
2337
2338 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2339 None,
2341 &input_object_kinds,
2342 &receiving_object_refs,
2343 epoch_store.epoch(),
2344 )?;
2345
2346 let (gas_status, checked_input_objects) = if skip_checks {
2347 if transaction.gas().is_empty() {
2352 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2354 SIMULATION_GAS_COIN_VALUE,
2355 transaction.gas_owner(),
2356 );
2357 let gas_object_ref = dummy_gas_object.compute_object_reference();
2358 transaction.gas_data_mut().payment = vec![gas_object_ref];
2359 input_objects.push(ObjectReadResult::new(
2360 InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2361 dummy_gas_object.into(),
2362 ));
2363 }
2364 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2365 protocol_config,
2366 &transaction_kind,
2367 input_objects,
2368 receiving_objects,
2369 )?;
2370 let gas_status = IotaGasStatus::new(
2371 max_tx_gas,
2372 transaction.gas_price(),
2373 reference_gas_price,
2374 protocol_config,
2375 )?;
2376
2377 (gas_status, checked_input_objects)
2378 } else {
2379 if transaction.gas().is_empty() {
2383 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2385 SIMULATION_GAS_COIN_VALUE,
2386 transaction.gas_owner(),
2387 );
2388 let gas_object_ref = dummy_gas_object.compute_object_reference();
2389 transaction.gas_data_mut().payment = vec![gas_object_ref];
2390 iota_transaction_checks::check_transaction_input_with_given_gas(
2391 epoch_store.protocol_config(),
2392 reference_gas_price,
2393 &transaction,
2394 input_objects,
2395 receiving_objects,
2396 dummy_gas_object,
2397 &self.metrics.bytecode_verifier_metrics,
2398 &self.config.verifier_signing_config,
2399 )?
2400 } else {
2401 let authenticator_gas_budget = 0;
2404
2405 iota_transaction_checks::check_transaction_input(
2406 epoch_store.protocol_config(),
2407 reference_gas_price,
2408 &transaction,
2409 input_objects,
2410 &receiving_objects,
2411 &self.metrics.bytecode_verifier_metrics,
2412 &self.config.verifier_signing_config,
2413 authenticator_gas_budget,
2414 )?
2415 }
2416 };
2417
2418 let executor = iota_execution::executor(protocol_config, true, None)
2419 .expect("Creating an executor should not fail here");
2420 let gas_data = transaction.gas_data().clone();
2421 let intent_msg = IntentMessage::new(
2422 Intent {
2423 version: IntentVersion::V0,
2424 scope: IntentScope::TransactionData,
2425 app_id: IntentAppId::Iota,
2426 },
2427 transaction,
2428 );
2429 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2430 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2431 self.get_backing_store().as_ref(),
2432 protocol_config,
2433 self.metrics.limits_metrics.clone(),
2434 false,
2436 self.config.certificate_deny_config.certificate_deny_set(),
2437 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2438 epoch_store
2439 .epoch_start_config()
2440 .epoch_data()
2441 .epoch_start_timestamp(),
2442 checked_input_objects,
2443 gas_data,
2444 gas_status,
2445 transaction_kind,
2446 sender,
2447 transaction_digest,
2448 skip_checks,
2449 );
2450
2451 let raw_effects = if show_raw_txn_data_and_effects {
2452 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2453 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2454 })?
2455 } else {
2456 vec![]
2457 };
2458
2459 let mut layout_resolver =
2460 epoch_store
2461 .executor()
2462 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2463 &inner_temp_store,
2464 self.get_backing_package_store(),
2465 )));
2466
2467 DevInspectResults::new(
2468 effects,
2469 inner_temp_store.events.clone(),
2470 execution_result,
2471 raw_txn_data,
2472 raw_effects,
2473 layout_resolver.as_mut(),
2474 )
2475 }
2476
2477 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2479 let epoch_store = self.epoch_store_for_testing();
2480 Ok(epoch_store.reference_gas_price())
2481 }
2482
2483 #[instrument(level = "trace", skip_all)]
2484 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2485 self.get_transaction_cache_reader()
2486 .try_is_tx_already_executed(digest)
2487 }
2488
2489 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2491 self.try_is_tx_already_executed(digest)
2492 .expect("storage access failed")
2493 }
2494
2495 #[instrument(level = "debug", skip_all, err)]
2497 fn index_tx(
2498 &self,
2499 indexes: &IndexStore,
2500 digest: &TransactionDigest,
2501 cert: &VerifiedExecutableTransaction,
2503 effects: &TransactionEffects,
2504 events: &TransactionEvents,
2505 timestamp_ms: u64,
2506 tx_coins: Option<TxCoins>,
2507 written: &WrittenObjects,
2508 inner_temporary_store: &InnerTemporaryStore,
2509 ) -> IotaResult<u64> {
2510 let changes = self
2511 .process_object_index(effects, written, inner_temporary_store)
2512 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2513
2514 indexes.index_tx(
2515 cert.data().intent_message().value.sender(),
2516 cert.data()
2517 .intent_message()
2518 .value
2519 .input_objects()?
2520 .iter()
2521 .map(|o| o.object_id()),
2522 effects
2523 .all_changed_objects()
2524 .into_iter()
2525 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2526 cert.data()
2527 .intent_message()
2528 .value
2529 .move_calls()
2530 .into_iter()
2531 .map(|(package, module, function)| {
2532 (*package, module.to_owned(), function.to_owned())
2533 }),
2534 events,
2535 changes,
2536 digest,
2537 timestamp_ms,
2538 tx_coins,
2539 )
2540 }
2541
/// Simulator-only fault injection: perturbs `effects` for non-system
/// transactions on a set of validators tracked in a thread-local.
///
/// Validators with non-zero stake are added to the failing set while the
/// accumulated failing stake is below the committee's validity threshold.
/// Every validator in that set bumps the computation cost of the effects by
/// one.
#[cfg(msim)]
fn create_fail_state(
    &self,
    certificate: &VerifiedExecutableTransaction,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    effects: &mut TransactionEffects,
) {
    use std::cell::RefCell;
    thread_local! {
        static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
    }

    if certificate.data().intent_message().value.is_system_tx() {
        return;
    }

    let committee = epoch_store.committee();
    let cur_stake = (**committee).weight(&self.name);
    if cur_stake == 0 {
        return;
    }

    FAIL_STATE.with_borrow_mut(|(failing_stake, failing_validators)| {
        if *failing_stake < committee.validity_threshold() {
            *failing_stake += cur_stake;
            failing_validators.insert(self.name);
        }

        if failing_validators.contains(&self.name) {
            info!("cp_exec failing tx");
            effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
        }
    });
}
2572
2573 fn process_object_index(
2574 &self,
2575 effects: &TransactionEffects,
2576 written: &WrittenObjects,
2577 inner_temporary_store: &InnerTemporaryStore,
2578 ) -> IotaResult<ObjectIndexChanges> {
2579 let epoch_store = self.load_epoch_store_one_call_per_task();
2580 let mut layout_resolver =
2581 epoch_store
2582 .executor()
2583 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2584 inner_temporary_store,
2585 self.get_backing_package_store(),
2586 )));
2587
2588 let modified_at_version = effects
2589 .modified_at_versions()
2590 .into_iter()
2591 .collect::<HashMap<_, _>>();
2592
2593 let tx_digest = effects.transaction_digest();
2594 let mut deleted_owners = vec![];
2595 let mut deleted_dynamic_fields = vec![];
2596 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2597 let old_version = modified_at_version.get(&id).unwrap();
2598 match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2601 |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2602 ) {
2603 Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2604 Owner::ObjectOwner(object_id) => {
2605 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2606 }
2607 _ => {}
2608 }
2609 }
2610
2611 let mut new_owners = vec![];
2612 let mut new_dynamic_fields = vec![];
2613
2614 for (oref, owner, kind) in effects.all_changed_objects() {
2615 let id = &oref.0;
2616 if let WriteKind::Mutate = kind {
2619 let Some(old_version) = modified_at_version.get(id) else {
2620 panic!(
2621 "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2622 );
2623 };
2624 let Some(old_object) = self
2627 .get_object_store()
2628 .try_get_object_by_key(id, *old_version)?
2629 else {
2630 panic!(
2631 "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2632 );
2633 };
2634 if old_object.owner != owner {
2635 match old_object.owner {
2636 Owner::AddressOwner(addr) => {
2637 deleted_owners.push((addr, *id));
2638 }
2639 Owner::ObjectOwner(object_id) => {
2640 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2641 }
2642 _ => {}
2643 }
2644 }
2645 }
2646
2647 match owner {
2648 Owner::AddressOwner(addr) => {
2649 let new_object = written.get(id).unwrap_or_else(
2652 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2653 );
2654 assert_eq!(
2655 new_object.version(),
2656 oref.1,
2657 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2658 tx_digest,
2659 id,
2660 new_object.version(),
2661 oref.1
2662 );
2663
2664 let type_ = new_object
2665 .type_()
2666 .map(|type_| ObjectType::Struct(type_.clone()))
2667 .unwrap_or(ObjectType::Package);
2668
2669 new_owners.push((
2670 (addr, *id),
2671 ObjectInfo {
2672 object_id: *id,
2673 version: oref.1,
2674 digest: oref.2,
2675 type_,
2676 owner,
2677 previous_transaction: *effects.transaction_digest(),
2678 },
2679 ));
2680 }
2681 Owner::ObjectOwner(owner) => {
2682 let new_object = written.get(id).unwrap_or_else(
2683 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2684 );
2685 assert_eq!(
2686 new_object.version(),
2687 oref.1,
2688 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2689 tx_digest,
2690 id,
2691 new_object.version(),
2692 oref.1
2693 );
2694
2695 let Some(df_info) = self
2696 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2697 .unwrap_or_else(|e| {
2698 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2699 None
2700 }
2701 )
2702 else {
2703 continue;
2705 };
2706 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2707 }
2708 _ => {}
2709 }
2710 }
2711
2712 Ok(ObjectIndexChanges {
2713 deleted_owners,
2714 deleted_dynamic_fields,
2715 new_owners,
2716 new_dynamic_fields,
2717 })
2718 }
2719
2720 fn try_create_dynamic_field_info(
2721 &self,
2722 o: &Object,
2723 written: &WrittenObjects,
2724 resolver: &mut dyn LayoutResolver,
2725 ) -> IotaResult<Option<DynamicFieldInfo>> {
2726 let Some(move_object) = o.data.try_as_move().cloned() else {
2728 return Ok(None);
2729 };
2730
2731 if !move_object.type_().is_dynamic_field() {
2733 return Ok(None);
2734 }
2735
2736 let layout = resolver
2737 .get_annotated_layout(&move_object.type_().clone().into())?
2738 .into_layout();
2739
2740 let field =
2741 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2742 IotaError::ObjectDeserialization {
2743 error: e.to_string(),
2744 }
2745 })?;
2746
2747 let type_ = field.kind;
2748 let name_type: TypeTag = field.name_layout.into();
2749 let bcs_name = field.name_bytes.to_owned();
2750
2751 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2752 .map_err(|e| {
2753 warn!("{e}");
2754 IotaError::ObjectDeserialization {
2755 error: e.to_string(),
2756 }
2757 })?;
2758
2759 let name = DynamicFieldName {
2760 type_: name_type,
2761 value: IotaMoveValue::from(name_value).to_json_value(),
2762 };
2763
2764 let value_metadata = field.value_metadata().map_err(|e| {
2765 warn!("{e}");
2766 IotaError::ObjectDeserialization {
2767 error: e.to_string(),
2768 }
2769 })?;
2770
2771 Ok(Some(match value_metadata {
2772 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2773 name,
2774 bcs_name,
2775 type_,
2776 object_type: object_type.to_canonical_string(true),
2777 object_id: o.id(),
2778 version: o.version(),
2779 digest: o.digest(),
2780 },
2781
2782 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2783 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2788 (
2789 object.version(),
2790 object.digest(),
2791 object.data.type_().unwrap().clone(),
2792 )
2793 } else {
2794 let object = self
2796 .get_object_store()
2797 .try_get_object_by_key(&object_id, o.version())?
2798 .ok_or_else(|| UserInputError::ObjectNotFound {
2799 object_id,
2800 version: Some(o.version()),
2801 })?;
2802 let version = object.version();
2803 let digest = object.digest();
2804 let object_type = object.data.type_().unwrap().clone();
2805 (version, digest, object_type)
2806 };
2807
2808 DynamicFieldInfo {
2809 name,
2810 bcs_name,
2811 type_,
2812 object_type: object_type.to_string(),
2813 object_id,
2814 version,
2815 digest,
2816 }
2817 }
2818 }))
2819 }
2820
2821 #[instrument(level = "trace", skip_all, err)]
2822 fn post_process_one_tx(
2823 &self,
2824 certificate: &VerifiedExecutableTransaction,
2825 effects: &TransactionEffects,
2826 inner_temporary_store: &InnerTemporaryStore,
2827 epoch_store: &Arc<AuthorityPerEpochStore>,
2828 ) -> IotaResult {
2829 if self.indexes.is_none() {
2830 return Ok(());
2831 }
2832
2833 let _scope = monitored_scope("Execution::post_process_one_tx");
2834
2835 let tx_digest = certificate.digest();
2836 let timestamp_ms = Self::unixtime_now_ms();
2837 let events = &inner_temporary_store.events;
2838 let written = &inner_temporary_store.written;
2839 let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
2840 effects,
2841 inner_temporary_store,
2842 epoch_store,
2843 );
2844
2845 if let Some(indexes) = &self.indexes {
2847 let _ = self
2848 .index_tx(
2849 indexes.as_ref(),
2850 tx_digest,
2851 certificate,
2852 effects,
2853 events,
2854 timestamp_ms,
2855 tx_coins,
2856 written,
2857 inner_temporary_store,
2858 )
2859 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2860 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2861 .expect("Indexing tx should not fail");
2862
2863 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2864 let events = self.make_transaction_block_events(
2865 events.clone(),
2866 *tx_digest,
2867 timestamp_ms,
2868 epoch_store,
2869 inner_temporary_store,
2870 )?;
2871 self.subscription_handler
2873 .process_tx(certificate.data().transaction_data(), &effects, &events)
2874 .tap_ok(|_| {
2875 self.metrics
2876 .post_processing_total_tx_had_event_processed
2877 .inc()
2878 })
2879 .tap_err(|e| {
2880 warn!(
2881 ?tx_digest,
2882 "Post processing - Couldn't process events for tx: {}", e
2883 )
2884 })?;
2885
2886 self.metrics
2887 .post_processing_total_events_emitted
2888 .inc_by(events.data.len() as u64);
2889 };
2890 Ok(())
2891 }
2892
2893 fn make_transaction_block_events(
2894 &self,
2895 transaction_events: TransactionEvents,
2896 digest: TransactionDigest,
2897 timestamp_ms: u64,
2898 epoch_store: &Arc<AuthorityPerEpochStore>,
2899 inner_temporary_store: &InnerTemporaryStore,
2900 ) -> IotaResult<IotaTransactionBlockEvents> {
2901 let mut layout_resolver =
2902 epoch_store
2903 .executor()
2904 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2905 inner_temporary_store,
2906 self.get_backing_package_store(),
2907 )));
2908 IotaTransactionBlockEvents::try_from(
2909 transaction_events,
2910 digest,
2911 Some(timestamp_ms),
2912 layout_resolver.as_mut(),
2913 )
2914 }
2915
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock is set before the Unix epoch, or if the
/// millisecond count does not fit in a `u64`.
pub fn unixtime_now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let millis: u128 = since_epoch.as_millis();
    u64::try_from(millis).expect("Travelling in time machine")
}
2923
2924 #[instrument(level = "trace", skip_all)]
2925 pub async fn handle_transaction_info_request(
2926 &self,
2927 request: TransactionInfoRequest,
2928 ) -> IotaResult<TransactionInfoResponse> {
2929 let epoch_store = self.load_epoch_store_one_call_per_task();
2930 let (transaction, status) = self
2931 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2932 .ok_or(IotaError::TransactionNotFound {
2933 digest: request.transaction_digest,
2934 })?;
2935 Ok(TransactionInfoResponse {
2936 transaction,
2937 status,
2938 })
2939 }
2940
2941 #[instrument(level = "trace", skip_all)]
2942 pub async fn handle_object_info_request(
2943 &self,
2944 request: ObjectInfoRequest,
2945 ) -> IotaResult<ObjectInfoResponse> {
2946 let epoch_store = self.load_epoch_store_one_call_per_task();
2947
2948 let requested_object_seq = match request.request_kind {
2949 ObjectInfoRequestKind::LatestObjectInfo => {
2950 let (_, seq, _) = self
2951 .try_get_object_or_tombstone(request.object_id)
2952 .await?
2953 .ok_or_else(|| {
2954 IotaError::from(UserInputError::ObjectNotFound {
2955 object_id: request.object_id,
2956 version: None,
2957 })
2958 })?;
2959 seq
2960 }
2961 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2962 };
2963
2964 let object = self
2965 .get_object_store()
2966 .try_get_object_by_key(&request.object_id, requested_object_seq)?
2967 .ok_or_else(|| {
2968 IotaError::from(UserInputError::ObjectNotFound {
2969 object_id: request.object_id,
2970 version: Some(requested_object_seq),
2971 })
2972 })?;
2973
2974 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2975 (request.generate_layout, object.data.try_as_move())
2976 {
2977 Some(into_struct_layout(
2978 epoch_store
2979 .executor()
2980 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2981 .get_annotated_layout(&move_obj.type_().clone().into())?,
2982 )?)
2983 } else {
2984 None
2985 };
2986
2987 let lock = if !object.is_address_owned() {
2988 None
2990 } else {
2991 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2992 .await?
2993 .map(|s| s.into_inner())
2994 };
2995
2996 Ok(ObjectInfoResponse {
2997 object,
2998 layout,
2999 lock_for_debugging: lock,
3000 })
3001 }
3002
3003 #[instrument(level = "trace", skip_all)]
3004 pub fn handle_checkpoint_request(
3005 &self,
3006 request: &CheckpointRequest,
3007 ) -> IotaResult<CheckpointResponse> {
3008 let summary = if request.certified {
3009 let summary = match request.sequence_number {
3010 Some(seq) => self
3011 .checkpoint_store
3012 .get_checkpoint_by_sequence_number(seq)?,
3013 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3014 }
3015 .map(|v| v.into_inner());
3016 summary.map(CheckpointSummaryResponse::Certified)
3017 } else {
3018 let summary = match request.sequence_number {
3019 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3020 None => self
3021 .checkpoint_store
3022 .get_latest_locally_computed_checkpoint()?,
3023 };
3024 summary.map(CheckpointSummaryResponse::Pending)
3025 };
3026 let contents = match &summary {
3027 Some(s) => self
3028 .checkpoint_store
3029 .get_checkpoint_contents(&s.content_digest())?,
3030 None => None,
3031 };
3032 Ok(CheckpointResponse {
3033 checkpoint: summary,
3034 contents,
3035 })
3036 }
3037
3038 fn check_protocol_version(
3039 supported_protocol_versions: SupportedProtocolVersions,
3040 current_version: ProtocolVersion,
3041 ) {
3042 info!("current protocol version is now {:?}", current_version);
3043 info!("supported versions are: {:?}", supported_protocol_versions);
3044 if !supported_protocol_versions.is_version_supported(current_version) {
3045 let msg = format!(
3046 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
3047 );
3048
3049 error!("{}", msg);
3050 eprintln!("{msg}");
3051
3052 #[cfg(not(msim))]
3053 std::process::exit(1);
3054
3055 #[cfg(msim)]
3056 iota_simulator::task::shutdown_current_node();
3057 }
3058 }
3059
/// Builds the `AuthorityState`, spawns its background tasks and indexes the
/// genesis objects.
///
/// The protocol version of `epoch_store` is checked first; see
/// `check_protocol_version` for what happens when it is unsupported.
///
/// Two tasks are spawned: `execution_process`, which receives a weak
/// reference to the state plus the ready-certificate and shutdown receivers,
/// and `authority_store_migrations::migrate_events` on `store`.
///
/// # Panics
///
/// Panics with "Error indexing genesis objects." if
/// `create_owner_index_if_empty` fails.
#[expect(clippy::disallowed_methods)]
pub async fn new(
    name: AuthorityName,
    secret: StableSyncAuthoritySigner,
    supported_protocol_versions: SupportedProtocolVersions,
    store: Arc<AuthorityStore>,
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
    epoch_store: Arc<AuthorityPerEpochStore>,
    committee_store: Arc<CommitteeStore>,
    indexes: Option<Arc<IndexStore>>,
    grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
    checkpoint_store: Arc<CheckpointStore>,
    prometheus_registry: &Registry,
    genesis_objects: &[Object],
    db_checkpoint_config: &DBCheckpointConfig,
    config: NodeConfig,
    archive_readers: ArchiveReaderBalancer,
    validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
    chain_identifier: ChainIdentifier,
    pruner_db: Option<Arc<AuthorityPrunerTables>>,
) -> Arc<Self> {
    Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

    let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
    // The sender goes to the transaction manager; the receiver is consumed by
    // the `execution_process` task spawned below.
    let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
    let transaction_manager = Arc::new(TransactionManager::new(
        execution_cache_trait_pointers.object_cache_reader.clone(),
        execution_cache_trait_pointers
            .transaction_cache_reader
            .clone(),
        &epoch_store,
        tx_ready_certificates,
        metrics.clone(),
    ));
    // The sender is kept in the state (`tx_execution_shutdown`); the receiver
    // is handed to `execution_process`.
    let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

    // Both pruners are stored in the state under underscore-prefixed fields.
    let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
        epoch_store.get_parent_path(),
        &config.authority_store_pruning_config,
    );
    let _pruner = AuthorityStorePruner::new(
        store.perpetual_tables.clone(),
        checkpoint_store.clone(),
        grpc_indexes_store.clone(),
        indexes.clone(),
        config.authority_store_pruning_config.clone(),
        epoch_store.committee().authority_exists(&name),
        epoch_store.epoch_start_state().epoch_duration_ms(),
        prometheus_registry,
        archive_readers,
        pruner_db,
    );
    let input_loader =
        TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
    let epoch = epoch_store.epoch();
    // The reference gas price seeds the congestion tracker.
    let rgp = epoch_store.reference_gas_price();
    let state = Arc::new(AuthorityState {
        name,
        secret,
        execution_lock: RwLock::new(epoch),
        epoch_store: ArcSwap::new(epoch_store.clone()),
        input_loader,
        execution_cache_trait_pointers,
        indexes,
        grpc_indexes_store,
        subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
        checkpoint_store,
        committee_store,
        transaction_manager,
        tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
        metrics,
        _pruner,
        _authority_per_epoch_pruner,
        db_checkpoint_config: db_checkpoint_config.clone(),
        config,
        overload_info: AuthorityOverloadInfo::default(),
        validator_tx_finalizer,
        chain_identifier,
        congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
    });

    // NOTE(review): a weak reference is passed, presumably so the execution
    // task does not keep the state alive on its own — confirm against
    // `execution_process`.
    let authority_state = Arc::downgrade(&state);
    spawn_monitored_task!(execution_process(
        authority_state,
        rx_ready_certificates,
        rx_execution_shutdown,
    ));
    spawn_monitored_task!(authority_store_migrations::migrate_events(store));

    // Populates the owner index from the genesis objects when an index store
    // is configured and still empty.
    state
        .create_owner_index_if_empty(genesis_objects, &epoch_store)
        .expect("Error indexing genesis objects.");

    state
}
3157
3158 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3160 &self.execution_cache_trait_pointers.object_cache_reader
3161 }
3162
3163 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3164 &self.execution_cache_trait_pointers.transaction_cache_reader
3165 }
3166
3167 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3168 &self.execution_cache_trait_pointers.cache_writer
3169 }
3170
3171 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3172 &self.execution_cache_trait_pointers.backing_store
3173 }
3174
3175 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3176 &self.execution_cache_trait_pointers.backing_package_store
3177 }
3178
3179 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3180 &self.execution_cache_trait_pointers.object_store
3181 }
3182
3183 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3184 &self.execution_cache_trait_pointers.reconfig_api
3185 }
3186
3187 pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
3188 &self.execution_cache_trait_pointers.accumulator_store
3189 }
3190
3191 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3192 &self.execution_cache_trait_pointers.checkpoint_cache
3193 }
3194
3195 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3196 &self.execution_cache_trait_pointers.state_sync_store
3197 }
3198
3199 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3200 &self.execution_cache_trait_pointers.cache_commit
3201 }
3202
3203 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3204 self.execution_cache_trait_pointers
3205 .testing_api
3206 .database_for_testing()
3207 }
3208
3209 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3210 &self,
3211 config: NodeConfig,
3212 metrics: Arc<AuthorityStorePruningMetrics>,
3213 ) -> anyhow::Result<()> {
3214 let archive_readers =
3215 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3216 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3217 &self.database_for_testing().perpetual_tables,
3218 &self.checkpoint_store,
3219 self.grpc_indexes_store.as_deref(),
3220 None,
3221 config.authority_store_pruning_config,
3222 metrics,
3223 archive_readers,
3224 EPOCH_DURATION_MS_FOR_TESTING,
3225 )
3226 .await
3227 }
3228
3229 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3230 &self.transaction_manager
3231 }
3232
3233 pub fn enqueue_transactions_for_execution(
3236 &self,
3237 txns: Vec<VerifiedExecutableTransaction>,
3238 epoch_store: &Arc<AuthorityPerEpochStore>,
3239 ) {
3240 self.transaction_manager.enqueue(txns, epoch_store)
3241 }
3242
3243 pub fn enqueue_certificates_for_execution(
3245 &self,
3246 certs: Vec<VerifiedCertificate>,
3247 epoch_store: &Arc<AuthorityPerEpochStore>,
3248 ) {
3249 self.transaction_manager
3250 .enqueue_certificates(certs, epoch_store)
3251 }
3252
3253 pub fn enqueue_with_expected_effects_digest(
3254 &self,
3255 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3256 epoch_store: &AuthorityPerEpochStore,
3257 ) {
3258 self.transaction_manager
3259 .enqueue_with_expected_effects_digest(certs, epoch_store)
3260 }
3261
3262 fn create_owner_index_if_empty(
3263 &self,
3264 genesis_objects: &[Object],
3265 epoch_store: &Arc<AuthorityPerEpochStore>,
3266 ) -> IotaResult {
3267 let Some(index_store) = &self.indexes else {
3268 return Ok(());
3269 };
3270 if !index_store.is_empty() {
3271 return Ok(());
3272 }
3273
3274 let mut new_owners = vec![];
3275 let mut new_dynamic_fields = vec![];
3276 let mut layout_resolver = epoch_store
3277 .executor()
3278 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3279 for o in genesis_objects.iter() {
3280 match o.owner {
3281 Owner::AddressOwner(addr) => new_owners.push((
3282 (addr, o.id()),
3283 ObjectInfo::new(&o.compute_object_reference(), o),
3284 )),
3285 Owner::ObjectOwner(object_id) => {
3286 let id = o.id();
3287 let info = match self.try_create_dynamic_field_info(
3288 o,
3289 &BTreeMap::new(),
3290 layout_resolver.as_mut(),
3291 ) {
3292 Ok(Some(info)) => info,
3293 Ok(None) => continue,
3294 Err(IotaError::UserInput {
3295 error:
3296 UserInputError::ObjectNotFound {
3297 object_id: not_found_id,
3298 version,
3299 },
3300 }) => {
3301 warn!(
3302 ?not_found_id,
3303 ?version,
3304 object_owner=?object_id,
3305 field=?id,
3306 "Skipping dynamic field: referenced genesis object not found"
3307 );
3308 continue;
3309 }
3310 Err(e) => return Err(e),
3311 };
3312 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3313 }
3314 _ => {}
3315 }
3316 }
3317
3318 index_store.insert_genesis_objects(ObjectIndexChanges {
3319 deleted_owners: vec![],
3320 deleted_dynamic_fields: vec![],
3321 new_owners,
3322 new_dynamic_fields,
3323 })
3324 }
3325
3326 pub fn execution_lock_for_executable_transaction(
3330 &self,
3331 transaction: &VerifiedExecutableTransaction,
3332 ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3333 let lock = self
3334 .execution_lock
3335 .try_read()
3336 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3337 if *lock == transaction.auth_sig().epoch() {
3338 Ok(lock)
3339 } else {
3340 Err(IotaError::WrongEpoch {
3341 expected_epoch: *lock,
3342 actual_epoch: transaction.auth_sig().epoch(),
3343 })
3344 }
3345 }
3346
3347 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3353 self.execution_lock
3354 .try_read()
3355 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3356 }
3357
3358 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3359 self.execution_lock.write().await
3360 }
3361
3362 #[instrument(level = "error", skip_all)]
3363 pub async fn reconfigure(
3364 &self,
3365 cur_epoch_store: &AuthorityPerEpochStore,
3366 supported_protocol_versions: SupportedProtocolVersions,
3367 new_committee: Committee,
3368 epoch_start_configuration: EpochStartConfiguration,
3369 accumulator: Arc<StateAccumulator>,
3370 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3371 epoch_supply_change: i64,
3372 epoch_last_checkpoint: CheckpointSequenceNumber,
3373 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3374 Self::check_protocol_version(
3375 supported_protocol_versions,
3376 epoch_start_configuration
3377 .epoch_start_state()
3378 .protocol_version(),
3379 );
3380 self.metrics.reset_on_reconfigure();
3381 self.committee_store.insert_new_committee(&new_committee)?;
3382
3383 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3385
3386 cur_epoch_store.epoch_terminated().await;
3388
3389 let highest_locally_built_checkpoint_seq = self
3390 .checkpoint_store
3391 .get_latest_locally_computed_checkpoint()?
3392 .map(|c| *c.sequence_number())
3393 .unwrap_or(0);
3394
3395 assert!(
3396 epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3397 "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3398 );
3399 if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
3400 let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3404 if num_shared_version_assignments > 1 {
3407 debug_fatal!(
3409 "all shared_version_assignments should have been removed \
3410 (num_shared_version_assignments: {num_shared_version_assignments})"
3411 );
3412 }
3413 }
3414
3415 self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3421 .await?;
3422 self.get_reconfig_api()
3423 .clear_state_end_of_epoch(&execution_lock);
3424 self.check_system_consistency(
3425 cur_epoch_store,
3426 accumulator,
3427 expensive_safety_check_config,
3428 epoch_supply_change,
3429 )?;
3430 self.get_reconfig_api()
3431 .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3432 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3433 if self
3434 .db_checkpoint_config
3435 .perform_db_checkpoints_at_epoch_end
3436 {
3437 let checkpoint_indexes = self
3438 .db_checkpoint_config
3439 .perform_index_db_checkpoints_at_epoch_end
3440 .unwrap_or(false);
3441 let current_epoch = cur_epoch_store.epoch();
3442 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3443 self.checkpoint_all_dbs(
3444 &epoch_checkpoint_path,
3445 cur_epoch_store,
3446 checkpoint_indexes,
3447 )?;
3448 }
3449 }
3450
3451 self.get_reconfig_api()
3452 .reconfigure_cache(&epoch_start_configuration)
3453 .await;
3454
3455 let new_epoch = new_committee.epoch;
3456 let new_epoch_store = self
3457 .reopen_epoch_db(
3458 cur_epoch_store,
3459 new_committee,
3460 epoch_start_configuration,
3461 expensive_safety_check_config,
3462 epoch_last_checkpoint,
3463 )
3464 .await?;
3465 assert_eq!(new_epoch_store.epoch(), new_epoch);
3466 self.transaction_manager.reconfigure(new_epoch);
3467 *execution_lock = new_epoch;
3468 Ok(new_epoch_store)
3472 }
3473
3474 pub async fn reconfigure_for_testing(&self) {
3479 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3480 let epoch_store = self.epoch_store_for_testing().clone();
3481 let protocol_config = epoch_store.protocol_config().clone();
3482 let _guard =
3490 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3491 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3492 self.get_backing_package_store().clone(),
3493 self.get_object_store().clone(),
3494 &self.config.expensive_safety_check_config,
3495 self.checkpoint_store
3496 .get_epoch_last_checkpoint(epoch_store.epoch())
3497 .unwrap()
3498 .map(|c| *c.sequence_number())
3499 .unwrap_or_default(),
3500 );
3501 let new_epoch = new_epoch_store.epoch();
3502 self.transaction_manager.reconfigure(new_epoch);
3503 self.epoch_store.store(new_epoch_store);
3504 epoch_store.epoch_terminated().await;
3505 *execution_lock = new_epoch;
3506 }
3507
3508 #[instrument(level = "error", skip_all)]
3509 fn check_system_consistency(
3510 &self,
3511 cur_epoch_store: &AuthorityPerEpochStore,
3512 accumulator: Arc<StateAccumulator>,
3513 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3514 epoch_supply_change: i64,
3515 ) -> IotaResult<()> {
3516 info!(
3517 "Performing iota conservation consistency check for epoch {}",
3518 cur_epoch_store.epoch()
3519 );
3520
3521 if cfg!(debug_assertions) {
3522 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3523 }
3524
3525 self.get_reconfig_api()
3526 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3527
3528 if expensive_safety_check_config.enable_state_consistency_check() {
3530 info!(
3531 "Performing state consistency check for epoch {}",
3532 cur_epoch_store.epoch()
3533 );
3534 self.expensive_check_is_consistent_state(
3535 accumulator,
3536 cur_epoch_store,
3537 cfg!(debug_assertions), );
3539 }
3540
3541 if expensive_safety_check_config.enable_secondary_index_checks() {
3542 if let Some(indexes) = self.indexes.clone() {
3543 verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3544 .expect("secondary indexes are inconsistent");
3545 }
3546 }
3547
3548 Ok(())
3549 }
3550
3551 fn expensive_check_is_consistent_state(
3552 &self,
3553 accumulator: Arc<StateAccumulator>,
3554 cur_epoch_store: &AuthorityPerEpochStore,
3555 panic: bool,
3556 ) {
3557 let live_object_set_hash = accumulator.digest_live_object_set();
3558
3559 let root_state_hash: ECMHLiveObjectSetDigest = self
3560 .get_accumulator_store()
3561 .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3562 .expect("Retrieving root state hash cannot fail")
3563 .expect("Root state hash for epoch must exist")
3564 .1
3565 .digest()
3566 .into();
3567
3568 let is_inconsistent = root_state_hash != live_object_set_hash;
3569 if is_inconsistent {
3570 if panic {
3571 panic!(
3572 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3573 );
3574 } else {
3575 error!(
3576 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3577 root_state_hash, live_object_set_hash
3578 );
3579 }
3580 } else {
3581 info!("State consistency check passed");
3582 }
3583
3584 if !panic {
3585 accumulator.set_inconsistent_state(is_inconsistent);
3586 }
3587 }
3588
3589 pub fn current_epoch_for_testing(&self) -> EpochId {
3590 self.epoch_store_for_testing().epoch()
3591 }
3592
3593 #[instrument(level = "error", skip_all)]
3594 pub fn checkpoint_all_dbs(
3595 &self,
3596 checkpoint_path: &Path,
3597 cur_epoch_store: &AuthorityPerEpochStore,
3598 checkpoint_indexes: bool,
3599 ) -> IotaResult {
3600 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3601 let current_epoch = cur_epoch_store.epoch();
3602
3603 if checkpoint_path.exists() {
3604 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3605 return Ok(());
3606 }
3607
3608 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3609 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3610
3611 if checkpoint_path_tmp.exists() {
3612 fs::remove_dir_all(&checkpoint_path_tmp)
3613 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3614 }
3615
3616 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3617 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3618
3619 self.checkpoint_store
3622 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3623
3624 self.get_reconfig_api()
3625 .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3626
3627 self.committee_store
3628 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3629
3630 if checkpoint_indexes {
3631 if let Some(indexes) = self.indexes.as_ref() {
3632 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3633 }
3634 if let Some(grpc_indexes_store) = self.grpc_indexes_store.as_ref() {
3635 grpc_indexes_store.checkpoint_db(&checkpoint_path_tmp.join(GRPC_INDEXES_DIR))?;
3636 }
3637 }
3638
3639 fs::rename(checkpoint_path_tmp, checkpoint_path)
3640 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3641 Ok(())
3642 }
3643
3644 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3651 self.epoch_store.load()
3652 }
3653
3654 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3656 self.load_epoch_store_one_call_per_task()
3657 }
3658
3659 pub fn clone_committee_for_testing(&self) -> Committee {
3660 Committee::clone(self.epoch_store_for_testing().committee())
3661 }
3662
3663 #[instrument(level = "trace", skip_all)]
3664 pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3665 self.get_object_store()
3666 .try_get_object(object_id)
3667 .map_err(Into::into)
3668 }
3669
3670 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3672 self.try_get_object(object_id)
3673 .await
3674 .expect("storage access failed")
3675 }
3676
3677 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3678 Ok(self
3679 .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3680 .await?
3681 .expect("framework object should always exist")
3682 .compute_object_reference())
3683 }
3684
3685 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3687 self.get_object_cache_reader()
3688 .try_get_iota_system_state_object_unsafe()
3689 }
3690
3691 #[instrument(level = "trace", skip_all)]
3692 pub fn get_checkpoint_by_sequence_number(
3693 &self,
3694 sequence_number: CheckpointSequenceNumber,
3695 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3696 Ok(self
3697 .checkpoint_store
3698 .get_checkpoint_by_sequence_number(sequence_number)?)
3699 }
3700
3701 pub async fn wait_for_checkpoint_inclusion(
3708 &self,
3709 digests: &[TransactionDigest],
3710 timeout: Duration,
3711 ) -> IotaResult<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>> {
3712 let epoch_store = self.load_epoch_store_one_call_per_task();
3713
3714 let mut checkpoint_timestamp_cache = HashMap::<CheckpointSequenceNumber, u64>::new();
3717
3718 let results = epoch_store
3719 .wait_for_transactions_in_checkpoint_with_timeout(digests, timeout, |seq| {
3720 *checkpoint_timestamp_cache.entry(seq).or_insert_with(|| {
3721 self.get_checkpoint_by_sequence_number(seq)
3722 .ok()
3723 .flatten()
3724 .map(|c| c.timestamp_ms)
3725 .unwrap_or(0)
3726 })
3727 })
3728 .await?;
3729
3730 Ok(digests
3731 .iter()
3732 .copied()
3733 .zip(results)
3734 .filter_map(|(digest, opt)| opt.map(|seq_and_ts| (digest, seq_and_ts)))
3735 .collect())
3736 }
3737
3738 #[instrument(level = "trace", skip_all)]
3739 pub fn get_transaction_checkpoint_for_tests(
3740 &self,
3741 digest: &TransactionDigest,
3742 epoch_store: &AuthorityPerEpochStore,
3743 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3744 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3745 let Some(checkpoint) = checkpoint else {
3746 return Ok(None);
3747 };
3748 let checkpoint = self
3749 .checkpoint_store
3750 .get_checkpoint_by_sequence_number(checkpoint)?;
3751 Ok(checkpoint)
3752 }
3753
3754 #[instrument(level = "trace", skip_all)]
3755 pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3756 Ok(
3757 match self
3758 .get_object_cache_reader()
3759 .try_get_latest_object_or_tombstone(*object_id)?
3760 {
3761 Some((_, ObjectOrTombstone::Object(object))) => {
3762 let layout = self.get_object_layout(&object)?;
3763 ObjectRead::Exists(object.compute_object_reference(), object, layout)
3764 }
3765 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3766 None => ObjectRead::NotExists(*object_id),
3767 },
3768 )
3769 }
3770
3771 pub fn get_chain_identifier(&self) -> ChainIdentifier {
3773 self.chain_identifier
3774 }
3775
3776 #[instrument(level = "trace", skip_all)]
3777 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3778 where
3779 T: DeserializeOwned,
3780 {
3781 let o = self.get_object_read(object_id)?.into_object()?;
3782 if let Some(move_object) = o.data.try_as_move() {
3783 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3784 IotaError::ObjectDeserialization {
3785 error: format!("{e}"),
3786 }
3787 })?)
3788 } else {
3789 Err(IotaError::ObjectDeserialization {
3790 error: format!("Provided object : [{object_id}] is not a Move object."),
3791 })
3792 }
3793 }
3794
3795 #[instrument(level = "trace", skip_all)]
3801 pub fn get_past_object_read(
3802 &self,
3803 object_id: &ObjectID,
3804 version: SequenceNumber,
3805 ) -> IotaResult<PastObjectRead> {
3806 let Some(obj_ref) = self
3808 .get_object_cache_reader()
3809 .try_get_latest_object_ref_or_tombstone(*object_id)?
3810 else {
3811 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3812 };
3813
3814 if version > obj_ref.1 {
3815 return Ok(PastObjectRead::VersionTooHigh {
3816 object_id: *object_id,
3817 asked_version: version,
3818 latest_version: obj_ref.1,
3819 });
3820 }
3821
3822 if version < obj_ref.1 {
3823 return Ok(match self.read_object_at_version(object_id, version)? {
3825 Some((object, layout)) => {
3826 let obj_ref = object.compute_object_reference();
3827 PastObjectRead::VersionFound(obj_ref, object, layout)
3828 }
3829
3830 None => PastObjectRead::VersionNotFound(*object_id, version),
3831 });
3832 }
3833
3834 if !obj_ref.2.is_alive() {
3835 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3836 }
3837
3838 match self.read_object_at_version(object_id, obj_ref.1)? {
3839 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3840 None => {
3841 error!(
3842 "Object with in parent_entry is missing from object store, datastore is \
3843 inconsistent",
3844 );
3845 Err(UserInputError::ObjectNotFound {
3846 object_id: *object_id,
3847 version: Some(obj_ref.1),
3848 }
3849 .into())
3850 }
3851 }
3852 }
3853
3854 #[instrument(level = "trace", skip_all)]
3855 fn read_object_at_version(
3856 &self,
3857 object_id: &ObjectID,
3858 version: SequenceNumber,
3859 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3860 let Some(object) = self
3861 .get_object_cache_reader()
3862 .try_get_object_by_key(object_id, version)?
3863 else {
3864 return Ok(None);
3865 };
3866
3867 let layout = self.get_object_layout(&object)?;
3868 Ok(Some((object, layout)))
3869 }
3870
3871 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3872 let layout = object
3873 .data
3874 .try_as_move()
3875 .map(|object| {
3876 into_struct_layout(
3877 self.load_epoch_store_one_call_per_task()
3878 .executor()
3879 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3881 .get_annotated_layout(&object.type_().clone().into())?,
3882 )
3883 })
3884 .transpose()?;
3885 Ok(layout)
3886 }
3887
3888 fn get_owner_at_version(
3889 &self,
3890 object_id: &ObjectID,
3891 version: SequenceNumber,
3892 ) -> IotaResult<Owner> {
3893 self.get_object_store()
3894 .try_get_object_by_key(object_id, version)?
3895 .ok_or_else(|| {
3896 IotaError::from(UserInputError::ObjectNotFound {
3897 object_id: *object_id,
3898 version: Some(version),
3899 })
3900 })
3901 .map(|o| o.owner)
3902 }
3903
3904 #[instrument(level = "trace", skip_all)]
3905 pub fn get_owner_objects(
3906 &self,
3907 owner: IotaAddress,
3908 cursor: Option<ObjectID>,
3910 limit: usize,
3911 filter: Option<IotaObjectDataFilter>,
3912 ) -> IotaResult<Vec<ObjectInfo>> {
3913 if let Some(indexes) = &self.indexes {
3914 indexes.get_owner_objects(owner, cursor, limit, filter)
3915 } else {
3916 Err(IotaError::IndexStoreNotAvailable)
3917 }
3918 }
3919
3920 #[instrument(level = "trace", skip_all)]
3921 pub fn get_owned_coins_iterator_with_cursor(
3922 &self,
3923 owner: IotaAddress,
3924 cursor: (String, ObjectID),
3926 limit: usize,
3927 one_coin_type_only: bool,
3928 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3929 if let Some(indexes) = &self.indexes {
3930 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3931 } else {
3932 Err(IotaError::IndexStoreNotAvailable)
3933 }
3934 }
3935
3936 #[instrument(level = "trace", skip_all)]
3937 pub fn get_owner_objects_iterator(
3938 &self,
3939 owner: IotaAddress,
3940 cursor: Option<ObjectID>,
3942 filter: Option<IotaObjectDataFilter>,
3943 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3944 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3945 if let Some(indexes) = &self.indexes {
3946 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3947 } else {
3948 Err(IotaError::IndexStoreNotAvailable)
3949 }
3950 }
3951
3952 #[instrument(level = "trace", skip_all)]
3953 pub async fn get_move_objects<T>(
3954 &self,
3955 owner: IotaAddress,
3956 type_: MoveObjectType,
3957 ) -> IotaResult<Vec<T>>
3958 where
3959 T: DeserializeOwned,
3960 {
3961 let object_ids = self
3962 .get_owner_objects_iterator(owner, None, None)?
3963 .filter(|o| match &o.type_ {
3964 ObjectType::Struct(s) => &type_ == s,
3965 ObjectType::Package => false,
3966 })
3967 .map(|info| ObjectKey(info.object_id, info.version))
3968 .collect::<Vec<_>>();
3969 let mut move_objects = vec![];
3970
3971 let objects = self
3972 .get_object_store()
3973 .try_multi_get_objects_by_key(&object_ids)?;
3974
3975 for (o, id) in objects.into_iter().zip(object_ids) {
3976 let object = o.ok_or_else(|| {
3977 IotaError::from(UserInputError::ObjectNotFound {
3978 object_id: id.0,
3979 version: Some(id.1),
3980 })
3981 })?;
3982 let move_object = object.data.try_as_move().ok_or_else(|| {
3983 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3984 })?;
3985 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3986 IotaError::ObjectDeserialization {
3987 error: format!("{e}"),
3988 }
3989 })?);
3990 }
3991 Ok(move_objects)
3992 }
3993
3994 #[instrument(level = "trace", skip_all)]
3995 pub fn get_dynamic_fields(
3996 &self,
3997 owner: ObjectID,
3998 cursor: Option<ObjectID>,
4000 limit: usize,
4001 ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4002 Ok(self
4003 .get_dynamic_fields_iterator(owner, cursor)?
4004 .take(limit)
4005 .collect::<Result<Vec<_>, _>>()?)
4006 }
4007
4008 fn get_dynamic_fields_iterator(
4009 &self,
4010 owner: ObjectID,
4011 cursor: Option<ObjectID>,
4013 ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4014 {
4015 if let Some(indexes) = &self.indexes {
4016 indexes.get_dynamic_fields_iterator(owner, cursor)
4017 } else {
4018 Err(IotaError::IndexStoreNotAvailable)
4019 }
4020 }
4021
4022 #[instrument(level = "trace", skip_all)]
4023 pub fn get_dynamic_field_object_id(
4024 &self,
4025 owner: ObjectID,
4026 name_type: TypeTag,
4027 name_bcs_bytes: &[u8],
4028 ) -> IotaResult<Option<ObjectID>> {
4029 if let Some(indexes) = &self.indexes {
4030 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4031 } else {
4032 Err(IotaError::IndexStoreNotAvailable)
4033 }
4034 }
4035
4036 #[instrument(level = "trace", skip_all)]
4037 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
4038 Ok(self.get_indexes()?.next_sequence_number())
4039 }
4040
4041 #[instrument(level = "trace", skip_all)]
4042 pub async fn get_executed_transaction_and_effects(
4043 &self,
4044 digest: TransactionDigest,
4045 kv_store: Arc<TransactionKeyValueStore>,
4046 ) -> IotaResult<(Transaction, TransactionEffects)> {
4047 let transaction = kv_store.get_tx(digest).await?;
4048 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4049 Ok((transaction, effects))
4050 }
4051
4052 #[instrument(level = "trace", skip_all)]
4053 pub fn multi_get_checkpoint_by_sequence_number(
4054 &self,
4055 sequence_numbers: &[CheckpointSequenceNumber],
4056 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
4057 Ok(self
4058 .checkpoint_store
4059 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4060 }
4061
4062 #[instrument(level = "trace", skip_all)]
4063 pub fn get_transaction_events(
4064 &self,
4065 digest: &TransactionDigest,
4066 ) -> IotaResult<TransactionEvents> {
4067 self.get_transaction_cache_reader()
4068 .try_get_events(digest)?
4069 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
4070 }
4071
4072 pub fn get_transaction_input_objects(
4073 &self,
4074 effects: &TransactionEffects,
4075 ) -> anyhow::Result<Vec<Object>> {
4076 iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4077 .map_err(Into::into)
4078 }
4079
4080 pub fn get_transaction_output_objects(
4081 &self,
4082 effects: &TransactionEffects,
4083 ) -> anyhow::Result<Vec<Object>> {
4084 iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4085 .map_err(Into::into)
4086 }
4087
4088 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
4089 match &self.indexes {
4090 Some(i) => Ok(i.clone()),
4091 None => Err(IotaError::UnsupportedFeature {
4092 error: "extended object indexing is not enabled on this server".into(),
4093 }),
4094 }
4095 }
4096
4097 pub async fn get_transactions_for_tests(
4098 self: &Arc<Self>,
4099 filter: Option<TransactionFilter>,
4100 cursor: Option<TransactionDigest>,
4101 limit: Option<usize>,
4102 reverse: bool,
4103 ) -> IotaResult<Vec<TransactionDigest>> {
4104 let metrics = KeyValueStoreMetrics::new_for_tests();
4105 let kv_store = Arc::new(TransactionKeyValueStore::new(
4106 "rocksdb",
4107 metrics,
4108 self.clone(),
4109 ));
4110 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4111 .await
4112 }
4113
4114 #[instrument(level = "trace", skip_all)]
4115 pub async fn get_transactions(
4116 &self,
4117 kv_store: &Arc<TransactionKeyValueStore>,
4118 filter: Option<TransactionFilter>,
4119 cursor: Option<TransactionDigest>,
4121 limit: Option<usize>,
4122 reverse: bool,
4123 ) -> IotaResult<Vec<TransactionDigest>> {
4124 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4125 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4126 let iter = checkpoint_contents.iter().map(|c| c.transaction);
4127 if reverse {
4128 let iter = iter
4129 .rev()
4130 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4131 .skip(usize::from(cursor.is_some()));
4132 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4133 } else {
4134 let iter = iter
4135 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4136 .skip(usize::from(cursor.is_some()));
4137 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4138 }
4139 }
4140 self.get_indexes()?
4141 .get_transactions(filter, cursor, limit, reverse)
4142 }
4143
4144 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4145 &self.checkpoint_store
4146 }
4147
4148 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4149 self.get_checkpoint_store()
4150 .get_highest_executed_checkpoint_seq_number()?
4151 .ok_or(IotaError::UserInput {
4152 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4153 })
4154 }
4155
/// Test helper (only compiled under `cfg(msim)`): returns the highest pruned
/// checkpoint recorded in the perpetual tables, or `0` if none is recorded.
///
/// # Errors
/// Converts and returns a failure of the underlying read.
#[cfg(msim)]
pub fn get_highest_pruned_checkpoint_for_testing(
    &self,
) -> IotaResult<CheckpointSequenceNumber> {
    let store = self.database_for_testing();
    let highest_pruned = store.perpetual_tables.get_highest_pruned_checkpoint()?;
    Ok(highest_pruned.unwrap_or(0))
}
4166
4167 #[instrument(level = "trace", skip_all)]
4168 pub fn get_checkpoint_summary_by_sequence_number(
4169 &self,
4170 sequence_number: CheckpointSequenceNumber,
4171 ) -> IotaResult<CheckpointSummary> {
4172 let verified_checkpoint = self
4173 .get_checkpoint_store()
4174 .get_checkpoint_by_sequence_number(sequence_number)?;
4175 match verified_checkpoint {
4176 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4177 None => Err(IotaError::UserInput {
4178 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4179 }),
4180 }
4181 }
4182
4183 #[instrument(level = "trace", skip_all)]
4184 pub fn get_checkpoint_summary_by_digest(
4185 &self,
4186 digest: CheckpointDigest,
4187 ) -> IotaResult<CheckpointSummary> {
4188 let verified_checkpoint = self
4189 .get_checkpoint_store()
4190 .get_checkpoint_by_digest(&digest)?;
4191 match verified_checkpoint {
4192 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4193 None => Err(IotaError::UserInput {
4194 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4195 }),
4196 }
4197 }
4198
4199 #[instrument(level = "trace", skip_all)]
4200 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
4201 if is_system_package(package_id) {
4202 return self.find_genesis_txn_digest();
4203 }
4204 Ok(self
4205 .get_object_read(&package_id)?
4206 .into_object()?
4207 .previous_transaction)
4208 }
4209
4210 #[instrument(level = "trace", skip_all)]
4211 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4212 let summary = self
4213 .get_verified_checkpoint_by_sequence_number(0)?
4214 .into_message();
4215 let content = self.get_checkpoint_contents(summary.content_digest)?;
4216 let genesis_transaction = content.enumerate_transactions(&summary).next();
4217 Ok(genesis_transaction
4218 .ok_or(IotaError::UserInput {
4219 error: UserInputError::GenesisTransactionNotFound,
4220 })?
4221 .1
4222 .transaction)
4223 }
4224
4225 #[instrument(level = "trace", skip_all)]
4226 pub fn get_verified_checkpoint_by_sequence_number(
4227 &self,
4228 sequence_number: CheckpointSequenceNumber,
4229 ) -> IotaResult<VerifiedCheckpoint> {
4230 let verified_checkpoint = self
4231 .get_checkpoint_store()
4232 .get_checkpoint_by_sequence_number(sequence_number)?;
4233 match verified_checkpoint {
4234 Some(verified_checkpoint) => Ok(verified_checkpoint),
4235 None => Err(IotaError::UserInput {
4236 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4237 }),
4238 }
4239 }
4240
4241 #[instrument(level = "trace", skip_all)]
4242 pub fn get_verified_checkpoint_summary_by_digest(
4243 &self,
4244 digest: CheckpointDigest,
4245 ) -> IotaResult<VerifiedCheckpoint> {
4246 let verified_checkpoint = self
4247 .get_checkpoint_store()
4248 .get_checkpoint_by_digest(&digest)?;
4249 match verified_checkpoint {
4250 Some(verified_checkpoint) => Ok(verified_checkpoint),
4251 None => Err(IotaError::UserInput {
4252 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4253 }),
4254 }
4255 }
4256
4257 #[instrument(level = "trace", skip_all)]
4258 pub fn get_checkpoint_contents(
4259 &self,
4260 digest: CheckpointContentsDigest,
4261 ) -> IotaResult<CheckpointContents> {
4262 self.get_checkpoint_store()
4263 .get_checkpoint_contents(&digest)?
4264 .ok_or(IotaError::UserInput {
4265 error: UserInputError::CheckpointContentsNotFound(digest),
4266 })
4267 }
4268
4269 #[instrument(level = "trace", skip_all)]
4270 pub fn get_checkpoint_contents_by_sequence_number(
4271 &self,
4272 sequence_number: CheckpointSequenceNumber,
4273 ) -> IotaResult<CheckpointContents> {
4274 let verified_checkpoint = self
4275 .get_checkpoint_store()
4276 .get_checkpoint_by_sequence_number(sequence_number)?;
4277 match verified_checkpoint {
4278 Some(verified_checkpoint) => {
4279 let content_digest = verified_checkpoint.into_inner().content_digest;
4280 self.get_checkpoint_contents(content_digest)
4281 }
4282 None => Err(IotaError::UserInput {
4283 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4284 }),
4285 }
4286 }
4287
4288 #[instrument(level = "trace", skip_all)]
4289 pub async fn query_events(
4290 &self,
4291 kv_store: &Arc<TransactionKeyValueStore>,
4292 query: EventFilter,
4293 cursor: Option<EventID>,
4295 limit: usize,
4296 descending: bool,
4297 ) -> IotaResult<Vec<IotaEvent>> {
4298 let index_store = self.get_indexes()?;
4299
4300 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4302 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4303 IotaError::TransactionNotFound {
4304 digest: cursor.tx_digest,
4305 },
4306 )?;
4307 (tx_seq, cursor.event_seq as usize)
4308 } else if descending {
4309 (u64::MAX, usize::MAX)
4310 } else {
4311 (0, 0)
4312 };
4313
4314 let limit = limit + 1;
4315 let mut event_keys = match query {
4316 EventFilter::All(filters) => {
4317 if filters.is_empty() {
4318 index_store.all_events(tx_num, event_num, limit, descending)?
4319 } else {
4320 return Err(IotaError::UserInput {
4321 error: UserInputError::Unsupported(
4322 "This query type does not currently support filter combinations"
4323 .to_string(),
4324 ),
4325 });
4326 }
4327 }
4328 EventFilter::Transaction(digest) => {
4329 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4330 }
4331 EventFilter::MoveModule { package, module } => {
4332 let module_id = ModuleId::new(package.into(), module);
4333 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4334 }
4335 EventFilter::MoveEventType(struct_name) => index_store
4336 .events_by_move_event_struct_name(
4337 &struct_name,
4338 tx_num,
4339 event_num,
4340 limit,
4341 descending,
4342 )?,
4343 EventFilter::Sender(sender) => {
4344 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4345 }
4346 EventFilter::TimeRange {
4347 start_time,
4348 end_time,
4349 } => index_store
4350 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4351 EventFilter::MoveEventModule { package, module } => index_store
4352 .events_by_move_event_module(
4353 &ModuleId::new(package.into(), module),
4354 tx_num,
4355 event_num,
4356 limit,
4357 descending,
4358 )?,
4359 EventFilter::Package(_)
4361 | EventFilter::MoveEventField { .. }
4362 | EventFilter::Any(_)
4363 | EventFilter::And(_, _)
4364 | EventFilter::Or(_, _) => {
4365 return Err(IotaError::UserInput {
4366 error: UserInputError::Unsupported(
4367 "This query type is not supported by the full node.".to_string(),
4368 ),
4369 });
4370 }
4371 };
4372
4373 if cursor.is_some() {
4376 if !event_keys.is_empty() {
4377 event_keys.remove(0);
4378 }
4379 } else {
4380 event_keys.truncate(limit - 1);
4381 }
4382
4383 let transaction_digests = event_keys
4385 .iter()
4386 .map(|(_, digest, _, _)| *digest)
4387 .collect::<HashSet<_>>()
4388 .into_iter()
4389 .collect::<Vec<_>>();
4390
4391 let events = kv_store
4392 .multi_get_events_by_tx_digests(&transaction_digests)
4393 .await?;
4394
4395 let events_map: HashMap<_, _> =
4396 transaction_digests.iter().zip(events.into_iter()).collect();
4397
4398 let stored_events = event_keys
4399 .into_iter()
4400 .map(|k| {
4401 (
4402 k,
4403 events_map
4404 .get(&k.1)
4405 .expect("fetched digest is missing")
4406 .clone()
4407 .and_then(|e| e.data.get(k.2).cloned()),
4408 )
4409 })
4410 .map(
4411 |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4412 event
4413 .map(|e| (e, tx_digest, event_seq, timestamp))
4414 .ok_or(IotaError::TransactionEventsNotFound { digest: tx_digest })
4415 },
4416 )
4417 .collect::<Result<Vec<_>, _>>()?;
4418
4419 let epoch_store = self.load_epoch_store_one_call_per_task();
4420 let backing_store = self.get_backing_package_store().as_ref();
4421 let mut layout_resolver = epoch_store
4422 .executor()
4423 .type_layout_resolver(Box::new(backing_store));
4424 let mut events = vec![];
4425 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4426 events.push(IotaEvent::try_from(
4427 e.clone(),
4428 tx_digest,
4429 event_seq as u64,
4430 Some(timestamp),
4431 layout_resolver.get_annotated_layout(&e.type_)?,
4432 )?)
4433 }
4434 Ok(events)
4435 }
4436
4437 pub async fn insert_genesis_object(&self, object: Object) {
4438 self.get_reconfig_api()
4439 .try_insert_genesis_object(object)
4440 .expect("Cannot insert genesis object")
4441 }
4442
4443 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4444 futures::future::join_all(
4445 objects
4446 .iter()
4447 .map(|o| self.insert_genesis_object(o.clone())),
4448 )
4449 .await;
4450 }
4451
4452 #[instrument(level = "trace", skip_all)]
4454 pub fn get_transaction_status(
4455 &self,
4456 transaction_digest: &TransactionDigest,
4457 epoch_store: &Arc<AuthorityPerEpochStore>,
4458 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4459 if let Some(effects) =
4461 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4462 {
4463 if let Some(transaction) = self
4464 .get_transaction_cache_reader()
4465 .try_get_transaction_block(transaction_digest)?
4466 {
4467 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4468 let events = if effects.events_digest().is_some() {
4469 self.get_transaction_events(effects.transaction_digest())?
4470 } else {
4471 TransactionEvents::default()
4472 };
4473 return Ok(Some((
4474 (*transaction).clone().into_message(),
4475 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4476 )));
4477 } else {
4478 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4483 }
4484 }
4485 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4486 self.metrics.tx_already_processed.inc();
4487 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4488 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4489 } else {
4490 Ok(None)
4491 }
4492 }
4493
4494 #[instrument(level = "trace", skip_all)]
4498 pub fn get_signed_effects_and_maybe_resign(
4499 &self,
4500 transaction_digest: &TransactionDigest,
4501 epoch_store: &Arc<AuthorityPerEpochStore>,
4502 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4503 let effects = self
4504 .get_transaction_cache_reader()
4505 .try_get_executed_effects(transaction_digest)?;
4506 match effects {
4507 Some(effects) => {
4508 if effects.executed_epoch() != epoch_store.epoch() {
4531 debug!(
4532 tx_digest=?transaction_digest,
4533 effects_epoch=?effects.executed_epoch(),
4534 epoch=?epoch_store.epoch(),
4535 "Re-signing the effects with the current epoch"
4536 );
4537 }
4538 Ok(Some(self.sign_effects(effects, epoch_store)?))
4539 }
4540 None => Ok(None),
4541 }
4542 }
4543
4544 #[instrument(level = "trace", skip_all)]
4545 pub(crate) fn sign_effects(
4546 &self,
4547 effects: TransactionEffects,
4548 epoch_store: &Arc<AuthorityPerEpochStore>,
4549 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4550 let tx_digest = *effects.transaction_digest();
4551 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4552 Some(sig) => {
4553 debug_assert!(sig.epoch == epoch_store.epoch());
4554 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4555 }
4556 _ => {
4557 let sig = AuthoritySignInfo::new(
4558 epoch_store.epoch(),
4559 &effects,
4560 Intent::iota_app(IntentScope::TransactionEffects),
4561 self.name,
4562 &*self.secret,
4563 );
4564
4565 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4566
4567 epoch_store.insert_effects_digest_and_signature(
4568 &tx_digest,
4569 effects.digest(),
4570 &sig,
4571 )?;
4572
4573 effects
4574 }
4575 };
4576
4577 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4578 signed_effects,
4579 ))
4580 }
4581
4582 #[instrument(level = "trace", skip_all)]
4584 fn fullnode_only_get_tx_coins_for_indexing(
4585 &self,
4586 effects: &TransactionEffects,
4587 inner_temporary_store: &InnerTemporaryStore,
4588 epoch_store: &Arc<AuthorityPerEpochStore>,
4589 ) -> Option<TxCoins> {
4590 if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4591 return None;
4592 }
4593 let written_coin_objects = inner_temporary_store
4594 .written
4595 .iter()
4596 .filter_map(|(k, v)| {
4597 if v.is_coin() {
4598 Some((*k, v.clone()))
4599 } else {
4600 None
4601 }
4602 })
4603 .collect();
4604 let mut input_coin_objects = inner_temporary_store
4605 .input_objects
4606 .iter()
4607 .filter_map(|(k, v)| {
4608 if v.is_coin() {
4609 Some((*k, v.clone()))
4610 } else {
4611 None
4612 }
4613 })
4614 .collect::<ObjectMap>();
4615
4616 for (object_id, version) in effects.modified_at_versions() {
4621 if inner_temporary_store
4622 .loaded_runtime_objects
4623 .contains_key(&object_id)
4624 {
4625 if let Some(object) = self
4626 .get_object_store()
4627 .get_object_by_key(&object_id, version)
4628 {
4629 if object.is_coin() {
4630 input_coin_objects.insert(object_id, object);
4631 }
4632 }
4633 }
4634 }
4635
4636 Some((input_coin_objects, written_coin_objects))
4637 }
4638
4639 #[instrument(level = "trace", skip_all)]
4651 pub async fn get_transaction_lock(
4652 &self,
4653 object_ref: &ObjectRef,
4654 epoch_store: &AuthorityPerEpochStore,
4655 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4656 let lock_info = self
4657 .get_object_cache_reader()
4658 .try_get_lock(*object_ref, epoch_store)?;
4659 let lock_info = match lock_info {
4660 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4661 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4662 provided_obj_ref: *object_ref,
4663 current_version: locked_ref.1,
4664 }
4665 .into());
4666 }
4667 ObjectLockStatus::Initialized => {
4668 return Ok(None);
4669 }
4670 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4671 };
4672
4673 epoch_store.get_signed_transaction(&lock_info)
4674 }
4675
4676 pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4677 self.get_object_cache_reader().try_get_objects(objects)
4678 }
4679
4680 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4682 self.try_get_objects(objects)
4683 .await
4684 .expect("storage access failed")
4685 }
4686
4687 pub async fn try_get_object_or_tombstone(
4688 &self,
4689 object_id: ObjectID,
4690 ) -> IotaResult<Option<ObjectRef>> {
4691 self.get_object_cache_reader()
4692 .try_get_latest_object_ref_or_tombstone(object_id)
4693 }
4694
4695 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4697 self.try_get_object_or_tombstone(object_id)
4698 .await
4699 .expect("storage access failed")
4700 }
4701
4702 pub fn set_override_protocol_upgrade_buffer_stake(
4712 &self,
4713 expected_epoch: EpochId,
4714 buffer_stake_bps: u64,
4715 ) -> IotaResult {
4716 let epoch_store = self.load_epoch_store_one_call_per_task();
4717 let actual_epoch = epoch_store.epoch();
4718 if actual_epoch != expected_epoch {
4719 return Err(IotaError::WrongEpoch {
4720 expected_epoch,
4721 actual_epoch,
4722 });
4723 }
4724
4725 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4726 }
4727
4728 pub fn clear_override_protocol_upgrade_buffer_stake(
4729 &self,
4730 expected_epoch: EpochId,
4731 ) -> IotaResult {
4732 let epoch_store = self.load_epoch_store_one_call_per_task();
4733 let actual_epoch = epoch_store.epoch();
4734 if actual_epoch != expected_epoch {
4735 return Err(IotaError::WrongEpoch {
4736 expected_epoch,
4737 actual_epoch,
4738 });
4739 }
4740
4741 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4742 }
4743
4744 pub async fn get_available_system_packages(
4748 &self,
4749 binary_config: &BinaryConfig,
4750 ) -> Vec<ObjectRef> {
4751 let mut results = vec![];
4752
4753 let system_packages = BuiltInFramework::iter_system_packages();
4754
4755 #[cfg(msim)]
4757 let extra_packages = framework_injection::get_extra_packages(self.name);
4758 #[cfg(msim)]
4759 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4760
4761 for system_package in system_packages {
4762 let modules = system_package.modules().to_vec();
4763 #[cfg(msim)]
4765 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4766 .unwrap_or(modules);
4767
4768 let Some(obj_ref) = iota_framework::compare_system_package(
4769 &self.get_object_store(),
4770 &system_package.id,
4771 &modules,
4772 system_package.dependencies.to_vec(),
4773 binary_config,
4774 )
4775 .await
4776 else {
4777 return vec![];
4778 };
4779 results.push(obj_ref);
4780 }
4781
4782 results
4783 }
4784
4785 async fn get_system_package_bytes(
4802 &self,
4803 system_packages: Vec<ObjectRef>,
4804 binary_config: &BinaryConfig,
4805 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4806 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4807 let objects = self.get_objects(&ids).await;
4808
4809 let mut res = Vec::with_capacity(system_packages.len());
4810 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4811 let prev_transaction = match object {
4812 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4813 info!("Framework {} does not need updating", system_package_ref.0);
4815 continue;
4816 }
4817
4818 Some(cur_object) => cur_object.previous_transaction,
4819 None => TransactionDigest::genesis_marker(),
4820 };
4821
4822 #[cfg(msim)]
4823 let SystemPackage {
4824 id: _,
4825 bytes,
4826 dependencies,
4827 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4828 .unwrap_or_else(|| {
4829 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4830 });
4831
4832 #[cfg(not(msim))]
4833 let SystemPackage {
4834 id: _,
4835 bytes,
4836 dependencies,
4837 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4838
4839 let modules: Vec<_> = bytes
4840 .iter()
4841 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4842 .collect();
4843
4844 let new_object = Object::new_system_package(
4845 &modules,
4846 system_package_ref.1,
4847 dependencies.clone(),
4848 prev_transaction,
4849 );
4850
4851 let new_ref = new_object.compute_object_reference();
4852 if new_ref != system_package_ref {
4853 error!(
4854 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
4855 );
4856 return None;
4857 }
4858
4859 res.push((system_package_ref.1, bytes, dependencies));
4860 }
4861
4862 Some(res)
4863 }
4864
4865 fn is_protocol_version_supported_v1(
4869 proposed_protocol_version: ProtocolVersion,
4870 committee: &Committee,
4871 capabilities: Vec<AuthorityCapabilitiesV1>,
4872 mut buffer_stake_bps: u64,
4873 ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4874 if buffer_stake_bps > 10000 {
4875 warn!("clamping buffer_stake_bps to 10000");
4876 buffer_stake_bps = 10000;
4877 }
4878
4879 let mut desired_upgrades: Vec<_> = capabilities
4882 .into_iter()
4883 .filter_map(|mut cap| {
4884 if cap.available_system_packages.is_empty() {
4886 return None;
4887 }
4888
4889 cap.available_system_packages.sort();
4890
4891 info!(
4892 "validator {:?} supports {:?} with system packages: {:?}",
4893 cap.authority.concise(),
4894 cap.supported_protocol_versions,
4895 cap.available_system_packages,
4896 );
4897
4898 cap.supported_protocol_versions
4902 .get_version_digest(proposed_protocol_version)
4903 .map(|digest| (digest, cap.available_system_packages, cap.authority))
4904 })
4905 .collect();
4906
4907 desired_upgrades.sort();
4910 desired_upgrades
4911 .into_iter()
4912 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4913 .into_iter()
4914 .find_map(|((digest, packages), group)| {
4915 assert!(!packages.is_empty());
4917
4918 let mut stake_aggregator: StakeAggregator<(), true> =
4919 StakeAggregator::new(Arc::new(committee.clone()));
4920
4921 for (_, _, authority) in group {
4922 stake_aggregator.insert_generic(authority, ());
4923 }
4924
4925 let total_votes = stake_aggregator.total_votes();
4926 let quorum_threshold = committee.quorum_threshold();
4927 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4928
4929 info!(
4930 protocol_config_digest = ?digest,
4931 ?total_votes,
4932 ?quorum_threshold,
4933 ?buffer_stake_bps,
4934 ?effective_threshold,
4935 ?proposed_protocol_version,
4936 ?packages,
4937 "support for upgrade"
4938 );
4939
4940 let has_support = total_votes >= effective_threshold;
4941 has_support.then_some((proposed_protocol_version, digest, packages))
4942 })
4943 }
4944
4945 fn choose_protocol_version_and_system_packages_v1(
4949 current_protocol_version: ProtocolVersion,
4950 current_protocol_digest: Digest,
4951 committee: &Committee,
4952 capabilities: Vec<AuthorityCapabilitiesV1>,
4953 buffer_stake_bps: u64,
4954 ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4955 let mut next_protocol_version = current_protocol_version;
4956 let mut system_packages = vec![];
4957 let mut protocol_version_digest = current_protocol_digest;
4958
4959 while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4963 next_protocol_version + 1,
4964 committee,
4965 capabilities.clone(),
4966 buffer_stake_bps,
4967 ) {
4968 next_protocol_version = version;
4969 protocol_version_digest = digest;
4970 system_packages = packages;
4971 }
4972
4973 (
4974 next_protocol_version,
4975 protocol_version_digest,
4976 system_packages,
4977 )
4978 }
4979
4980 fn get_validators_supporting_protocol_version(
4985 target_protocol_version: ProtocolVersion,
4986 target_digest: Digest,
4987 active_validators: &[AuthorityPublicKey],
4988 capabilities: &[AuthorityCapabilitiesV1],
4989 ) -> Vec<u64> {
4990 let mut eligible_validators = Vec::new();
4991
4992 for capability in capabilities {
4993 if let Some(digest) = capability
4995 .supported_protocol_versions
4996 .get_version_digest(target_protocol_version)
4997 {
4998 if digest == target_digest {
4999 if let Some(index) = active_validators
5001 .iter()
5002 .position(|name| AuthorityName::from(name) == capability.authority)
5003 {
5004 eligible_validators.push(index as u64);
5005 }
5006 }
5007 }
5008 }
5009
5010 eligible_validators.sort();
5012 eligible_validators
5013 }
5014
5015 fn calculate_eligible_validators_weight(
5020 eligible_validator_indices: &[u64],
5021 active_validators: &[AuthorityPublicKey],
5022 committee: &Committee,
5023 ) -> u64 {
5024 let mut total_weight = 0u64;
5025
5026 for &index in eligible_validator_indices {
5027 let authority_pubkey = &active_validators[index as usize];
5028 if let Some((_, weight)) = committee
5030 .members()
5031 .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
5032 {
5033 total_weight += weight;
5034 }
5035 }
5036
5037 total_weight
5038 }
5039
5040 #[instrument(level = "debug", skip_all)]
5041 fn create_authenticator_state_tx(
5042 &self,
5043 epoch_store: &Arc<AuthorityPerEpochStore>,
5044 ) -> Option<EndOfEpochTransactionKind> {
5045 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5046 info!("authenticator state transactions not enabled");
5047 return None;
5048 }
5049
5050 let authenticator_state_exists = epoch_store.authenticator_state_exists();
5051 let tx = if authenticator_state_exists {
5052 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5053 let min_epoch =
5054 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5055 let authenticator_obj_initial_shared_version = epoch_store
5056 .epoch_start_config()
5057 .authenticator_obj_initial_shared_version()
5058 .expect("initial version must exist");
5059
5060 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5061 min_epoch,
5062 authenticator_obj_initial_shared_version,
5063 );
5064
5065 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5066
5067 tx
5068 } else {
5069 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5070 info!("Creating AuthenticatorStateCreate tx");
5071 tx
5072 };
5073 Some(tx)
5074 }
5075
/// Builds the end-of-epoch transaction for `epoch_store`'s epoch, executes it
/// via `prepare_certificate`, and stores the transaction and its effects in
/// the state sync store.
///
/// The transaction consists of an optional authenticator state transaction
/// followed by one change-epoch transaction, whose variant (v1..v4) is chosen
/// from protocol config flags.
///
/// * `gas_cost_summary` - gas totals forwarded into the change-epoch
///   transaction.
/// * `checkpoint` - checkpoint sequence number the executable transaction is
///   attributed to.
/// * `epoch_start_timestamp_ms` - start timestamp forwarded into the
///   change-epoch transaction.
/// * `scores` - validator scores; only used by the v4 change-epoch variant.
///
/// Returns the system state written by the transaction, the system epoch info
/// event if one was emitted, and the transaction effects.
///
/// # Errors
/// * The system package bytes for the chosen next-epoch packages cannot be
///   produced locally.
/// * The transaction is already marked as executed.
/// * Any error from locking, shared-version assignment, input loading,
///   execution, or storing the result.
///
/// # Panics
/// Panics if capabilities cannot be read, if the transaction did not write the
/// system object, if no epoch info event was emitted while the system state
/// is not in safe mode, or if the effects status is not ok.
#[instrument(level = "error", skip_all)]
pub async fn create_and_execute_advance_epoch_tx(
    &self,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    gas_cost_summary: &GasCostSummary,
    checkpoint: CheckpointSequenceNumber,
    epoch_start_timestamp_ms: CheckpointTimestamp,
    scores: Vec<u64>,
) -> anyhow::Result<(
    IotaSystemState,
    Option<SystemEpochInfoEvent>,
    TransactionEffects,
)> {
    let mut txns = Vec::new();

    // The authenticator state transaction, when present, precedes the
    // change-epoch transaction.
    if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
        txns.push(tx);
    }

    let next_epoch = epoch_store.epoch() + 1;

    // Decide the next epoch's protocol version and system packages from the
    // capabilities recorded in the epoch store.
    let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
    let authority_capabilities = epoch_store
        .get_capabilities_v1()
        .expect("read capabilities from db cannot fail");
    let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
        Self::choose_protocol_version_and_system_packages_v1(
            epoch_store.protocol_version(),
            SupportedProtocolVersionsWithHashes::protocol_config_digest(
                epoch_store.protocol_config(),
            ),
            epoch_store.committee(),
            authority_capabilities.clone(),
            buffer_stake_bps,
        );

    // Note: the binary config is derived from the *current* epoch's protocol
    // config.
    let config = epoch_store.protocol_config();
    let binary_config = to_binary_config(config);
    let Some(next_epoch_system_package_bytes) = self
        .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
        .await
    else {
        error!(
            "upgraded system packages {:?} are not locally available, cannot create \
            ChangeEpochTx. validator binary must be upgraded to the correct version!",
            next_epoch_system_packages
        );
        bail!("missing system packages: cannot form ChangeEpochTx");
    };

    if config.select_committee_from_eligible_validators() {
        let active_validators = epoch_store.epoch_start_state().get_active_validators();

        // By default every active validator (identified by its index) is
        // eligible.
        let mut eligible_active_validators = (0..active_validators.len() as u64).collect();

        if config.select_committee_supporting_next_epoch_version() {
            // Narrow eligibility to validators that advertise the chosen
            // next-epoch version and digest.
            eligible_active_validators = Self::get_validators_supporting_protocol_version(
                next_epoch_protocol_version,
                next_epoch_protocol_digest,
                &active_validators,
                &authority_capabilities,
            );

            let eligible_validators_weight = Self::calculate_eligible_validators_weight(
                &eligible_active_validators,
                &active_validators,
                epoch_store.committee(),
            );

            let committee = epoch_store.committee();
            let effective_threshold = committee.effective_threshold(buffer_stake_bps);

            // If the narrowed set carries too little stake, log an error and
            // fall back to treating all active validators as eligible.
            if eligible_validators_weight < effective_threshold {
                error!(
                    "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
                    This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
                );
                eligible_active_validators = (0..active_validators.len() as u64).collect();
            }
        }

        if config.pass_validator_scores_to_advance_epoch() {
            txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
                eligible_active_validators,
                scores,
                config.adjust_rewards_by_score(),
            ));
        } else {
            txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
                eligible_active_validators,
            ));
        }
    } else if config.protocol_defined_base_fee()
        && config.max_committee_members_count_as_option().is_some()
    {
        txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
            next_epoch,
            next_epoch_protocol_version,
            gas_cost_summary.storage_cost,
            gas_cost_summary.computation_cost,
            gas_cost_summary.computation_cost_burned,
            gas_cost_summary.storage_rebate,
            gas_cost_summary.non_refundable_storage_fee,
            epoch_start_timestamp_ms,
            next_epoch_system_package_bytes,
        ));
    } else {
        // Oldest variant: does not carry `computation_cost_burned`.
        txns.push(EndOfEpochTransactionKind::new_change_epoch(
            next_epoch,
            next_epoch_protocol_version,
            gas_cost_summary.storage_cost,
            gas_cost_summary.computation_cost,
            gas_cost_summary.storage_rebate,
            gas_cost_summary.non_refundable_storage_fee,
            epoch_start_timestamp_ms,
            next_epoch_system_package_bytes,
        ));
    }

    let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);

    let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
        tx.clone(),
        epoch_store.epoch(),
        checkpoint,
    );

    let tx_digest = executable_tx.digest();

    info!(
        ?next_epoch,
        ?next_epoch_protocol_version,
        ?next_epoch_system_packages,
        computation_cost=?gas_cost_summary.computation_cost,
        computation_cost_burned=?gas_cost_summary.computation_cost_burned,
        storage_cost=?gas_cost_summary.storage_cost,
        storage_rebate=?gas_cost_summary.storage_rebate,
        non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
        ?tx_digest,
        "Creating advance epoch transaction"
    );

    fail_point_async!("change_epoch_tx_delay");
    // The per-transaction lock is taken before the already-executed check
    // below and is later passed to `read_objects_for_execution`.
    let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

    // Give up if the same transaction has already been executed (the log
    // message attributes this to state sync).
    if self
        .get_transaction_cache_reader()
        .try_is_tx_already_executed(tx_digest)?
    {
        warn!("change epoch tx has already been executed via state sync");
        bail!("change epoch tx has already been executed via state sync",);
    }

    let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;

    // Shared object versions must be assigned before the inputs are read.
    epoch_store.assign_shared_object_versions_idempotent(
        self.get_object_cache_reader().as_ref(),
        std::slice::from_ref(&executable_tx),
    )?;

    let (input_objects, _) =
        self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;

    let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
        &execution_guard,
        &executable_tx,
        input_objects,
        vec![],
        epoch_store,
    )?;
    let system_obj = get_iota_system_state(&temporary_store.written)
        .expect("change epoch tx must write to system object");
    let system_epoch_info_event = temporary_store
        .events
        .data
        .into_iter()
        .find(|event| event.is_system_epoch_info_event())
        .map(SystemEpochInfoEvent::from);
    // The epoch info event may only be missing when the system state is in
    // safe mode.
    assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());

    // Persist the transaction and effects through the state sync store.
    self.get_state_sync_store()
        .try_insert_transaction_and_effects(&tx, &effects)
        .map_err(|err| {
            let err: anyhow::Error = err.into();
            err
        })?;

    info!(
        "Effects summary of the change epoch transaction: {:?}",
        effects.summary_for_debug()
    );
    epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
    assert!(effects.status().is_ok());
    Ok((system_obj, system_epoch_info_event, effects))
}
5340
5341 #[instrument(level = "error", skip_all)]
5345 async fn revert_uncommitted_epoch_transactions(
5346 &self,
5347 epoch_store: &AuthorityPerEpochStore,
5348 ) -> IotaResult {
5349 {
5350 let state = epoch_store.get_reconfig_state_write_lock_guard();
5351 if state.should_accept_user_certs() {
5352 epoch_store.close_user_certs(state);
5361 }
5362 }
5364 let pending_certificates = epoch_store.pending_consensus_certificates();
5365 info!(
5366 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5367 pending_certificates.len(),
5368 pending_certificates,
5369 );
5370 for digest in pending_certificates {
5371 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5372 info!(
5373 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5374 digest
5375 );
5376 continue;
5377 }
5378 info!("Reverting {:?} at the end of epoch", digest);
5379 epoch_store.revert_executed_transaction(&digest)?;
5380 self.get_reconfig_api().try_revert_state_update(&digest)?;
5381 }
5382 info!("All uncommitted local transactions reverted");
5383 Ok(())
5384 }
5385
5386 #[instrument(level = "error", skip_all)]
5387 async fn reopen_epoch_db(
5388 &self,
5389 cur_epoch_store: &AuthorityPerEpochStore,
5390 new_committee: Committee,
5391 epoch_start_configuration: EpochStartConfiguration,
5392 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5393 epoch_last_checkpoint: CheckpointSequenceNumber,
5394 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5395 let new_epoch = new_committee.epoch;
5396 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5397 assert_eq!(
5398 epoch_start_configuration.epoch_start_state().epoch(),
5399 new_committee.epoch
5400 );
5401 fail_point!("before-open-new-epoch-store");
5402 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5403 self.name,
5404 new_committee,
5405 epoch_start_configuration,
5406 self.get_backing_package_store().clone(),
5407 self.get_object_store().clone(),
5408 expensive_safety_check_config,
5409 epoch_last_checkpoint,
5410 )?;
5411 self.epoch_store.store(new_epoch_store.clone());
5412 Ok(new_epoch_store)
5413 }
5414
/// Validates the account object used by a Move authenticator and resolves the
/// authenticator function reference stored on it.
///
/// Checks, in this order:
/// 1. the account object was actually read (not deleted, not part of a
///    cancelled transaction);
/// 2. `signer` equals the address derived from `auth_account_object_id`;
/// 3. the account object is shared or immutable;
/// 4. if given, `auth_account_object_seq_number` equals the object's version;
/// 5. if given, `auth_account_object_digest` equals the object's digest.
///
/// It then derives the dynamic field id of the authenticator function
/// reference and loads that field at a version less than or equal to the
/// account version.
///
/// # Errors
/// Returns the `UserInputError` matching the first failed check,
/// `UnableToGetMoveAuthenticatorId` if the field id cannot be derived,
/// `InvalidAuthenticatorFunctionRefField` if the field cannot be decoded, and
/// `MoveAuthenticatorNotFound` if no field object exists.
///
/// # Panics
/// Panics if the field object is not a Move object.
fn check_move_account(
    &self,
    auth_account_object_id: ObjectID,
    auth_account_object_seq_number: Option<SequenceNumber>,
    auth_account_object_digest: Option<ObjectDigest>,
    account_object: ObjectReadResult,
    signer: &IotaAddress,
) -> IotaResult<AuthenticatorFunctionRefForExecution> {
    // Only a successfully read object is usable; the other read results are
    // turned into user-input errors.
    let account_object = match account_object.object {
        ObjectReadResultKind::Object(object) => Ok(object),
        ObjectReadResultKind::DeletedSharedObject(version, digest) => {
            Err(UserInputError::AccountObjectDeleted {
                account_id: account_object.id(),
                account_version: version,
                transaction_digest: digest,
            })
        }
        ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
            Err(UserInputError::AccountObjectInCanceledTransaction {
                account_id: account_object.id(),
                account_version: version,
            })
        }
    }?;

    let account_object_addr = IotaAddress::from(auth_account_object_id);

    // The signer must be the account object's own address.
    fp_ensure!(
        signer == &account_object_addr,
        UserInputError::IncorrectUserSignature {
            error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
        }
        .into()
    );

    fp_ensure!(
        account_object.is_shared() || account_object.is_immutable(),
        UserInputError::AccountObjectNotSupported {
            object_id: auth_account_object_id
        }
        .into()
    );

    // Use the caller-provided version when present (after verifying it);
    // otherwise fall back to the version of the object that was read.
    let auth_account_object_seq_number =
        if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
            let account_object_version = account_object.version();

            fp_ensure!(
                account_object_version == auth_account_object_seq_number,
                UserInputError::AccountObjectVersionMismatch {
                    object_id: auth_account_object_id,
                    expected_version: auth_account_object_seq_number,
                    actual_version: account_object_version,
                }
                .into()
            );

            auth_account_object_seq_number
        } else {
            account_object.version()
        };

    // NOTE(review): `expected_digest` is the digest of the object that was
    // read and `actual_digest` is the caller-provided one -- confirm this
    // naming matches how the error is presented.
    if let Some(auth_account_object_digest) = auth_account_object_digest {
        let expected_digest = account_object.digest();
        fp_ensure!(
            expected_digest == auth_account_object_digest,
            UserInputError::InvalidAccountObjectDigest {
                object_id: auth_account_object_id,
                expected_digest,
                actual_digest: auth_account_object_digest,
            }
            .into()
        );
    }

    // The authenticator function reference lives in a dynamic field of the
    // account object, keyed by `AuthenticatorFunctionRefV1Key`.
    let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
        auth_account_object_id,
        &AuthenticatorFunctionRefV1Key::tag().into(),
        &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
    )
    .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
        account_object_id: auth_account_object_id,
    })?;

    // Load the field as of the account object's version.
    let authenticator_function_ref_field = self
        .get_object_cache_reader()
        .try_find_object_lt_or_eq_version(
            authenticator_function_ref_field_id,
            auth_account_object_seq_number,
        )?;

    if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
        let field_move_object = authenticator_function_ref_field_obj
            .data
            .try_as_move()
            .expect("dynamic field should never be a package object");

        let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
            field_move_object.to_rust().ok_or(
                UserInputError::InvalidAuthenticatorFunctionRefField {
                    account_object_id: auth_account_object_id,
                },
            )?;

        Ok(AuthenticatorFunctionRefForExecution::new_v1(
            field.value,
            authenticator_function_ref_field_obj.compute_object_reference(),
            authenticator_function_ref_field_obj.owner,
            authenticator_function_ref_field_obj.storage_rebate,
            authenticator_function_ref_field_obj.previous_transaction,
        ))
    } else {
        Err(UserInputError::MoveAuthenticatorNotFound {
            authenticator_function_ref_id: authenticator_function_ref_field_id,
            account_object_id: auth_account_object_id,
            account_object_version: auth_account_object_seq_number,
        }
        .into())
    }
}
5539
5540 #[allow(clippy::type_complexity)]
5541 fn read_objects_for_signing(
5542 &self,
5543 transaction: &VerifiedTransaction,
5544 epoch: u64,
5545 ) -> IotaResult<(
5546 InputObjects,
5547 ReceivingObjects,
5548 Vec<(InputObjects, ObjectReadResult)>,
5549 )> {
5550 let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5551 Some(transaction.digest()),
5552 &transaction.collect_all_input_object_kind_for_reading()?,
5553 &transaction.data().transaction_data().receiving_objects(),
5554 epoch,
5555 )?;
5556
5557 transaction
5558 .split_input_objects_into_groups_for_reading(input_objects)
5559 .map(|(tx_input_objects, per_authenticator_inputs)| {
5560 (
5561 tx_input_objects,
5562 tx_receiving_objects,
5563 per_authenticator_inputs,
5564 )
5565 })
5566 }
5567
5568 #[allow(clippy::type_complexity)]
5569 fn check_transaction_inputs_for_signing(
5570 &self,
5571 protocol_config: &ProtocolConfig,
5572 reference_gas_price: u64,
5573 tx_data: &TransactionData,
5574 tx_input_objects: InputObjects,
5575 tx_receiving_objects: &ReceivingObjects,
5576 move_authenticators: &Vec<&MoveAuthenticator>,
5577 per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
5578 ) -> IotaResult<(
5579 IotaGasStatus,
5580 CheckedInputObjects,
5581 Vec<(CheckedInputObjects, AuthenticatorFunctionRef)>,
5582 )> {
5583 let authenticator_gas_budget = if move_authenticators.is_empty() {
5584 0
5585 } else {
5586 protocol_config.max_auth_gas()
5589 };
5590
5591 debug_assert_eq!(
5592 move_authenticators.len(),
5593 per_authenticator_inputs.len(),
5594 "Move authenticators amount must match the number of authenticator inputs"
5595 );
5596
5597 let per_authenticator_checked_inputs = move_authenticators
5598 .iter()
5599 .zip(per_authenticator_inputs)
5600 .map(
5601 |(move_authenticator, (authenticator_input_objects, account_object))| {
5602 let (
5604 auth_account_object_id,
5605 auth_account_object_seq_number,
5606 auth_account_object_digest,
5607 ) = move_authenticator.object_to_authenticate_components()?;
5608
5609 let signer = move_authenticator.address()?;
5610
5611 let AuthenticatorFunctionRefForExecution {
5613 authenticator_function_ref,
5614 ..
5615 } = self.check_move_account(
5616 auth_account_object_id,
5617 auth_account_object_seq_number,
5618 auth_account_object_digest,
5619 account_object,
5620 &signer,
5621 )?;
5622
5623 let authenticator_checked_input_objects =
5625 iota_transaction_checks::check_move_authenticator_input_for_signing(
5626 authenticator_input_objects,
5627 )?;
5628
5629 Ok((
5630 authenticator_checked_input_objects,
5631 authenticator_function_ref,
5632 ))
5633 },
5634 )
5635 .collect::<IotaResult<Vec<_>>>()?;
5636
5637 let (gas_status, tx_checked_input_objects) =
5639 iota_transaction_checks::check_transaction_input(
5640 protocol_config,
5641 reference_gas_price,
5642 tx_data,
5643 tx_input_objects,
5644 tx_receiving_objects,
5645 &self.metrics.bytecode_verifier_metrics,
5646 &self.config.verifier_signing_config,
5647 authenticator_gas_budget,
5648 )?;
5649
5650 Ok((
5651 gas_status,
5652 tx_checked_input_objects,
5653 per_authenticator_checked_inputs,
5654 ))
5655 }
5656
/// Test-only helper returning the iterator produced by the accumulator
/// store's `iter_cached_live_object_set_for_testing`.
///
/// The returned iterator borrows from `self`.
#[cfg(test)]
pub(crate) fn iter_live_object_set_for_testing(
    &self,
) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
    let accumulator_store = self.get_accumulator_store();
    accumulator_store.iter_cached_live_object_set_for_testing()
}
5664
/// Test-only helper that signals execution shutdown.
///
/// Takes the sender stored in `tx_execution_shutdown` (leaving `None`
/// behind) and sends `()` on it.
///
/// # Panics
///
/// Panics if the sender has already been taken, or if sending fails.
#[cfg(test)]
pub(crate) fn shutdown_execution_for_test(&self) {
    let shutdown_sender = self.tx_execution_shutdown.lock().take().unwrap();
    shutdown_sender.send(()).unwrap();
}
5674
5675 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5678 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5679 self.get_object_cache_reader()
5680 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5681 self.get_reconfig_api()
5682 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5683 }
5684}
5685
5686pub struct RandomnessRoundReceiver {
5687 authority_state: Arc<AuthorityState>,
5688 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5689}
5690
5691impl RandomnessRoundReceiver {
5692 pub fn spawn(
5693 authority_state: Arc<AuthorityState>,
5694 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5695 ) -> JoinHandle<()> {
5696 let rrr = RandomnessRoundReceiver {
5697 authority_state,
5698 randomness_rx,
5699 };
5700 spawn_monitored_task!(rrr.run())
5701 }
5702
5703 async fn run(mut self) {
5704 info!("RandomnessRoundReceiver event loop started");
5705
5706 loop {
5707 tokio::select! {
5708 maybe_recv = self.randomness_rx.recv() => {
5709 if let Some((epoch, round, bytes)) = maybe_recv {
5710 self.handle_new_randomness(epoch, round, bytes);
5711 } else {
5712 break;
5713 }
5714 },
5715 }
5716 }
5717
5718 info!("RandomnessRoundReceiver event loop ended");
5719 }
5720
5721 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5722 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5723 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5724 if epoch_store.epoch() != epoch {
5725 warn!(
5726 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5727 epoch_store.epoch()
5728 );
5729 return;
5730 }
5731 let transaction = VerifiedTransaction::new_randomness_state_update(
5732 epoch,
5733 round,
5734 bytes,
5735 epoch_store
5736 .epoch_start_config()
5737 .randomness_obj_initial_shared_version(),
5738 );
5739 debug!(
5740 "created randomness state update transaction with digest: {:?}",
5741 transaction.digest()
5742 );
5743 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5744 let digest = *transaction.digest();
5745
5746 self.authority_state
5751 .get_cache_commit()
5752 .persist_transaction(&transaction);
5753
5754 self.authority_state
5756 .transaction_manager()
5757 .enqueue(vec![transaction], &epoch_store);
5758
5759 let authority_state = self.authority_state.clone();
5760 spawn_monitored_task!(async move {
5761 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5770 let result = tokio::time::timeout(
5771 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5772 authority_state
5773 .get_transaction_cache_reader()
5774 .try_notify_read_executed_effects(&[digest]),
5775 )
5776 .await;
5777 let result = match result {
5778 Ok(result) => result,
5779 Err(_) => {
5780 if cfg!(debug_assertions) {
5781 panic!(
5783 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5784 );
5785 }
5786 warn!(
5787 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5788 );
5789 authority_state
5791 .get_transaction_cache_reader()
5792 .try_notify_read_executed_effects(&[digest])
5793 .await
5794 }
5795 };
5796
5797 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5798 let effects = effects.pop().expect("should return effects");
5799 if *effects.status() != ExecutionStatus::Success {
5800 fatal!(
5801 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5802 );
5803 }
5804 debug!(
5805 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5806 );
5807 });
5808 }
5809}
5810
5811#[async_trait]
5812impl TransactionKeyValueStoreTrait for AuthorityState {
5813 async fn multi_get(
5814 &self,
5815 transaction_keys: &[TransactionDigest],
5816 effects_keys: &[TransactionDigest],
5817 ) -> IotaResult<KVStoreTransactionData> {
5818 let txns = if !transaction_keys.is_empty() {
5819 self.get_transaction_cache_reader()
5820 .try_multi_get_transaction_blocks(transaction_keys)?
5821 .into_iter()
5822 .map(|t| t.map(|t| (*t).clone().into_inner()))
5823 .collect()
5824 } else {
5825 vec![]
5826 };
5827
5828 let fx = if !effects_keys.is_empty() {
5829 self.get_transaction_cache_reader()
5830 .try_multi_get_executed_effects(effects_keys)?
5831 } else {
5832 vec![]
5833 };
5834
5835 Ok((txns, fx))
5836 }
5837
5838 async fn multi_get_checkpoints(
5839 &self,
5840 checkpoint_summaries: &[CheckpointSequenceNumber],
5841 checkpoint_contents: &[CheckpointSequenceNumber],
5842 checkpoint_summaries_by_digest: &[CheckpointDigest],
5843 ) -> IotaResult<(
5844 Vec<Option<CertifiedCheckpointSummary>>,
5845 Vec<Option<CheckpointContents>>,
5846 Vec<Option<CertifiedCheckpointSummary>>,
5847 )> {
5848 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5850 let store = self.get_checkpoint_store();
5851 for seq in checkpoint_summaries {
5852 let checkpoint = store
5853 .get_checkpoint_by_sequence_number(*seq)?
5854 .map(|c| c.into_inner());
5855
5856 summaries.push(checkpoint);
5857 }
5858
5859 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5860 for seq in checkpoint_contents {
5861 let checkpoint = store
5862 .get_checkpoint_by_sequence_number(*seq)?
5863 .and_then(|summary| {
5864 store
5865 .get_checkpoint_contents(&summary.content_digest)
5866 .expect("db read cannot fail")
5867 });
5868 contents.push(checkpoint);
5869 }
5870
5871 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5872 for digest in checkpoint_summaries_by_digest {
5873 let checkpoint = store
5874 .get_checkpoint_by_digest(digest)?
5875 .map(|c| c.into_inner());
5876 summaries_by_digest.push(checkpoint);
5877 }
5878
5879 Ok((summaries, contents, summaries_by_digest))
5880 }
5881
5882 async fn get_transaction_perpetual_checkpoint(
5883 &self,
5884 digest: TransactionDigest,
5885 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5886 self.get_checkpoint_cache()
5887 .try_get_transaction_perpetual_checkpoint(&digest)
5888 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5889 }
5890
5891 async fn get_object(
5892 &self,
5893 object_id: ObjectID,
5894 version: VersionNumber,
5895 ) -> IotaResult<Option<Object>> {
5896 self.get_object_cache_reader()
5897 .try_get_object_by_key(&object_id, version)
5898 }
5899
5900 #[instrument(skip_all)]
5901 async fn multi_get_objects(
5902 &self,
5903 object_keys: &[ObjectKey],
5904 ) -> IotaResult<Vec<Option<Object>>> {
5905 Ok(self
5906 .get_object_cache_reader()
5907 .multi_get_objects_by_key(object_keys))
5908 }
5909
5910 async fn multi_get_transactions_perpetual_checkpoints(
5911 &self,
5912 digests: &[TransactionDigest],
5913 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5914 let res = self
5915 .get_checkpoint_cache()
5916 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5917
5918 Ok(res
5919 .into_iter()
5920 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5921 .collect())
5922 }
5923
5924 #[instrument(skip(self))]
5925 async fn multi_get_events_by_tx_digests(
5926 &self,
5927 digests: &[TransactionDigest],
5928 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5929 if digests.is_empty() {
5930 return Ok(vec![]);
5931 }
5932
5933 Ok(self
5934 .get_transaction_cache_reader()
5935 .multi_get_events(digests))
5936 }
5937}
5938
5939#[cfg(msim)]
5940pub mod framework_injection {
5941 use std::{
5942 cell::RefCell,
5943 collections::{BTreeMap, BTreeSet},
5944 };
5945
5946 use iota_framework::{BuiltInFramework, SystemPackage};
5947 use iota_types::{
5948 base_types::{AuthorityName, ObjectID},
5949 is_system_package,
5950 };
5951 use move_binary_format::CompiledModule;
5952
5953 type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
5954
5955 thread_local! {
5957 static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
5958 }
5959
5960 type Framework = Vec<CompiledModule>;
5961
5962 pub type PackageUpgradeCallback =
5963 Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
5964
5965 enum PackageOverrideConfig {
5966 Global(Framework),
5967 PerValidator(PackageUpgradeCallback),
5968 }
5969
5970 fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
5971 modules
5972 .iter()
5973 .map(|m| {
5974 let mut buf = Vec::new();
5975 m.serialize_with_version(m.version, &mut buf).unwrap();
5976 buf
5977 })
5978 .collect()
5979 }
5980
5981 pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
5982 OVERRIDE.with(|bs| {
5983 bs.borrow_mut()
5984 .insert(package_id, PackageOverrideConfig::Global(modules))
5985 });
5986 }
5987
5988 pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
5989 OVERRIDE.with(|bs| {
5990 bs.borrow_mut()
5991 .insert(package_id, PackageOverrideConfig::PerValidator(func))
5992 });
5993 }
5994
5995 pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
5996 OVERRIDE.with(|cfg| {
5997 cfg.borrow().get(package_id).and_then(|entry| match entry {
5998 PackageOverrideConfig::Global(framework) => {
5999 Some(compiled_modules_to_bytes(framework))
6000 }
6001 PackageOverrideConfig::PerValidator(func) => {
6002 func(name).map(|fw| compiled_modules_to_bytes(&fw))
6003 }
6004 })
6005 })
6006 }
6007
6008 pub fn get_override_modules(
6009 package_id: &ObjectID,
6010 name: AuthorityName,
6011 ) -> Option<Vec<CompiledModule>> {
6012 OVERRIDE.with(|cfg| {
6013 cfg.borrow().get(package_id).and_then(|entry| match entry {
6014 PackageOverrideConfig::Global(framework) => Some(framework.clone()),
6015 PackageOverrideConfig::PerValidator(func) => func(name),
6016 })
6017 })
6018 }
6019
6020 pub fn get_override_system_package(
6021 package_id: &ObjectID,
6022 name: AuthorityName,
6023 ) -> Option<SystemPackage> {
6024 let bytes = get_override_bytes(package_id, name)?;
6025 let dependencies = if is_system_package(*package_id) {
6026 BuiltInFramework::get_package_by_id(package_id)
6027 .dependencies
6028 .to_vec()
6029 } else {
6030 BuiltInFramework::all_package_ids()
6033 };
6034 Some(SystemPackage {
6035 id: *package_id,
6036 bytes,
6037 dependencies,
6038 })
6039 }
6040
6041 pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
6042 let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
6043 let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
6044 cfg.borrow()
6045 .keys()
6046 .filter_map(|package| (!built_in.contains(package)).then_some(*package))
6047 .collect()
6048 });
6049
6050 extra
6051 .into_iter()
6052 .map(|package| SystemPackage {
6053 id: package,
6054 bytes: get_override_bytes(&package, name).unwrap(),
6055 dependencies: BuiltInFramework::all_package_ids(),
6056 })
6057 .collect()
6058 }
6059}
6060
6061#[derive(Debug, Serialize, Deserialize, Clone)]
6062pub struct ObjDumpFormat {
6063 pub id: ObjectID,
6064 pub version: VersionNumber,
6065 pub digest: ObjectDigest,
6066 pub object: Object,
6067}
6068
6069impl ObjDumpFormat {
6070 fn new(object: Object) -> Self {
6071 let oref = object.compute_object_reference();
6072 Self {
6073 id: oref.0,
6074 version: oref.1,
6075 digest: oref.2,
6076 object,
6077 }
6078 }
6079}
6080
6081#[derive(Debug, Serialize, Deserialize, Clone)]
6082pub struct NodeStateDump {
6083 pub tx_digest: TransactionDigest,
6084 pub sender_signed_data: SenderSignedData,
6085 pub executed_epoch: u64,
6086 pub reference_gas_price: u64,
6087 pub protocol_version: u64,
6088 pub epoch_start_timestamp_ms: u64,
6089 pub computed_effects: TransactionEffects,
6090 pub expected_effects_digest: TransactionEffectsDigest,
6091 pub relevant_system_packages: Vec<ObjDumpFormat>,
6092 pub shared_objects: Vec<ObjDumpFormat>,
6093 pub loaded_child_objects: Vec<ObjDumpFormat>,
6094 pub modified_at_versions: Vec<ObjDumpFormat>,
6095 pub runtime_reads: Vec<ObjDumpFormat>,
6096 pub input_objects: Vec<ObjDumpFormat>,
6097}
6098
6099impl NodeStateDump {
6100 pub fn new(
6101 tx_digest: &TransactionDigest,
6102 effects: &TransactionEffects,
6103 expected_effects_digest: TransactionEffectsDigest,
6104 object_store: &dyn ObjectStore,
6105 epoch_store: &Arc<AuthorityPerEpochStore>,
6106 inner_temporary_store: &InnerTemporaryStore,
6107 certificate: &VerifiedExecutableTransaction,
6108 ) -> IotaResult<Self> {
6109 let executed_epoch = epoch_store.epoch();
6111 let reference_gas_price = epoch_store.reference_gas_price();
6112 let epoch_start_config = epoch_store.epoch_start_config();
6113 let protocol_version = epoch_store.protocol_version().as_u64();
6114 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6115
6116 let mut relevant_system_packages = Vec::new();
6118 for sys_package_id in BuiltInFramework::all_package_ids() {
6119 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
6120 relevant_system_packages.push(ObjDumpFormat::new(w))
6121 }
6122 }
6123
6124 let mut shared_objects = Vec::new();
6126 for kind in effects.input_shared_objects() {
6127 match kind {
6128 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
6129 if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
6130 shared_objects.push(ObjDumpFormat::new(w))
6131 }
6132 }
6133 InputSharedObject::ReadDeleted(..)
6134 | InputSharedObject::MutateDeleted(..)
6135 | InputSharedObject::Cancelled(..) => (), }
6138 }
6139
6140 let mut loaded_child_objects = Vec::new();
6143 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6144 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
6145 loaded_child_objects.push(ObjDumpFormat::new(w))
6146 }
6147 }
6148
6149 let mut modified_at_versions = Vec::new();
6151 for (id, ver) in effects.modified_at_versions() {
6152 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
6153 modified_at_versions.push(ObjDumpFormat::new(w))
6154 }
6155 }
6156
6157 let mut runtime_reads = Vec::new();
6161 for obj in inner_temporary_store
6162 .runtime_packages_loaded_from_db
6163 .values()
6164 {
6165 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6166 }
6167
6168 Ok(Self {
6171 tx_digest: *tx_digest,
6172 executed_epoch,
6173 reference_gas_price,
6174 epoch_start_timestamp_ms,
6175 protocol_version,
6176 relevant_system_packages,
6177 shared_objects,
6178 loaded_child_objects,
6179 modified_at_versions,
6180 runtime_reads,
6181 sender_signed_data: certificate.clone().into_message(),
6182 input_objects: inner_temporary_store
6183 .input_objects
6184 .values()
6185 .map(|o| ObjDumpFormat::new(o.clone()))
6186 .collect(),
6187 computed_effects: effects.clone(),
6188 expected_effects_digest,
6189 })
6190 }
6191
6192 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6193 let mut objects = Vec::new();
6194 objects.extend(self.relevant_system_packages.clone());
6195 objects.extend(self.shared_objects.clone());
6196 objects.extend(self.loaded_child_objects.clone());
6197 objects.extend(self.modified_at_versions.clone());
6198 objects.extend(self.runtime_reads.clone());
6199 objects.extend(self.input_objects.clone());
6200 objects
6201 }
6202
6203 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6204 let file_name = format!(
6205 "{}_{}_NODE_DUMP.json",
6206 self.tx_digest,
6207 AuthorityState::unixtime_now_ms()
6208 );
6209 let mut path = path.to_path_buf();
6210 path.push(&file_name);
6211 let mut file = File::create(path.clone())?;
6212 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6213 Ok(path)
6214 }
6215
6216 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6217 let file = File::open(path)?;
6218 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6219 }
6220}