1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49 key_value_store::{
50 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51 },
52 key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57 IOTA_SYSTEM_ADDRESS, TypeTag,
58 account_abstraction::{
59 account::AuthenticatorFunctionRefV1Key,
60 authenticator_function::{
61 AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
62 AuthenticatorFunctionRefV1,
63 },
64 },
65 authenticator_state::get_authenticator_state,
66 base_types::*,
67 committee::{Committee, EpochId, ProtocolVersion},
68 crypto::{
69 AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
70 default_hash,
71 },
72 deny_list_v1::check_coin_deny_list_v1_during_signing,
73 digests::{ChainIdentifier, Digest},
74 dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
75 effects::{
76 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
77 TransactionEvents, VerifiedSignedTransactionEffects,
78 },
79 error::{ExecutionError, IotaError, IotaResult, UserInputError},
80 event::{Event, EventID, SystemEpochInfoEvent},
81 executable_transaction::VerifiedExecutableTransaction,
82 execution_config_utils::to_binary_config,
83 execution_status::ExecutionStatus,
84 fp_ensure,
85 gas::{GasCostSummary, IotaGasStatus},
86 gas_coin::NANOS_PER_IOTA,
87 inner_temporary_store::{
88 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
89 WrittenObjects,
90 },
91 iota_system_state::{
92 IotaSystemState, IotaSystemStateTrait,
93 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
94 },
95 is_system_package,
96 layout_resolver::{LayoutResolver, into_struct_layout},
97 message_envelope::Message,
98 messages_checkpoint::{
99 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
100 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
101 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
102 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
103 },
104 messages_consensus::AuthorityCapabilitiesV1,
105 messages_grpc::{
106 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
107 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
108 TransactionStatus,
109 },
110 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
111 move_authenticator::MoveAuthenticator,
112 object::{
113 MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
114 bounded_visitor::BoundedVisitor,
115 },
116 storage::{
117 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
118 },
119 supported_protocol_versions::{
120 ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
121 },
122 transaction::*,
123 transaction_executor::{SimulateTransactionResult, VmChecks},
124};
125use itertools::Itertools;
126use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
127use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
128use parking_lot::Mutex;
129use prometheus::{
130 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
131 register_histogram_vec_with_registry, register_histogram_with_registry,
132 register_int_counter_vec_with_registry, register_int_counter_with_registry,
133 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
134};
135use serde::{Deserialize, Serialize, de::DeserializeOwned};
136use tap::TapFallible;
137use tokio::{
138 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
139 task::JoinHandle,
140};
141use tracing::{debug, error, info, instrument, trace, warn};
142use typed_store::TypedStoreError;
143
144use self::{
145 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
146};
147#[cfg(msim)]
148pub use crate::checkpoints::checkpoint_executor::utils::{
149 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
150};
151use crate::{
152 authority::{
153 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
154 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
155 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
156 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
157 authority_store_tables::AuthorityPrunerTables,
158 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
159 },
160 authority_client::NetworkAuthorityClient,
161 checkpoints::CheckpointStore,
162 congestion_tracker::CongestionTracker,
163 consensus_adapter::ConsensusAdapter,
164 epoch::committee_store::CommitteeStore,
165 execution_cache::{
166 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
167 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
168 TransactionCacheRead,
169 },
170 execution_driver::execution_process,
171 grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
172 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
173 metrics::{LatencyObserver, RateTracker},
174 module_cache_metrics::ResolverMetrics,
175 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
176 stake_aggregator::StakeAggregator,
177 state_accumulator::{AccumulatorStore, StateAccumulator},
178 subscription_handler::SubscriptionHandler,
179 transaction_input_loader::TransactionInputLoader,
180 transaction_manager::TransactionManager,
181 transaction_outputs::TransactionOutputs,
182 validator_tx_finalizer::ValidatorTxFinalizer,
183 verify_indexes::verify_indexes,
184};
185
186#[cfg(test)]
187#[path = "unit_tests/authority_tests.rs"]
188pub mod authority_tests;
189
190#[cfg(test)]
191#[path = "unit_tests/transaction_tests.rs"]
192pub mod transaction_tests;
193
194#[cfg(test)]
195#[path = "unit_tests/batch_transaction_tests.rs"]
196mod batch_transaction_tests;
197
198#[cfg(test)]
199#[path = "unit_tests/move_integration_tests.rs"]
200pub mod move_integration_tests;
201
202#[cfg(test)]
203#[path = "unit_tests/gas_tests.rs"]
204mod gas_tests;
205
206#[cfg(test)]
207#[path = "unit_tests/batch_verification_tests.rs"]
208mod batch_verification_tests;
209
210#[cfg(test)]
211#[path = "unit_tests/coin_deny_list_tests.rs"]
212mod coin_deny_list_tests;
213
214#[cfg(test)]
215#[path = "unit_tests/auth_unit_test_utils.rs"]
216pub mod auth_unit_test_utils;
217
218pub mod authority_test_utils;
219
220pub mod authority_per_epoch_store;
221pub mod authority_per_epoch_store_pruner;
222
223mod authority_store_migrations;
224pub mod authority_store_pruner;
225pub mod authority_store_tables;
226pub mod authority_store_types;
227pub mod epoch_start_configuration;
228pub mod shared_object_congestion_tracker;
229pub mod shared_object_version_manager;
230pub mod suggested_gas_price_calculator;
231pub mod test_authority_builder;
232pub mod transaction_deferral;
233
234pub(crate) mod authority_store;
235pub mod backpressure;
236
/// Prometheus metrics recorded by the authority.
///
/// All instruments are created and registered in [`AuthorityMetrics::new`];
/// the per-field descriptions below mirror the help strings used there.
pub struct AuthorityMetrics {
    /// Total number of transaction orders.
    tx_orders: IntCounter,
    /// Total number of transaction certificates handled.
    total_certs: IntCounter,
    /// Number of calls to `handle_certificate`.
    total_cert_attempts: IntCounter,
    /// Total number of transaction effects produced.
    total_effects: IntCounter,
    /// Number of transactions involving shared objects.
    pub shared_obj_tx: IntCounter,
    /// Number of sponsored transactions.
    sponsored_tx: IntCounter,
    /// Number of transaction orders already processed previously.
    tx_already_processed: IntCounter,
    /// Distribution of the number of input objects per transaction.
    num_input_objs: Histogram,
    /// Number of shared input objects per transaction.
    num_shared_objects: Histogram,
    /// Distribution of the size of transaction batches.
    batch_size: Histogram,

    /// Latency of handling transactions.
    authority_state_handle_transaction_latency: Histogram,

    // The two histograms below are the `tx_type` label values of the single
    // `authority_state_execute_certificate_latency` histogram vector
    // (latency of executing certificates, including waiting for inputs).
    /// Certificate execution latency for `TX_TYPE_SINGLE_WRITER_TX`.
    execute_certificate_latency_single_writer: Histogram,
    /// Certificate execution latency for `TX_TYPE_SHARED_OBJ_TX`.
    execute_certificate_latency_shared_object: Histogram,

    /// Latency of actual certificate executions.
    internal_execution_latency: Histogram,
    /// Latency of loading input objects for execution.
    execution_load_input_objects_latency: Histogram,
    /// Latency of executing certificates, before committing the results.
    prepare_certificate_latency: Histogram,
    /// Latency of committing certificate execution results.
    commit_certificate_latency: Histogram,
    /// Latency of checkpointing dbs.
    db_checkpoint_latency: Histogram,

    /// Certificates enqueued to the `TransactionManager`, labelled by
    /// `result`.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    /// Current number of missing objects in the `TransactionManager`.
    pub(crate) transaction_manager_num_missing_objects: IntGauge,
    /// Number of certificates pending in the `TransactionManager`, with at
    /// least one missing input object.
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    /// Number of executing certificates, including queued and actually
    /// running certificates.
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    /// Number of ready transactions in the `TransactionManager`.
    pub(crate) transaction_manager_num_ready: IntGauge,
    /// Current size of the object-availability cache.
    pub(crate) transaction_manager_object_cache_size: IntGauge,
    /// Number of object-availability cache hits.
    pub(crate) transaction_manager_object_cache_hits: IntCounter,
    /// Number of object-availability cache misses.
    pub(crate) transaction_manager_object_cache_misses: IntCounter,
    /// Number of object-availability cache evictions.
    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
    /// Current size of the package-availability cache.
    pub(crate) transaction_manager_package_cache_size: IntGauge,
    /// Number of package-availability cache hits.
    pub(crate) transaction_manager_package_cache_hits: IntCounter,
    /// Number of package-availability cache misses.
    pub(crate) transaction_manager_package_cache_misses: IntCounter,
    /// Number of package-availability cache evictions.
    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
    /// Time transactions spend waiting in the queue.
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    /// Cumulative number of transactions executed by the execution driver.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    /// Number of transactions pending in the execution driver dispatch queue.
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    /// Queueing delay between a transaction becoming ready for execution and
    /// the start of its execution.
    pub(crate) execution_queueing_delay_s: Histogram,
    /// Ratio of computation gas divided by VM execution latency.
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    /// Ratio of computation gas divided by certificate execution latency,
    /// including committing the certificate.
    pub(crate) execution_gas_latency_ratio: Histogram,

    /// Total number of consensus transactions skipped.
    pub(crate) skipped_consensus_txns: IntCounter,
    /// Total number of consensus transactions skipped because of a local
    /// cache hit.
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,

    /// Whether the authority is currently overloaded and in load shedding
    /// mode.
    pub(crate) authority_overload_status: IntGauge,
    /// Percentage of transactions shed while in load shedding mode.
    pub(crate) authority_load_shedding_percentage: IntGauge,

    /// Number of times each `source` indicated transaction overload.
    pub(crate) transaction_overload_sources: IntCounterVec,

    /// Total number of events emitted in post processing.
    post_processing_total_events_emitted: IntCounter,
    /// Total number of transactions indexed in post processing.
    post_processing_total_tx_indexed: IntCounter,
    /// Total number of transactions that finished event processing in post
    /// processing.
    post_processing_total_tx_had_event_processed: IntCounter,
    /// Total number of failures in post processing.
    post_processing_total_failures: IntCounter,

    /// Number of transactions processed by the consensus handler, by `class`.
    pub consensus_handler_processed: IntCounterVec,
    /// Sizes of each `class` of transaction processed by the consensus
    /// handler.
    pub consensus_handler_transaction_sizes: HistogramVec,
    /// Number of low scoring authorities based on reputation scores from
    /// consensus.
    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    /// Scores from consensus for each `authority`.
    pub consensus_handler_scores: IntGaugeVec,
    /// Number of transactions deferred by the consensus handler.
    pub consensus_handler_deferred_transactions: IntCounter,
    /// Number of transactions deferred by the consensus handler due to
    /// congestion.
    pub consensus_handler_congested_transactions: IntCounter,
    /// Number of transactions cancelled by the consensus handler.
    pub consensus_handler_cancelled_transactions: IntCounter,
    /// Max object costs for congestion control in the current consensus
    /// commit, by `commit_type`.
    pub consensus_handler_max_object_costs: IntGaugeVec,
    /// Number of committed subdags, sliced by leader (`authority` label).
    pub consensus_committed_subdags: IntCounterVec,
    /// Total number of committed consensus messages, sliced by author.
    pub consensus_committed_messages: IntGaugeVec,
    /// Number of committed user transactions, sliced by submitter.
    pub consensus_committed_user_transactions: IntGaugeVec,
    /// Leader round of the consensus output currently being processed by the
    /// consensus handler.
    pub consensus_handler_leader_round: IntGauge,
    /// Throughput calculated from consensus output, based on unique
    /// transactions.
    pub consensus_calculated_throughput: IntGauge,
    /// The currently active calculated throughput profile.
    pub consensus_calculated_throughput_profile: IntGauge,

    /// Shared `LimitsMetrics`, built from the same registry.
    pub limits_metrics: Arc<LimitsMetrics>,

    /// Shared `BytecodeVerifierMetrics`, built from the same registry.
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zkLogin signatures.
    pub zklogin_sig_count: IntCounter,
    /// Counter registered as `multisig_sig_count`.
    pub multisig_sig_count: IntCounter,

    /// Observer for execution queueing latency.
    // NOTE(review): not registered with the Prometheus registry in `new`;
    // presumably read directly by other components - confirm.
    pub execution_queueing_latency: LatencyObserver,

    /// Rate tracker constructed with a 10 second duration.
    // NOTE(review): presumably tracks the rate at which transactions become
    // ready for execution - confirm against the call sites.
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    /// Rate tracker constructed with a 10 second duration.
    // NOTE(review): presumably tracks the rate of transaction execution -
    // confirm against the call sites.
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
338
/// Histogram buckets for positive integer quantities, spanning 1 to
/// 10,000,000 in a 1-2-5-7 progression per decade.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
    7000000., 10000000.,
];
345
/// Histogram buckets for latencies measured in seconds, from 0.5 ms up to
/// 90 s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];
350
/// Finer-grained histogram buckets for latencies in seconds, starting at
/// 10 microseconds and going up to 100 s.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];
356
/// Histogram buckets for the gas / latency ratio metrics, from 10 up to
/// 1,000,000.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];
361
362pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
366 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
367 let execute_certificate_latency = register_histogram_vec_with_registry!(
368 "authority_state_execute_certificate_latency",
369 "Latency of executing certificates, including waiting for inputs",
370 &["tx_type"],
371 LATENCY_SEC_BUCKETS.to_vec(),
372 registry,
373 )
374 .unwrap();
375
376 let execute_certificate_latency_single_writer =
377 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
378 let execute_certificate_latency_shared_object =
379 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
380
381 Self {
382 tx_orders: register_int_counter_with_registry!(
383 "total_transaction_orders",
384 "Total number of transaction orders",
385 registry,
386 )
387 .unwrap(),
388 total_certs: register_int_counter_with_registry!(
389 "total_transaction_certificates",
390 "Total number of transaction certificates handled",
391 registry,
392 )
393 .unwrap(),
394 total_cert_attempts: register_int_counter_with_registry!(
395 "total_handle_certificate_attempts",
396 "Number of calls to handle_certificate",
397 registry,
398 )
399 .unwrap(),
400 total_effects: register_int_counter_with_registry!(
402 "total_transaction_effects",
403 "Total number of transaction effects produced",
404 registry,
405 )
406 .unwrap(),
407
408 shared_obj_tx: register_int_counter_with_registry!(
409 "num_shared_obj_tx",
410 "Number of transactions involving shared objects",
411 registry,
412 )
413 .unwrap(),
414
415 sponsored_tx: register_int_counter_with_registry!(
416 "num_sponsored_tx",
417 "Number of sponsored transactions",
418 registry,
419 )
420 .unwrap(),
421
422 tx_already_processed: register_int_counter_with_registry!(
423 "num_tx_already_processed",
424 "Number of transaction orders already processed previously",
425 registry,
426 )
427 .unwrap(),
428 num_input_objs: register_histogram_with_registry!(
429 "num_input_objects",
430 "Distribution of number of input TX objects per TX",
431 POSITIVE_INT_BUCKETS.to_vec(),
432 registry,
433 )
434 .unwrap(),
435 num_shared_objects: register_histogram_with_registry!(
436 "num_shared_objects",
437 "Number of shared input objects per TX",
438 POSITIVE_INT_BUCKETS.to_vec(),
439 registry,
440 )
441 .unwrap(),
442 batch_size: register_histogram_with_registry!(
443 "batch_size",
444 "Distribution of size of transaction batch",
445 POSITIVE_INT_BUCKETS.to_vec(),
446 registry,
447 )
448 .unwrap(),
449 authority_state_handle_transaction_latency: register_histogram_with_registry!(
450 "authority_state_handle_transaction_latency",
451 "Latency of handling transactions",
452 LATENCY_SEC_BUCKETS.to_vec(),
453 registry,
454 )
455 .unwrap(),
456 execute_certificate_latency_single_writer,
457 execute_certificate_latency_shared_object,
458 internal_execution_latency: register_histogram_with_registry!(
459 "authority_state_internal_execution_latency",
460 "Latency of actual certificate executions",
461 LATENCY_SEC_BUCKETS.to_vec(),
462 registry,
463 )
464 .unwrap(),
465 execution_load_input_objects_latency: register_histogram_with_registry!(
466 "authority_state_execution_load_input_objects_latency",
467 "Latency of loading input objects for execution",
468 LOW_LATENCY_SEC_BUCKETS.to_vec(),
469 registry,
470 )
471 .unwrap(),
472 prepare_certificate_latency: register_histogram_with_registry!(
473 "authority_state_prepare_certificate_latency",
474 "Latency of executing certificates, before committing the results",
475 LATENCY_SEC_BUCKETS.to_vec(),
476 registry,
477 )
478 .unwrap(),
479 commit_certificate_latency: register_histogram_with_registry!(
480 "authority_state_commit_certificate_latency",
481 "Latency of committing certificate execution results",
482 LATENCY_SEC_BUCKETS.to_vec(),
483 registry,
484 )
485 .unwrap(),
486 db_checkpoint_latency: register_histogram_with_registry!(
487 "db_checkpoint_latency",
488 "Latency of checkpointing dbs",
489 LATENCY_SEC_BUCKETS.to_vec(),
490 registry,
491 ).unwrap(),
492 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
493 "transaction_manager_num_enqueued_certificates",
494 "Current number of certificates enqueued to TransactionManager",
495 &["result"],
496 registry,
497 )
498 .unwrap(),
499 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
500 "transaction_manager_num_missing_objects",
501 "Current number of missing objects in TransactionManager",
502 registry,
503 )
504 .unwrap(),
505 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
506 "transaction_manager_num_pending_certificates",
507 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
508 registry,
509 )
510 .unwrap(),
511 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
512 "transaction_manager_num_executing_certificates",
513 "Number of executing certificates, including queued and actually running certificates",
514 registry,
515 )
516 .unwrap(),
517 transaction_manager_num_ready: register_int_gauge_with_registry!(
518 "transaction_manager_num_ready",
519 "Number of ready transactions in TransactionManager",
520 registry,
521 )
522 .unwrap(),
523 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
524 "transaction_manager_object_cache_size",
525 "Current size of object-availability cache in TransactionManager",
526 registry,
527 )
528 .unwrap(),
529 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
530 "transaction_manager_object_cache_hits",
531 "Number of object-availability cache hits in TransactionManager",
532 registry,
533 )
534 .unwrap(),
535 authority_overload_status: register_int_gauge_with_registry!(
536 "authority_overload_status",
537 "Whether authority is current experiencing overload and enters load shedding mode.",
538 registry)
539 .unwrap(),
540 authority_load_shedding_percentage: register_int_gauge_with_registry!(
541 "authority_load_shedding_percentage",
542 "The percentage of transactions is shed when the authority is in load shedding mode.",
543 registry)
544 .unwrap(),
545 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
546 "transaction_manager_object_cache_misses",
547 "Number of object-availability cache misses in TransactionManager",
548 registry,
549 )
550 .unwrap(),
551 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
552 "transaction_manager_object_cache_evictions",
553 "Number of object-availability cache evictions in TransactionManager",
554 registry,
555 )
556 .unwrap(),
557 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
558 "transaction_manager_package_cache_size",
559 "Current size of package-availability cache in TransactionManager",
560 registry,
561 )
562 .unwrap(),
563 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
564 "transaction_manager_package_cache_hits",
565 "Number of package-availability cache hits in TransactionManager",
566 registry,
567 )
568 .unwrap(),
569 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
570 "transaction_manager_package_cache_misses",
571 "Number of package-availability cache misses in TransactionManager",
572 registry,
573 )
574 .unwrap(),
575 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
576 "transaction_manager_package_cache_evictions",
577 "Number of package-availability cache evictions in TransactionManager",
578 registry,
579 )
580 .unwrap(),
581 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
582 "transaction_manager_transaction_queue_age_s",
583 "Time spent in waiting for transaction in the queue",
584 LATENCY_SEC_BUCKETS.to_vec(),
585 registry,
586 )
587 .unwrap(),
588 transaction_overload_sources: register_int_counter_vec_with_registry!(
589 "transaction_overload_sources",
590 "Number of times each source indicates transaction overload.",
591 &["source"],
592 registry)
593 .unwrap(),
594 execution_driver_executed_transactions: register_int_counter_with_registry!(
595 "execution_driver_executed_transactions",
596 "Cumulative number of transaction executed by execution driver",
597 registry,
598 )
599 .unwrap(),
600 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
601 "execution_driver_dispatch_queue",
602 "Number of transaction pending in execution driver dispatch queue",
603 registry,
604 )
605 .unwrap(),
606 execution_queueing_delay_s: register_histogram_with_registry!(
607 "execution_queueing_delay_s",
608 "Queueing delay between a transaction is ready for execution until it starts executing.",
609 LATENCY_SEC_BUCKETS.to_vec(),
610 registry
611 )
612 .unwrap(),
613 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
614 "prepare_cert_gas_latency_ratio",
615 "The ratio of computation gas divided by VM execution latency.",
616 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
617 registry
618 )
619 .unwrap(),
620 execution_gas_latency_ratio: register_histogram_with_registry!(
621 "execution_gas_latency_ratio",
622 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
623 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
624 registry
625 )
626 .unwrap(),
627 skipped_consensus_txns: register_int_counter_with_registry!(
628 "skipped_consensus_txns",
629 "Total number of consensus transactions skipped",
630 registry,
631 )
632 .unwrap(),
633 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
634 "skipped_consensus_txns_cache_hit",
635 "Total number of consensus transactions skipped because of local cache hit",
636 registry,
637 )
638 .unwrap(),
639 post_processing_total_events_emitted: register_int_counter_with_registry!(
640 "post_processing_total_events_emitted",
641 "Total number of events emitted in post processing",
642 registry,
643 )
644 .unwrap(),
645 post_processing_total_tx_indexed: register_int_counter_with_registry!(
646 "post_processing_total_tx_indexed",
647 "Total number of txes indexed in post processing",
648 registry,
649 )
650 .unwrap(),
651 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
652 "post_processing_total_tx_had_event_processed",
653 "Total number of txes finished event processing in post processing",
654 registry,
655 )
656 .unwrap(),
657 post_processing_total_failures: register_int_counter_with_registry!(
658 "post_processing_total_failures",
659 "Total number of failure in post processing",
660 registry,
661 )
662 .unwrap(),
663 consensus_handler_processed: register_int_counter_vec_with_registry!(
664 "consensus_handler_processed",
665 "Number of transactions processed by consensus handler",
666 &["class"],
667 registry
668 ).unwrap(),
669 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
670 "consensus_handler_transaction_sizes",
671 "Sizes of each type of transactions processed by consensus handler",
672 &["class"],
673 POSITIVE_INT_BUCKETS.to_vec(),
674 registry
675 ).unwrap(),
676 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
677 "consensus_handler_num_low_scoring_authorities",
678 "Number of low scoring authorities based on reputation scores from consensus",
679 registry
680 ).unwrap(),
681 consensus_handler_scores: register_int_gauge_vec_with_registry!(
682 "consensus_handler_scores",
683 "scores from consensus for each authority",
684 &["authority"],
685 registry,
686 ).unwrap(),
687 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
688 "consensus_handler_deferred_transactions",
689 "Number of transactions deferred by consensus handler",
690 registry,
691 ).unwrap(),
692 consensus_handler_congested_transactions: register_int_counter_with_registry!(
693 "consensus_handler_congested_transactions",
694 "Number of transactions deferred by consensus handler due to congestion",
695 registry,
696 ).unwrap(),
697 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
698 "consensus_handler_cancelled_transactions",
699 "Number of transactions cancelled by consensus handler",
700 registry,
701 ).unwrap(),
702 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
703 "consensus_handler_max_congestion_control_object_costs",
704 "Max object costs for congestion control in the current consensus commit",
705 &["commit_type"],
706 registry,
707 ).unwrap(),
708 consensus_committed_subdags: register_int_counter_vec_with_registry!(
709 "consensus_committed_subdags",
710 "Number of committed subdags, sliced by leader",
711 &["authority"],
712 registry,
713 ).unwrap(),
714 consensus_committed_messages: register_int_gauge_vec_with_registry!(
715 "consensus_committed_messages",
716 "Total number of committed consensus messages, sliced by author",
717 &["authority"],
718 registry,
719 ).unwrap(),
720 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
721 "consensus_committed_user_transactions",
722 "Number of committed user transactions, sliced by submitter",
723 &["authority"],
724 registry,
725 ).unwrap(),
726 consensus_handler_leader_round: register_int_gauge_with_registry!(
727 "consensus_handler_leader_round",
728 "The leader round of the current consensus output being processed in the consensus handler",
729 registry,
730 ).unwrap(),
731 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
732 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
733 zklogin_sig_count: register_int_counter_with_registry!(
734 "zklogin_sig_count",
735 "Count of zkLogin signatures",
736 registry,
737 )
738 .unwrap(),
739 multisig_sig_count: register_int_counter_with_registry!(
740 "multisig_sig_count",
741 "Count of zkLogin signatures",
742 registry,
743 )
744 .unwrap(),
745 consensus_calculated_throughput: register_int_gauge_with_registry!(
746 "consensus_calculated_throughput",
747 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
748 registry,
749 ).unwrap(),
750 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
751 "consensus_calculated_throughput_profile",
752 "The current active calculated throughput profile",
753 registry
754 ).unwrap(),
755 execution_queueing_latency: LatencyObserver::new(),
756 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
757 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
758 }
759 }
760
761 pub fn reset_on_reconfigure(&self) {
765 self.consensus_committed_messages.reset();
766 self.consensus_handler_scores.reset();
767 self.consensus_committed_user_transactions.reset();
768 }
769}
770
/// Pinned, reference-counted trait object for a [`Signer`] that produces
/// [`AuthoritySignature`]s and can be shared across threads (`Send + Sync`).
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
778
/// State held by an authority node: its identity and signing key, handles to
/// the stores and caches it works with, and its metrics and configuration.
pub struct AuthorityState {
    /// Name of this authority. It is checked against the committee and the
    /// active validator set by `is_committee_validator` and
    /// `is_active_validator`.
    pub name: AuthorityName,
    /// Signer for this authority; passed to `VerifiedSignedTransaction::new`
    /// when a transaction is signed.
    pub secret: StableSyncAuthoritySigner,

    // NOTE(review): presumably loads the input objects of a transaction -
    // confirm against `TransactionInputLoader`.
    input_loader: TransactionInputLoader,
    // NOTE(review): presumably the set of execution-cache trait objects
    // behind accessors such as `get_cache_writer()` - confirm.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// Per-epoch store, held in an `ArcSwap`.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// Async read-write lock wrapping an `EpochId`.
    // NOTE(review): presumably what `execution_lock_for_signing()` acquires
    // at the start of transaction handling - confirm.
    execution_lock: RwLock<EpochId>,

    /// Optional JSON-RPC index store.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional gRPC index store.
    pub grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,

    /// Shared subscription handler.
    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Shared checkpoint store; queried by `get_epoch_state_commitments`.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Shared committee store, exposed through `committee_store()` and
    /// `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    /// Shared transaction manager.
    transaction_manager: Arc<TransactionManager>,

    /// One-shot sender kept behind a mutex. The attribute below expects the
    /// field to be unused outside of test builds.
    // NOTE(review): presumably used by tests to shut down transaction
    // execution - confirm.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Shared authority metrics.
    pub metrics: Arc<AuthorityMetrics>,
    // NOTE(review): the underscore-prefixed pruner fields are presumably held
    // only to keep the pruners alive for the lifetime of the state - confirm.
    _pruner: AuthorityStorePruner,
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Configuration for db checkpoints.
    db_checkpoint_config: DBCheckpointConfig,

    /// Node configuration; supplies `authority_overload_config` and
    /// `transaction_deny_config`.
    pub config: NodeConfig,

    /// Current overload information for this authority.
    pub overload_info: AuthorityOverloadInfo,

    /// Optional validator transaction finalizer.
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// Identifier of the chain this authority belongs to.
    chain_identifier: ChainIdentifier,

    /// Shared congestion tracker.
    pub(crate) congestion_tracker: Arc<CongestionTracker>,
}
834
835impl AuthorityState {
844 pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
845 epoch_store.committee().authority_exists(&self.name)
846 }
847
848 pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
849 epoch_store
850 .active_validators()
851 .iter()
852 .any(|a| AuthorityName::from(a) == self.name)
853 }
854
855 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
856 !self.is_committee_validator(epoch_store)
857 }
858
859 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
860 &self.committee_store
861 }
862
863 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
864 self.committee_store.clone()
865 }
866
867 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
868 &self.config.authority_overload_config
869 }
870
871 pub fn get_epoch_state_commitments(
872 &self,
873 epoch: EpochId,
874 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
875 self.checkpoint_store.get_epoch_state_commitments(epoch)
876 }
877
878 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
882 async fn handle_transaction_impl(
883 &self,
884 transaction: VerifiedTransaction,
885 epoch_store: &Arc<AuthorityPerEpochStore>,
886 ) -> IotaResult<VerifiedSignedTransaction> {
887 let _execution_lock = self.execution_lock_for_signing()?;
889
890 let protocol_config = epoch_store.protocol_config();
891 let reference_gas_price = epoch_store.reference_gas_price();
892
893 let epoch = epoch_store.epoch();
894
895 let tx_data = transaction.data().transaction_data();
896
897 iota_transaction_checks::deny::check_transaction_for_signing(
901 tx_data,
902 transaction.tx_signatures(),
903 &transaction.input_objects()?,
904 &tx_data.receiving_objects(),
905 &self.config.transaction_deny_config,
906 self.get_backing_package_store().as_ref(),
907 )?;
908
909 let (tx_input_objects, tx_receiving_objects, per_authenticator_inputs) =
913 self.read_objects_for_signing(&transaction, epoch)?;
914
915 let move_authenticators = transaction.move_authenticators();
917
918 let (gas_status, tx_checked_input_objects, per_authenticator_checked_inputs) = self
924 .check_transaction_inputs_for_signing(
925 protocol_config,
926 reference_gas_price,
927 tx_data,
928 tx_input_objects,
929 &tx_receiving_objects,
930 &move_authenticators,
931 per_authenticator_inputs,
932 )?;
933
934 let per_authenticator_checked_input_objects = per_authenticator_checked_inputs
937 .iter()
938 .map(|i| &i.0)
939 .collect();
940
941 check_coin_deny_list_v1_during_signing(
945 tx_data.sender(),
946 &tx_checked_input_objects,
947 &tx_receiving_objects,
948 &per_authenticator_checked_input_objects,
949 &self.get_object_store(),
950 )?;
951
952 if !move_authenticators.is_empty() {
955 let aggregated_authenticator_input_objects =
956 iota_transaction_checks::aggregate_authenticator_input_objects(
957 &per_authenticator_checked_input_objects,
958 )?;
959
960 debug_assert_eq!(
961 move_authenticators.len(),
962 per_authenticator_checked_inputs.len(),
963 "Move authenticators amount must match the number of checked authenticator inputs"
964 );
965
966 let move_authenticators = move_authenticators
967 .into_iter()
968 .zip(per_authenticator_checked_inputs)
969 .map(
970 |(
971 move_authenticator,
972 (authenticator_checked_input_objects, authenticator_function_ref),
973 )| {
974 (
975 move_authenticator.to_owned(),
976 authenticator_function_ref,
977 authenticator_checked_input_objects,
978 )
979 },
980 )
981 .collect();
982
983 let tx_data_bytes =
988 bcs::to_bytes(&tx_data).expect("TransactionData serialization cannot fail");
989
990 let (kind, signer, gas_data) = tx_data.execution_parts();
991
992 let validation_result = epoch_store.executor().authenticate_transaction(
994 self.get_backing_store().as_ref(),
995 protocol_config,
996 self.metrics.limits_metrics.clone(),
997 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
998 epoch_store
999 .epoch_start_config()
1000 .epoch_data()
1001 .epoch_start_timestamp(),
1002 gas_data,
1003 gas_status,
1004 move_authenticators,
1005 aggregated_authenticator_input_objects,
1006 kind,
1007 signer,
1008 transaction.digest().to_owned(),
1009 tx_data_bytes,
1010 &mut None,
1011 );
1012
1013 if let Err(validation_error) = validation_result {
1014 return Err(IotaError::MoveAuthenticatorExecutionFailure {
1015 error: validation_error.to_string(),
1016 });
1017 }
1018 }
1019
1020 let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();
1021
1022 let signed_transaction =
1023 VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);
1024
1025 self.get_cache_writer().try_acquire_transaction_locks(
1030 epoch_store,
1031 &owned_objects,
1032 signed_transaction.clone(),
1033 )?;
1034
1035 Ok(signed_transaction)
1036 }
1037
1038 #[instrument(name = "handle_transaction", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = transaction.data().transaction_data().gas_owner().to_string()))]
1040 pub async fn handle_transaction(
1041 &self,
1042 epoch_store: &Arc<AuthorityPerEpochStore>,
1043 transaction: VerifiedTransaction,
1044 ) -> IotaResult<HandleTransactionResponse> {
1045 let tx_digest = *transaction.digest();
1046 debug!("handle_transaction");
1047
1048 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1050 return Ok(HandleTransactionResponse { status });
1051 }
1052
1053 let _metrics_guard = self
1054 .metrics
1055 .authority_state_handle_transaction_latency
1056 .start_timer();
1057 self.metrics.tx_orders.inc();
1058
1059 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1060 match signed {
1061 Ok(s) => {
1062 if self.is_committee_validator(epoch_store) {
1063 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1064 let tx = s.clone();
1065 let validator_tx_finalizer = validator_tx_finalizer.clone();
1066 let cache_reader = self.get_transaction_cache_reader().clone();
1067 let epoch_store = epoch_store.clone();
1068 spawn_monitored_task!(epoch_store.within_alive_epoch(
1069 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1070 ));
1071 }
1072 }
1073 Ok(HandleTransactionResponse {
1074 status: TransactionStatus::Signed(s.into_inner().into_sig()),
1075 })
1076 }
1077 Err(err) => Ok(HandleTransactionResponse {
1081 status: self
1082 .get_transaction_status(&tx_digest, epoch_store)?
1083 .ok_or(err)?
1084 .1,
1085 }),
1086 }
1087 }
1088
1089 pub fn check_system_overload_at_signing(&self) -> bool {
1090 self.config
1091 .authority_overload_config
1092 .check_system_overload_at_signing
1093 }
1094
1095 pub fn check_system_overload_at_execution(&self) -> bool {
1096 self.config
1097 .authority_overload_config
1098 .check_system_overload_at_execution
1099 }
1100
1101 pub(crate) fn check_system_overload(
1102 &self,
1103 consensus_adapter: &Arc<ConsensusAdapter>,
1104 tx_data: &SenderSignedData,
1105 do_authority_overload_check: bool,
1106 ) -> IotaResult {
1107 if do_authority_overload_check {
1108 self.check_authority_overload(tx_data).tap_err(|_| {
1109 self.update_overload_metrics("execution_queue");
1110 })?;
1111 }
1112 self.transaction_manager
1113 .check_execution_overload(self.overload_config(), tx_data)
1114 .tap_err(|_| {
1115 self.update_overload_metrics("execution_pending");
1116 })?;
1117 consensus_adapter.check_consensus_overload().tap_err(|_| {
1118 self.update_overload_metrics("consensus");
1119 })?;
1120
1121 let pending_tx_count = self
1122 .get_cache_commit()
1123 .approximate_pending_transaction_count();
1124 if pending_tx_count
1125 > self
1126 .config
1127 .execution_cache_config
1128 .writeback_cache
1129 .backpressure_threshold_for_rpc()
1130 {
1131 return Err(IotaError::ValidatorOverloadedRetryAfter {
1132 retry_after_secs: 10,
1133 });
1134 }
1135
1136 Ok(())
1137 }
1138
1139 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1140 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1141 return Ok(());
1142 }
1143
1144 let load_shedding_percentage = self
1145 .overload_info
1146 .load_shedding_percentage
1147 .load(Ordering::Relaxed);
1148 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1149 }
1150
1151 fn update_overload_metrics(&self, source: &str) {
1152 self.metrics
1153 .transaction_overload_sources
1154 .with_label_values(&[source])
1155 .inc();
1156 }
1157
1158 #[instrument(level = "trace", skip_all)]
1160 pub async fn execute_certificate(
1161 &self,
1162 certificate: &VerifiedCertificate,
1163 epoch_store: &Arc<AuthorityPerEpochStore>,
1164 ) -> IotaResult<TransactionEffects> {
1165 let _metrics_guard = if certificate.contains_shared_object() {
1166 self.metrics
1167 .execute_certificate_latency_shared_object
1168 .start_timer()
1169 } else {
1170 self.metrics
1171 .execute_certificate_latency_single_writer
1172 .start_timer()
1173 };
1174 trace!("execute_certificate");
1175
1176 self.metrics.total_cert_attempts.inc();
1177
1178 if !certificate.contains_shared_object() {
1179 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1184 }
1185
1186 epoch_store
1189 .within_alive_epoch(self.notify_read_effects(certificate))
1190 .await
1191 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1192 .and_then(|r| r)
1193 }
1194
1195 #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1210 pub fn try_execute_immediately(
1211 &self,
1212 certificate: &VerifiedExecutableTransaction,
1213 expected_effects_digest: Option<TransactionEffectsDigest>,
1214 epoch_store: &Arc<AuthorityPerEpochStore>,
1215 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1216 let _scope = monitored_scope("Execution::try_execute_immediately");
1217 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1218
1219 let tx_digest = certificate.digest();
1220
1221 let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1223
1224 if let Some(effects) = self
1227 .get_transaction_cache_reader()
1228 .try_get_executed_effects(tx_digest)?
1229 {
1230 if let Some(expected_effects_digest_inner) = expected_effects_digest {
1231 assert_eq!(
1232 effects.digest(),
1233 expected_effects_digest_inner,
1234 "Unexpected effects digest for transaction {tx_digest:?}"
1235 );
1236 }
1237 tx_guard.release();
1238 return Ok((effects, None));
1239 }
1240
1241 let (tx_input_objects, per_authenticator_inputs) =
1242 self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1243
1244 let expected_effects_digest =
1250 expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1251
1252 self.process_certificate(
1253 tx_guard,
1254 certificate,
1255 tx_input_objects,
1256 per_authenticator_inputs,
1257 expected_effects_digest,
1258 epoch_store,
1259 )
1260 .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1261 .tap_ok(
1262 |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1263 )
1264 }
1265
1266 pub fn read_objects_for_execution(
1267 &self,
1268 tx_lock: &CertLockGuard,
1269 certificate: &VerifiedExecutableTransaction,
1270 epoch_store: &Arc<AuthorityPerEpochStore>,
1271 ) -> IotaResult<(InputObjects, Vec<(InputObjects, ObjectReadResult)>)> {
1272 let _scope = monitored_scope("Execution::load_input_objects");
1273 let _metrics_guard = self
1274 .metrics
1275 .execution_load_input_objects_latency
1276 .start_timer();
1277
1278 let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1279
1280 let input_objects = self.input_loader.read_objects_for_execution(
1281 epoch_store,
1282 &certificate.key(),
1283 tx_lock,
1284 &input_objects,
1285 epoch_store.epoch(),
1286 )?;
1287
1288 certificate.split_input_objects_into_groups_for_reading(input_objects)
1289 }
1290
1291 pub fn try_execute_for_test(
1295 &self,
1296 certificate: &VerifiedCertificate,
1297 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1298 let epoch_store = self.epoch_store_for_testing();
1299 let (effects, execution_error_opt) = self.try_execute_immediately(
1300 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1301 None,
1302 &epoch_store,
1303 )?;
1304 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1305 Ok((signed_effects, execution_error_opt))
1306 }
1307
1308 pub fn execute_for_test(
1310 &self,
1311 certificate: &VerifiedCertificate,
1312 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1313 self.try_execute_for_test(certificate)
1314 .expect("try_execute_for_test should not fail")
1315 }
1316
1317 pub async fn notify_read_effects(
1318 &self,
1319 certificate: &VerifiedCertificate,
1320 ) -> IotaResult<TransactionEffects> {
1321 self.get_transaction_cache_reader()
1322 .try_notify_read_executed_effects(&[*certificate.digest()])
1323 .await
1324 .map(|mut r| r.pop().expect("must return correct number of effects"))
1325 }
1326
1327 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1328 self.get_object_cache_reader()
1329 .try_check_owned_objects_are_live(owned_object_refs)
1330 }
1331
1332 pub(crate) fn debug_dump_transaction_state(
1337 &self,
1338 tx_digest: &TransactionDigest,
1339 effects: &TransactionEffects,
1340 expected_effects_digest: TransactionEffectsDigest,
1341 inner_temporary_store: &InnerTemporaryStore,
1342 certificate: &VerifiedExecutableTransaction,
1343 debug_dump_config: &StateDebugDumpConfig,
1344 ) -> IotaResult<PathBuf> {
1345 let dump_dir = debug_dump_config
1346 .dump_file_directory
1347 .as_ref()
1348 .cloned()
1349 .unwrap_or(std::env::temp_dir());
1350 let epoch_store = self.load_epoch_store_one_call_per_task();
1351
1352 NodeStateDump::new(
1353 tx_digest,
1354 effects,
1355 expected_effects_digest,
1356 self.get_object_store().as_ref(),
1357 &epoch_store,
1358 inner_temporary_store,
1359 certificate,
1360 )?
1361 .write_to_file(&dump_dir)
1362 .map_err(|e| IotaError::FileIO(e.to_string()))
1363 }
1364
1365 #[instrument(name = "process_certificate", level = "trace", skip_all, fields(tx_digest = ?certificate.digest(), sender = ?certificate.data().transaction_data().gas_owner().to_string()))]
1366 pub(crate) fn process_certificate(
1367 &self,
1368 tx_guard: CertTxGuard,
1369 certificate: &VerifiedExecutableTransaction,
1370 tx_input_objects: InputObjects,
1371 per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1372 expected_effects_digest: Option<TransactionEffectsDigest>,
1373 epoch_store: &Arc<AuthorityPerEpochStore>,
1374 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1375 let process_certificate_start_time = tokio::time::Instant::now();
1376 let digest = *certificate.digest();
1377
1378 let _scope = monitored_scope("Execution::process_certificate");
1379
1380 fail_point_if!("correlated-crash-process-certificate", || {
1381 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1382 iota_simulator::task::kill_current_node(None);
1383 }
1384 });
1385
1386 let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1387 let execution_guard = match execution_guard {
1393 Ok(execution_guard) => execution_guard,
1394 Err(err) => {
1395 tx_guard.release();
1396 return Err(err);
1397 }
1398 };
1399 if *execution_guard != epoch_store.epoch() {
1403 tx_guard.release();
1404 info!("The epoch of the execution_guard doesn't match the epoch store");
1405 return Err(IotaError::WrongEpoch {
1406 expected_epoch: epoch_store.epoch(),
1407 actual_epoch: *execution_guard,
1408 });
1409 }
1410
1411 let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1417 &execution_guard,
1418 certificate,
1419 tx_input_objects,
1420 per_authenticator_inputs,
1421 epoch_store,
1422 ) {
1423 Err(e) => {
1424 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1425 tx_guard.release();
1426 return Err(e);
1427 }
1428 Ok(res) => res,
1429 };
1430
1431 if let Some(expected_effects_digest) = expected_effects_digest {
1432 if effects.digest() != expected_effects_digest {
1433 match self.debug_dump_transaction_state(
1435 &digest,
1436 &effects,
1437 expected_effects_digest,
1438 &inner_temporary_store,
1439 certificate,
1440 &self.config.state_debug_dump_config,
1441 ) {
1442 Ok(out_path) => {
1443 info!(
1444 "Dumped node state for transaction {} to {}",
1445 digest,
1446 out_path.as_path().display().to_string()
1447 );
1448 }
1449 Err(e) => {
1450 error!("Error dumping state for transaction {}: {e}", digest);
1451 }
1452 }
1453 error!(
1454 tx_digest = ?digest,
1455 ?expected_effects_digest,
1456 actual_effects = ?effects,
1457 "fork detected!"
1458 );
1459 panic!(
1460 "Transaction {} is expected to have effects digest {}, but got {}!",
1461 digest,
1462 expected_effects_digest,
1463 effects.digest(),
1464 );
1465 }
1466 }
1467
1468 fail_point!("crash");
1469
1470 self.commit_certificate(
1471 certificate,
1472 inner_temporary_store,
1473 &effects,
1474 tx_guard,
1475 execution_guard,
1476 epoch_store,
1477 )?;
1478
1479 if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1480 certificate.data().transaction_data().kind()
1481 {
1482 if let Some(err) = &execution_error_opt {
1483 debug_fatal!("Authenticator state update failed: {:?}", err);
1484 }
1485 epoch_store.update_authenticator_state(auth_state);
1486
1487 if cfg!(debug_assertions) {
1490 let authenticator_state = get_authenticator_state(self.get_object_store())
1491 .expect("Read cannot fail")
1492 .expect("Authenticator state must exist");
1493
1494 let mut sys_jwks: Vec<_> = authenticator_state
1495 .active_jwks
1496 .into_iter()
1497 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1498 .collect();
1499 let mut active_jwks: Vec<_> = epoch_store
1500 .signature_verifier
1501 .get_jwks()
1502 .into_iter()
1503 .collect();
1504 sys_jwks.sort();
1505 active_jwks.sort();
1506
1507 assert_eq!(sys_jwks, active_jwks);
1508 }
1509 }
1510
1511 let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1512 if elapsed > 0.0 {
1513 self.metrics
1514 .execution_gas_latency_ratio
1515 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1516 };
1517 Ok((effects, execution_error_opt))
1518 }
1519
/// Persists the results of an executed certificate and notifies interested components.
///
/// In order, this function:
/// 1. runs `post_process_one_tx` (failures are counted and logged, never propagated);
/// 2. records the transaction key / digest pair in the epoch store;
/// 3. builds the transaction outputs and writes them through the cache writer;
/// 4. for end-of-epoch transactions, forces a reload of the built-in system packages;
/// 5. commits `tx_guard`, notifies the transaction manager and updates metrics.
///
/// `_execution_guard` is taken by value and not used in the body, so it is held until
/// this function returns.
///
/// # Errors
/// Propagates errors from `insert_tx_key_and_digest` and `try_write_transaction_outputs`.
#[instrument(level = "trace", skip_all)]
fn commit_certificate(
    &self,
    certificate: &VerifiedExecutableTransaction,
    inner_temporary_store: InnerTemporaryStore,
    effects: &TransactionEffects,
    tx_guard: CertTxGuard,
    _execution_guard: ExecutionLockReadGuard<'_>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> IotaResult {
    let _scope: Option<iota_metrics::MonitoredScopeGuard> =
        monitored_scope("Execution::commit_certificate");
    let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

    let tx_key = certificate.key();
    let tx_digest = certificate.digest();
    // Read everything needed from the temporary store up front: it is moved into
    // `build_transaction_outputs` further down.
    let input_object_count = inner_temporary_store.input_objects.len();
    let shared_object_count = effects.input_shared_objects().len();

    let output_keys = inner_temporary_store.get_output_keys(effects);

    // Post-processing is best effort: a failure bumps a metric and is logged, and the
    // result is deliberately discarded.
    let _ = self
        .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
        .tap_err(|e| {
            self.metrics.post_processing_total_failures.inc();
            error!(?tx_digest, "tx post processing failed: {e}");
        });

    epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;

    // NOTE(review): presumably a crash-injection point for failure testing — confirm
    // against the `fail_point!` macro definition.
    fail_point!("crash");

    let transaction_outputs = TransactionOutputs::build_transaction_outputs(
        certificate.clone().into_unsigned(),
        effects.clone(),
        inner_temporary_store,
    );
    self.get_cache_writer()
        .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;

    if certificate.transaction_data().is_end_of_epoch_tx() {
        // NOTE(review): presumably because an end-of-epoch transaction may have changed
        // the system packages — confirm.
        self.get_object_cache_reader()
            .force_reload_system_packages(&BuiltInFramework::all_package_ids());
    }

    // The guard is committed only after the outputs were written successfully.
    tx_guard.commit_tx();

    self.transaction_manager
        .notify_commit(tx_digest, output_keys, epoch_store);

    self.update_metrics(certificate, input_object_count, shared_object_count);

    Ok(())
}
1585
1586 fn update_metrics(
1587 &self,
1588 certificate: &VerifiedExecutableTransaction,
1589 input_object_count: usize,
1590 shared_object_count: usize,
1591 ) {
1592 if certificate.has_zklogin_sig() {
1594 self.metrics.zklogin_sig_count.inc();
1595 } else if certificate.has_upgraded_multisig() {
1596 self.metrics.multisig_sig_count.inc();
1597 }
1598
1599 self.metrics.total_effects.inc();
1600 self.metrics.total_certs.inc();
1601
1602 if shared_object_count > 0 {
1603 self.metrics.shared_obj_tx.inc();
1604 }
1605
1606 if certificate.is_sponsored_tx() {
1607 self.metrics.sponsored_tx.inc();
1608 }
1609
1610 self.metrics
1611 .num_input_objs
1612 .observe(input_object_count as f64);
1613 self.metrics
1614 .num_shared_objects
1615 .observe(shared_object_count as f64);
1616 self.metrics.batch_size.observe(
1617 certificate
1618 .data()
1619 .intent_message()
1620 .value
1621 .kind()
1622 .num_commands() as f64,
1623 );
1624 }
1625
/// Checks the inputs of `certificate` and executes it, without committing anything.
///
/// Two paths exist, selected by whether the certificate carries Move authenticators:
/// - without authenticators, the certificate inputs are checked and the transaction is
///   run through `execute_transaction_to_effects`;
/// - with authenticators, every authenticator's account is checked via
///   `check_move_account`, the certificate and authenticator inputs are checked together,
///   and the transaction is run through
///   `authenticate_then_execute_transaction_to_effects`.
///
/// In both paths the owned input objects are verified with `check_owned_locks` before
/// execution.
///
/// Returns the temporary store, the effects, and the execution error if execution
/// reported one.
///
/// # Errors
/// Propagates errors from the transaction validity check, the input checks, the
/// per-authenticator account checks and the owned-object liveness check.
#[instrument(level = "trace", skip_all)]
fn prepare_certificate(
    &self,
    _execution_guard: &ExecutionLockReadGuard<'_>,
    certificate: &VerifiedExecutableTransaction,
    tx_input_objects: InputObjects,
    per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
    epoch_store: &Arc<AuthorityPerEpochStore>,
) -> IotaResult<(
    InnerTemporaryStore,
    TransactionEffects,
    Option<ExecutionError>,
)> {
    let _scope = monitored_scope("Execution::prepare_certificate");
    let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
    let prepare_certificate_start_time = tokio::time::Instant::now();

    let protocol_config = epoch_store.protocol_config();

    let reference_gas_price = epoch_store.reference_gas_price();

    let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
    let epoch_start_timestamp = epoch_store
        .epoch_start_config()
        .epoch_data()
        .epoch_start_timestamp();

    let backing_store = self.get_backing_store().as_ref();

    let tx_digest = *certificate.digest();

    let tx_data = certificate.data().transaction_data();
    tx_data.validity_check(protocol_config)?;

    let (kind, signer, gas_data) = tx_data.execution_parts();

    let move_authenticators = certificate.move_authenticators();

    // `effects` is only mutated inside the `cp_execution_nondeterminism` fail point
    // below, hence the conditional `expect(unused_mut)`.
    #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
    let (inner_temp_store, _, mut effects, execution_error_opt) = if move_authenticators
        .is_empty()
    {
        // Path 1: no Move authenticators — plain certificate input check and execution.
        let (tx_gas_status, tx_checked_input_objects) =
            iota_transaction_checks::check_certificate_input(
                certificate,
                tx_input_objects,
                protocol_config,
                reference_gas_price,
            )?;

        let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
        self.check_owned_locks(&owned_object_refs)?;
        epoch_store.executor().execute_transaction_to_effects(
            backing_store,
            protocol_config,
            self.metrics.limits_metrics.clone(),
            self.config
                .expensive_safety_check_config
                .enable_deep_per_tx_iota_conservation_check(),
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_id,
            epoch_start_timestamp,
            tx_checked_input_objects,
            gas_data,
            tx_gas_status,
            kind,
            signer,
            tx_digest,
            &mut None,
        )
    } else {
        // Path 2: the certificate carries Move authenticators.
        debug_assert_eq!(
            move_authenticators.len(),
            per_authenticator_inputs.len(),
            "Move authenticators amount must match the number of authenticator inputs"
        );

        // For every authenticator, check its account object and resolve the
        // authenticator function reference used for execution; the first failure
        // aborts the whole preparation.
        let per_authenticator_inputs = move_authenticators
            .iter()
            .zip(per_authenticator_inputs)
            .map(
                |(move_authenticator, (authenticator_input_objects, account_object))| {
                    let (
                        auth_account_object_id,
                        auth_account_object_seq_number,
                        auth_account_object_digest,
                    ) = move_authenticator.object_to_authenticate_components()?;

                    let signer = move_authenticator.address()?;

                    let authenticator_function_ref_for_execution = self.check_move_account(
                        auth_account_object_id,
                        auth_account_object_seq_number,
                        auth_account_object_digest,
                        account_object,
                        &signer,
                    )?;

                    Ok((
                        authenticator_input_objects,
                        authenticator_function_ref_for_execution,
                    ))
                },
            )
            .collect::<IotaResult<Vec<_>>>()?;

        // Cloned because `per_authenticator_inputs` is consumed again further down
        // for its function references.
        let per_authenticator_input_objects = per_authenticator_inputs
            .iter()
            .map(|(authenticator_input_objects, _)| authenticator_input_objects.clone())
            .collect::<Vec<_>>();

        let tx_data_bytes =
            bcs::to_bytes(tx_data).expect("TransactionData serialization cannot fail");

        let authenticator_gas_budget = protocol_config.max_auth_gas();
        let (
            gas_status,
            per_authenticator_checked_input_objects,
            authenticator_and_tx_checked_input_objects,
        ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
            certificate,
            tx_input_objects,
            per_authenticator_input_objects,
            authenticator_gas_budget,
            protocol_config,
            reference_gas_price,
        )?;

        debug_assert_eq!(
            move_authenticators.len(),
            per_authenticator_checked_input_objects.len(),
            "Move authenticators amount must match the number of checked authenticator inputs"
        );

        // Assemble (authenticator, function ref, checked inputs) triples for the executor.
        let move_authenticators = move_authenticators
            .into_iter()
            .zip(per_authenticator_inputs)
            .zip(per_authenticator_checked_input_objects)
            .map(
                |(
                    (move_authenticator, (_, authenticator_function_ref_for_execution)),
                    authenticator_checked_input_objects,
                )| {
                    (
                        move_authenticator.to_owned(),
                        authenticator_function_ref_for_execution,
                        authenticator_checked_input_objects,
                    )
                },
            )
            .collect::<Vec<_>>();

        let owned_object_refs = authenticator_and_tx_checked_input_objects
            .inner()
            .filter_owned_objects();
        self.check_owned_locks(&owned_object_refs)?;

        epoch_store
            .executor()
            .authenticate_then_execute_transaction_to_effects(
                backing_store,
                protocol_config,
                self.metrics.limits_metrics.clone(),
                self.config
                    .expensive_safety_check_config
                    .enable_deep_per_tx_iota_conservation_check(),
                self.config.certificate_deny_config.certificate_deny_set(),
                &epoch_id,
                epoch_start_timestamp,
                gas_data,
                gas_status,
                move_authenticators,
                authenticator_and_tx_checked_input_objects,
                kind,
                signer,
                tx_digest,
                tx_data_bytes,
                &mut None,
            )
    };

    // NOTE(review): looks like a simulator-only hook that alters `effects` to provoke
    // nondeterminism in tests — confirm against `create_fail_state`.
    fail_point_if!("cp_execution_nondeterminism", || {
        #[cfg(msim)]
        self.create_fail_state(certificate, epoch_store, &mut effects);
    });

    // Skip the observation when no time was measured, to avoid dividing by zero.
    let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
    if elapsed > 0.0 {
        self.metrics
            .prepare_cert_gas_latency_ratio
            .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
    }

    Ok((inner_temp_store, effects, execution_error_opt.err()))
}
1852
1853 pub fn prepare_certificate_for_benchmark(
1854 &self,
1855 certificate: &VerifiedExecutableTransaction,
1856 input_objects: InputObjects,
1857 epoch_store: &Arc<AuthorityPerEpochStore>,
1858 ) -> IotaResult<(
1859 InnerTemporaryStore,
1860 TransactionEffects,
1861 Option<ExecutionError>,
1862 )> {
1863 let lock = RwLock::new(epoch_store.epoch());
1864 let execution_guard = lock.try_read().unwrap();
1865
1866 self.prepare_certificate(
1867 &execution_guard,
1868 certificate,
1869 input_objects,
1870 vec![],
1871 epoch_store,
1872 )
1873 }
1874
1875 #[instrument("dry_exec_tx", level = "trace", skip_all)]
1878 #[allow(clippy::type_complexity)]
1879 pub fn dry_exec_transaction(
1880 &self,
1881 transaction: TransactionData,
1882 transaction_digest: TransactionDigest,
1883 ) -> IotaResult<(
1884 DryRunTransactionBlockResponse,
1885 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1886 TransactionEffects,
1887 Option<ObjectID>,
1888 )> {
1889 let epoch_store = self.load_epoch_store_one_call_per_task();
1890 if !self.is_fullnode(&epoch_store) {
1891 return Err(IotaError::UnsupportedFeature {
1892 error: "dry-exec is only supported on fullnodes".to_string(),
1893 });
1894 }
1895
1896 if transaction.kind().is_system_tx() {
1897 return Err(IotaError::UnsupportedFeature {
1898 error: "dry-exec does not support system transactions".to_string(),
1899 });
1900 }
1901
1902 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1903 }
1904
1905 #[allow(clippy::type_complexity)]
1906 pub fn dry_exec_transaction_for_benchmark(
1907 &self,
1908 transaction: TransactionData,
1909 transaction_digest: TransactionDigest,
1910 ) -> IotaResult<(
1911 DryRunTransactionBlockResponse,
1912 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1913 TransactionEffects,
1914 Option<ObjectID>,
1915 )> {
1916 let epoch_store = self.load_epoch_store_one_call_per_task();
1917 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1918 }
1919
/// Shared implementation of the dry-run entry points.
///
/// Validates the transaction (without the gas check), runs the signing deny checks with
/// an empty signature list, loads the input objects and executes the transaction on a
/// freshly created executor. When the transaction has no gas payment, a mock gas coin
/// with a random ID is created and used as payment.
///
/// Returns, in order:
/// - the dry-run response (with empty `object_changes` and `balance_changes`),
/// - a map from object ID to `(object ref, object, write kind)` for every created,
///   unwrapped and mutated object,
/// - the raw transaction effects,
/// - the ID of the mock gas coin, if one was created.
///
/// # Errors
/// Propagates validation, deny-check, object-loading and input-check errors, as well as
/// conversion errors while building the response.
///
/// # Panics
/// Panics if the executor cannot be created, or if an object listed in the effects as
/// created, unwrapped or mutated is missing from the temporary store's `written` set.
#[instrument(level = "trace", skip_all)]
#[allow(clippy::type_complexity)]
fn dry_exec_transaction_impl(
    &self,
    epoch_store: &AuthorityPerEpochStore,
    transaction: TransactionData,
    transaction_digest: TransactionDigest,
) -> IotaResult<(
    DryRunTransactionBlockResponse,
    BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
    TransactionEffects,
    Option<ObjectID>,
)> {
    transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;

    let input_object_kinds = transaction.input_objects()?;
    let receiving_object_refs = transaction.receiving_objects();

    // Only transaction data is available here, so the deny check is given an empty
    // signature list.
    iota_transaction_checks::deny::check_transaction_for_signing(
        &transaction,
        &[],
        &input_object_kinds,
        &receiving_object_refs,
        &self.config.transaction_deny_config,
        self.get_backing_package_store().as_ref(),
    )?;

    let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
        None,
        &input_object_kinds,
        &receiving_object_refs,
        epoch_store.epoch(),
    )?;

    // Rebound as mutable: the gas payment may be replaced with a mock coin below.
    let mut transaction = transaction;
    let reference_gas_price = epoch_store.reference_gas_price();
    let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
        // No gas payment supplied: create a mock gas coin owned by the gas owner and
        // use it as the sole payment object.
        let sender = transaction.gas_owner();
        let gas_object_id = ObjectID::random();
        let gas_object = Object::new_move(
            MoveObject::new_gas_coin(
                OBJECT_START_VERSION,
                gas_object_id,
                SIMULATION_GAS_COIN_VALUE,
            ),
            Owner::AddressOwner(sender),
            TransactionDigest::genesis_marker(),
        );
        let gas_object_ref = gas_object.compute_object_reference();
        transaction.gas_data_mut().payment = vec![gas_object_ref];
        (
            iota_transaction_checks::check_transaction_input_with_given_gas(
                epoch_store.protocol_config(),
                reference_gas_price,
                &transaction,
                input_objects,
                receiving_objects,
                gas_object,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?,
            Some(gas_object_id),
        )
    } else {
        // No gas budget is set aside for authenticators on this path.
        let authenticator_gas_budget = 0;

        (
            iota_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                reference_gas_price,
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
                authenticator_gas_budget,
            )?,
            None,
        )
    };

    let protocol_config = epoch_store.protocol_config();
    let (kind, signer, gas_data) = transaction.execution_parts();

    // A dedicated executor is created for the dry run (with `silent` set) instead of
    // using the epoch store's executor.
    let silent = true;
    let executor = iota_execution::executor(protocol_config, silent, None)
        .expect("Creating an executor should not fail here");

    let expensive_checks = false;
    let (inner_temp_store, _, effects, execution_error) = executor
        .execute_transaction_to_effects(
            self.get_backing_store().as_ref(),
            protocol_config,
            self.metrics.limits_metrics.clone(),
            expensive_checks,
            self.config.certificate_deny_config.certificate_deny_set(),
            &epoch_store.epoch_start_config().epoch_data().epoch_id(),
            epoch_store
                .epoch_start_config()
                .epoch_data()
                .epoch_start_timestamp(),
            checked_input_objects,
            gas_data,
            gas_status,
            kind,
            signer,
            transaction_digest,
            &mut None,
        );
    let tx_digest = *effects.transaction_digest();

    let module_cache =
        TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());

    // Packages are resolved from the temporary store first, falling back to the
    // backing package store.
    let mut layout_resolver =
        epoch_store
            .executor()
            .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                &inner_temp_store,
                self.get_backing_package_store(),
            )));
    // Object and balance changes are returned empty by this function.
    let object_changes = Vec::new();

    let balance_changes = Vec::new();

    // Tag every written object with how it was written (create / unwrap / mutate) and
    // pair it with the object stored in the temporary store.
    let written_with_kind = effects
        .created()
        .into_iter()
        .map(|(oref, _)| (oref, WriteKind::Create))
        .chain(
            effects
                .unwrapped()
                .into_iter()
                .map(|(oref, _)| (oref, WriteKind::Unwrap)),
        )
        .chain(
            effects
                .mutated()
                .into_iter()
                .map(|(oref, _)| (oref, WriteKind::Mutate)),
        )
        .map(|(oref, kind)| {
            let obj = inner_temp_store.written.get(&oref.0).unwrap();
            (oref.0, (oref, obj.clone(), kind))
        })
        .collect();

    let execution_error_source = execution_error
        .as_ref()
        .err()
        .and_then(|e| e.source().as_ref().map(|e| e.to_string()));

    Ok((
        DryRunTransactionBlockResponse {
            suggested_gas_price: self
                .congestion_tracker
                .get_prediction_suggested_gas_price(&transaction),
            input: IotaTransactionBlockData::try_from_with_module_cache(
                transaction,
                &module_cache,
                tx_digest,
            )
            .map_err(|e| IotaError::TransactionSerialization {
                error: format!(
                    "Failed to convert transaction to IotaTransactionBlockData: {e}",
                ),
            })?,
            // Cloned because `effects` itself is returned to the caller below.
            effects: effects.clone().try_into()?,
            events: IotaTransactionBlockEvents::try_from(
                inner_temp_store.events.clone(),
                tx_digest,
                None,
                layout_resolver.as_mut(),
            )?,
            object_changes,
            balance_changes,
            execution_error_source,
        },
        written_with_kind,
        effects,
        mock_gas,
    ))
}
2113
2114 pub fn simulate_transaction(
2115 &self,
2116 mut transaction: TransactionData,
2117 checks: VmChecks,
2118 ) -> IotaResult<SimulateTransactionResult> {
2119 if transaction.kind().is_system_tx() {
2120 return Err(IotaError::UnsupportedFeature {
2121 error: "simulate does not support system transactions".to_string(),
2122 });
2123 }
2124
2125 let epoch_store = self.load_epoch_store_one_call_per_task();
2126 if !self.is_fullnode(&epoch_store) {
2127 return Err(IotaError::UnsupportedFeature {
2128 error: "simulate is only supported on fullnodes".to_string(),
2129 });
2130 }
2131
2132 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2136
2137 let input_object_kinds = transaction.input_objects()?;
2138 let receiving_object_refs = transaction.receiving_objects();
2139
2140 iota_transaction_checks::deny::check_transaction_for_signing(
2143 &transaction,
2144 &[],
2145 &input_object_kinds,
2146 &receiving_object_refs,
2147 &self.config.transaction_deny_config,
2148 self.get_backing_package_store().as_ref(),
2149 )?;
2150
2151 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2153 None,
2155 &input_object_kinds,
2156 &receiving_object_refs,
2157 epoch_store.epoch(),
2158 )?;
2159
2160 let mock_gas_id = if transaction.gas().is_empty() {
2162 let mock_gas_object = Object::new_move(
2163 MoveObject::new_gas_coin(
2164 OBJECT_START_VERSION,
2165 ObjectID::MAX,
2166 SIMULATION_GAS_COIN_VALUE,
2167 ),
2168 Owner::AddressOwner(transaction.gas_data().owner),
2169 TransactionDigest::genesis_marker(),
2170 );
2171 let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2172 transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
2173 input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2174 Some(mock_gas_object.id())
2175 } else {
2176 None
2177 };
2178
2179 let protocol_config = epoch_store.protocol_config();
2180
2181 let authenticator_gas_budget = 0;
2184
2185 let (gas_status, checked_input_objects) = if checks.enabled() {
2188 iota_transaction_checks::check_transaction_input(
2189 protocol_config,
2190 epoch_store.reference_gas_price(),
2191 &transaction,
2192 input_objects,
2193 &receiving_objects,
2194 &self.metrics.bytecode_verifier_metrics,
2195 &self.config.verifier_signing_config,
2196 authenticator_gas_budget,
2197 )?
2198 } else {
2199 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2200 protocol_config,
2201 transaction.kind(),
2202 input_objects,
2203 receiving_objects,
2204 )?;
2205 let gas_status = IotaGasStatus::new(
2206 transaction.gas_budget(),
2207 transaction.gas_price(),
2208 epoch_store.reference_gas_price(),
2209 protocol_config,
2210 )?;
2211
2212 (gas_status, checked_input_objects)
2213 };
2214
2215 let executor = iota_execution::executor(
2217 protocol_config,
2218 true, None,
2220 )
2221 .expect("Creating an executor should not fail here");
2222
2223 let (kind, signer, gas_data) = transaction.execution_parts();
2225 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2226 self.get_backing_store().as_ref(),
2227 protocol_config,
2228 self.metrics.limits_metrics.clone(),
2229 false, self.config.certificate_deny_config.certificate_deny_set(),
2231 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2232 epoch_store
2233 .epoch_start_config()
2234 .epoch_data()
2235 .epoch_start_timestamp(),
2236 checked_input_objects,
2237 gas_data,
2238 gas_status,
2239 kind,
2240 signer,
2241 transaction.digest(),
2242 checks.disabled(),
2243 );
2244
2245 Ok(SimulateTransactionResult {
2248 input_objects: inner_temp_store.input_objects,
2249 output_objects: inner_temp_store.written,
2250 events: effects.events_digest().map(|_| inner_temp_store.events),
2251 effects,
2252 execution_result,
2253 suggested_gas_price: self
2254 .congestion_tracker
2255 .get_prediction_suggested_gas_price(&transaction),
2256 mock_gas_id,
2257 })
2258 }
2259
2260 #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2265 pub async fn dev_inspect_transaction_block(
2266 &self,
2267 sender: IotaAddress,
2268 transaction_kind: TransactionKind,
2269 gas_price: Option<u64>,
2270 gas_budget: Option<u64>,
2271 gas_sponsor: Option<IotaAddress>,
2272 gas_objects: Option<Vec<ObjectRef>>,
2273 show_raw_txn_data_and_effects: Option<bool>,
2274 skip_checks: Option<bool>,
2275 ) -> IotaResult<DevInspectResults> {
2276 let epoch_store = self.load_epoch_store_one_call_per_task();
2277
2278 if !self.is_fullnode(&epoch_store) {
2279 return Err(IotaError::UnsupportedFeature {
2280 error: "dev-inspect is only supported on fullnodes".to_string(),
2281 });
2282 }
2283
2284 if transaction_kind.is_system_tx() {
2285 return Err(IotaError::UnsupportedFeature {
2286 error: "system transactions are not supported".to_string(),
2287 });
2288 }
2289
2290 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2291 let skip_checks = skip_checks.unwrap_or(true);
2292 let reference_gas_price = epoch_store.reference_gas_price();
2293 let protocol_config = epoch_store.protocol_config();
2294 let max_tx_gas = protocol_config.max_tx_gas();
2295
2296 let price = gas_price.unwrap_or(reference_gas_price);
2297 let budget = gas_budget.unwrap_or(max_tx_gas);
2298 let owner = gas_sponsor.unwrap_or(sender);
2299 let payment = gas_objects.unwrap_or_default();
2302 let mut transaction = TransactionData::V1(TransactionDataV1 {
2303 kind: transaction_kind.clone(),
2304 sender,
2305 gas_data: GasData {
2306 payment,
2307 owner,
2308 price,
2309 budget,
2310 },
2311 expiration: TransactionExpiration::None,
2312 });
2313
2314 let raw_txn_data = if show_raw_txn_data_and_effects {
2315 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2316 error: "Failed to serialize transaction during dev inspect".to_string(),
2317 })?
2318 } else {
2319 vec![]
2320 };
2321
2322 transaction.validity_check_no_gas_check(protocol_config)?;
2323
2324 let input_object_kinds = transaction.input_objects()?;
2325 let receiving_object_refs = transaction.receiving_objects();
2326
2327 iota_transaction_checks::deny::check_transaction_for_signing(
2328 &transaction,
2329 &[],
2330 &input_object_kinds,
2331 &receiving_object_refs,
2332 &self.config.transaction_deny_config,
2333 self.get_backing_package_store().as_ref(),
2334 )?;
2335
2336 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2337 None,
2339 &input_object_kinds,
2340 &receiving_object_refs,
2341 epoch_store.epoch(),
2342 )?;
2343
2344 let (gas_status, checked_input_objects) = if skip_checks {
2345 if transaction.gas().is_empty() {
2350 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2352 SIMULATION_GAS_COIN_VALUE,
2353 transaction.gas_owner(),
2354 );
2355 let gas_object_ref = dummy_gas_object.compute_object_reference();
2356 transaction.gas_data_mut().payment = vec![gas_object_ref];
2357 input_objects.push(ObjectReadResult::new(
2358 InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2359 dummy_gas_object.into(),
2360 ));
2361 }
2362 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2363 protocol_config,
2364 &transaction_kind,
2365 input_objects,
2366 receiving_objects,
2367 )?;
2368 let gas_status = IotaGasStatus::new(
2369 max_tx_gas,
2370 transaction.gas_price(),
2371 reference_gas_price,
2372 protocol_config,
2373 )?;
2374
2375 (gas_status, checked_input_objects)
2376 } else {
2377 if transaction.gas().is_empty() {
2381 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2383 SIMULATION_GAS_COIN_VALUE,
2384 transaction.gas_owner(),
2385 );
2386 let gas_object_ref = dummy_gas_object.compute_object_reference();
2387 transaction.gas_data_mut().payment = vec![gas_object_ref];
2388 iota_transaction_checks::check_transaction_input_with_given_gas(
2389 epoch_store.protocol_config(),
2390 reference_gas_price,
2391 &transaction,
2392 input_objects,
2393 receiving_objects,
2394 dummy_gas_object,
2395 &self.metrics.bytecode_verifier_metrics,
2396 &self.config.verifier_signing_config,
2397 )?
2398 } else {
2399 let authenticator_gas_budget = 0;
2402
2403 iota_transaction_checks::check_transaction_input(
2404 epoch_store.protocol_config(),
2405 reference_gas_price,
2406 &transaction,
2407 input_objects,
2408 &receiving_objects,
2409 &self.metrics.bytecode_verifier_metrics,
2410 &self.config.verifier_signing_config,
2411 authenticator_gas_budget,
2412 )?
2413 }
2414 };
2415
2416 let executor = iota_execution::executor(protocol_config, true, None)
2417 .expect("Creating an executor should not fail here");
2418 let gas_data = transaction.gas_data().clone();
2419 let intent_msg = IntentMessage::new(
2420 Intent {
2421 version: IntentVersion::V0,
2422 scope: IntentScope::TransactionData,
2423 app_id: IntentAppId::Iota,
2424 },
2425 transaction,
2426 );
2427 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2428 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2429 self.get_backing_store().as_ref(),
2430 protocol_config,
2431 self.metrics.limits_metrics.clone(),
2432 false,
2434 self.config.certificate_deny_config.certificate_deny_set(),
2435 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2436 epoch_store
2437 .epoch_start_config()
2438 .epoch_data()
2439 .epoch_start_timestamp(),
2440 checked_input_objects,
2441 gas_data,
2442 gas_status,
2443 transaction_kind,
2444 sender,
2445 transaction_digest,
2446 skip_checks,
2447 );
2448
2449 let raw_effects = if show_raw_txn_data_and_effects {
2450 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2451 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2452 })?
2453 } else {
2454 vec![]
2455 };
2456
2457 let mut layout_resolver =
2458 epoch_store
2459 .executor()
2460 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2461 &inner_temp_store,
2462 self.get_backing_package_store(),
2463 )));
2464
2465 DevInspectResults::new(
2466 effects,
2467 inner_temp_store.events.clone(),
2468 execution_result,
2469 raw_txn_data,
2470 raw_effects,
2471 layout_resolver.as_mut(),
2472 )
2473 }
2474
2475 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2477 let epoch_store = self.epoch_store_for_testing();
2478 Ok(epoch_store.reference_gas_price())
2479 }
2480
2481 #[instrument(level = "trace", skip_all)]
2482 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2483 self.get_transaction_cache_reader()
2484 .try_is_tx_already_executed(digest)
2485 }
2486
2487 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2489 self.try_is_tx_already_executed(digest)
2490 .expect("storage access failed")
2491 }
2492
2493 #[instrument(level = "debug", skip_all, err)]
2495 fn index_tx(
2496 &self,
2497 indexes: &IndexStore,
2498 digest: &TransactionDigest,
2499 cert: &VerifiedExecutableTransaction,
2501 effects: &TransactionEffects,
2502 events: &TransactionEvents,
2503 timestamp_ms: u64,
2504 tx_coins: Option<TxCoins>,
2505 written: &WrittenObjects,
2506 inner_temporary_store: &InnerTemporaryStore,
2507 ) -> IotaResult<u64> {
2508 let changes = self
2509 .process_object_index(effects, written, inner_temporary_store)
2510 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2511
2512 indexes.index_tx(
2513 cert.data().intent_message().value.sender(),
2514 cert.data()
2515 .intent_message()
2516 .value
2517 .input_objects()?
2518 .iter()
2519 .map(|o| o.object_id()),
2520 effects
2521 .all_changed_objects()
2522 .into_iter()
2523 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2524 cert.data()
2525 .intent_message()
2526 .value
2527 .move_calls()
2528 .into_iter()
2529 .map(|(package, module, function)| {
2530 (*package, module.to_owned(), function.to_owned())
2531 }),
2532 events,
2533 changes,
2534 digest,
2535 timestamp_ms,
2536 tx_coins,
2537 )
2538 }
2539
/// Simulator-only (`cfg(msim)`) hook that perturbs `effects` for selected
/// authorities.
///
/// A thread-local pair of (accumulated stake, authority names) is kept.
/// For non-system transactions and authorities with non-zero stake: while
/// the accumulated stake is below the committee's validity threshold, this
/// authority's stake and name are added. If this authority's name is in the
/// set, the computation cost in `effects` is incremented by one.
#[cfg(msim)]
fn create_fail_state(
    &self,
    certificate: &VerifiedExecutableTransaction,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    effects: &mut TransactionEffects,
) {
    use std::cell::RefCell;
    thread_local! {
        static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
    }

    if certificate.data().intent_message().value.is_system_tx() {
        return;
    }

    let committee = epoch_store.committee();
    let cur_stake = (**committee).weight(&self.name);
    if cur_stake == 0 {
        return;
    }

    FAIL_STATE.with_borrow_mut(|fail_state| {
        let (failing_stake, failing_authorities) = fail_state;

        if *failing_stake < committee.validity_threshold() {
            *failing_stake += cur_stake;
            failing_authorities.insert(self.name);
        }

        if failing_authorities.contains(&self.name) {
            info!("cp_exec failing tx");
            effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
        }
    });
}
2570
2571 fn process_object_index(
2572 &self,
2573 effects: &TransactionEffects,
2574 written: &WrittenObjects,
2575 inner_temporary_store: &InnerTemporaryStore,
2576 ) -> IotaResult<ObjectIndexChanges> {
2577 let epoch_store = self.load_epoch_store_one_call_per_task();
2578 let mut layout_resolver =
2579 epoch_store
2580 .executor()
2581 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2582 inner_temporary_store,
2583 self.get_backing_package_store(),
2584 )));
2585
2586 let modified_at_version = effects
2587 .modified_at_versions()
2588 .into_iter()
2589 .collect::<HashMap<_, _>>();
2590
2591 let tx_digest = effects.transaction_digest();
2592 let mut deleted_owners = vec![];
2593 let mut deleted_dynamic_fields = vec![];
2594 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2595 let old_version = modified_at_version.get(&id).unwrap();
2596 match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2599 |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2600 ) {
2601 Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2602 Owner::ObjectOwner(object_id) => {
2603 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2604 }
2605 _ => {}
2606 }
2607 }
2608
2609 let mut new_owners = vec![];
2610 let mut new_dynamic_fields = vec![];
2611
2612 for (oref, owner, kind) in effects.all_changed_objects() {
2613 let id = &oref.0;
2614 if let WriteKind::Mutate = kind {
2617 let Some(old_version) = modified_at_version.get(id) else {
2618 panic!(
2619 "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2620 );
2621 };
2622 let Some(old_object) = self
2625 .get_object_store()
2626 .try_get_object_by_key(id, *old_version)?
2627 else {
2628 panic!(
2629 "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2630 );
2631 };
2632 if old_object.owner != owner {
2633 match old_object.owner {
2634 Owner::AddressOwner(addr) => {
2635 deleted_owners.push((addr, *id));
2636 }
2637 Owner::ObjectOwner(object_id) => {
2638 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2639 }
2640 _ => {}
2641 }
2642 }
2643 }
2644
2645 match owner {
2646 Owner::AddressOwner(addr) => {
2647 let new_object = written.get(id).unwrap_or_else(
2650 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2651 );
2652 assert_eq!(
2653 new_object.version(),
2654 oref.1,
2655 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2656 tx_digest,
2657 id,
2658 new_object.version(),
2659 oref.1
2660 );
2661
2662 let type_ = new_object
2663 .type_()
2664 .map(|type_| ObjectType::Struct(type_.clone()))
2665 .unwrap_or(ObjectType::Package);
2666
2667 new_owners.push((
2668 (addr, *id),
2669 ObjectInfo {
2670 object_id: *id,
2671 version: oref.1,
2672 digest: oref.2,
2673 type_,
2674 owner,
2675 previous_transaction: *effects.transaction_digest(),
2676 },
2677 ));
2678 }
2679 Owner::ObjectOwner(owner) => {
2680 let new_object = written.get(id).unwrap_or_else(
2681 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2682 );
2683 assert_eq!(
2684 new_object.version(),
2685 oref.1,
2686 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2687 tx_digest,
2688 id,
2689 new_object.version(),
2690 oref.1
2691 );
2692
2693 let Some(df_info) = self
2694 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2695 .unwrap_or_else(|e| {
2696 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2697 None
2698 }
2699 )
2700 else {
2701 continue;
2703 };
2704 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2705 }
2706 _ => {}
2707 }
2708 }
2709
2710 Ok(ObjectIndexChanges {
2711 deleted_owners,
2712 deleted_dynamic_fields,
2713 new_owners,
2714 new_dynamic_fields,
2715 })
2716 }
2717
2718 fn try_create_dynamic_field_info(
2719 &self,
2720 o: &Object,
2721 written: &WrittenObjects,
2722 resolver: &mut dyn LayoutResolver,
2723 ) -> IotaResult<Option<DynamicFieldInfo>> {
2724 let Some(move_object) = o.data.try_as_move().cloned() else {
2726 return Ok(None);
2727 };
2728
2729 if !move_object.type_().is_dynamic_field() {
2731 return Ok(None);
2732 }
2733
2734 let layout = resolver
2735 .get_annotated_layout(&move_object.type_().clone().into())?
2736 .into_layout();
2737
2738 let field =
2739 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2740 IotaError::ObjectDeserialization {
2741 error: e.to_string(),
2742 }
2743 })?;
2744
2745 let type_ = field.kind;
2746 let name_type: TypeTag = field.name_layout.into();
2747 let bcs_name = field.name_bytes.to_owned();
2748
2749 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2750 .map_err(|e| {
2751 warn!("{e}");
2752 IotaError::ObjectDeserialization {
2753 error: e.to_string(),
2754 }
2755 })?;
2756
2757 let name = DynamicFieldName {
2758 type_: name_type,
2759 value: IotaMoveValue::from(name_value).to_json_value(),
2760 };
2761
2762 let value_metadata = field.value_metadata().map_err(|e| {
2763 warn!("{e}");
2764 IotaError::ObjectDeserialization {
2765 error: e.to_string(),
2766 }
2767 })?;
2768
2769 Ok(Some(match value_metadata {
2770 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2771 name,
2772 bcs_name,
2773 type_,
2774 object_type: object_type.to_canonical_string(true),
2775 object_id: o.id(),
2776 version: o.version(),
2777 digest: o.digest(),
2778 },
2779
2780 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2781 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2786 (
2787 object.version(),
2788 object.digest(),
2789 object.data.type_().unwrap().clone(),
2790 )
2791 } else {
2792 let object = self
2794 .get_object_store()
2795 .try_get_object_by_key(&object_id, o.version())?
2796 .ok_or_else(|| UserInputError::ObjectNotFound {
2797 object_id,
2798 version: Some(o.version()),
2799 })?;
2800 let version = object.version();
2801 let digest = object.digest();
2802 let object_type = object.data.type_().unwrap().clone();
2803 (version, digest, object_type)
2804 };
2805
2806 DynamicFieldInfo {
2807 name,
2808 bcs_name,
2809 type_,
2810 object_type: object_type.to_string(),
2811 object_id,
2812 version,
2813 digest,
2814 }
2815 }
2816 }))
2817 }
2818
2819 #[instrument(level = "trace", skip_all, err)]
2820 fn post_process_one_tx(
2821 &self,
2822 certificate: &VerifiedExecutableTransaction,
2823 effects: &TransactionEffects,
2824 inner_temporary_store: &InnerTemporaryStore,
2825 epoch_store: &Arc<AuthorityPerEpochStore>,
2826 ) -> IotaResult {
2827 if self.indexes.is_none() {
2828 return Ok(());
2829 }
2830
2831 let _scope = monitored_scope("Execution::post_process_one_tx");
2832
2833 let tx_digest = certificate.digest();
2834 let timestamp_ms = Self::unixtime_now_ms();
2835 let events = &inner_temporary_store.events;
2836 let written = &inner_temporary_store.written;
2837 let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
2838 effects,
2839 inner_temporary_store,
2840 epoch_store,
2841 );
2842
2843 if let Some(indexes) = &self.indexes {
2845 let _ = self
2846 .index_tx(
2847 indexes.as_ref(),
2848 tx_digest,
2849 certificate,
2850 effects,
2851 events,
2852 timestamp_ms,
2853 tx_coins,
2854 written,
2855 inner_temporary_store,
2856 )
2857 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2858 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2859 .expect("Indexing tx should not fail");
2860
2861 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2862 let events = self.make_transaction_block_events(
2863 events.clone(),
2864 *tx_digest,
2865 timestamp_ms,
2866 epoch_store,
2867 inner_temporary_store,
2868 )?;
2869 self.subscription_handler
2871 .process_tx(certificate.data().transaction_data(), &effects, &events)
2872 .tap_ok(|_| {
2873 self.metrics
2874 .post_processing_total_tx_had_event_processed
2875 .inc()
2876 })
2877 .tap_err(|e| {
2878 warn!(
2879 ?tx_digest,
2880 "Post processing - Couldn't process events for tx: {}", e
2881 )
2882 })?;
2883
2884 self.metrics
2885 .post_processing_total_events_emitted
2886 .inc_by(events.data.len() as u64);
2887 };
2888 Ok(())
2889 }
2890
2891 fn make_transaction_block_events(
2892 &self,
2893 transaction_events: TransactionEvents,
2894 digest: TransactionDigest,
2895 timestamp_ms: u64,
2896 epoch_store: &Arc<AuthorityPerEpochStore>,
2897 inner_temporary_store: &InnerTemporaryStore,
2898 ) -> IotaResult<IotaTransactionBlockEvents> {
2899 let mut layout_resolver =
2900 epoch_store
2901 .executor()
2902 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2903 inner_temporary_store,
2904 self.get_backing_package_store(),
2905 )));
2906 IotaTransactionBlockEvents::try_from(
2907 transaction_events,
2908 digest,
2909 Some(timestamp_ms),
2910 layout_resolver.as_mut(),
2911 )
2912 }
2913
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reads earlier than the Unix epoch, or if the
/// millisecond count does not fit in a `u64`.
pub fn unixtime_now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let millis: u128 = since_epoch.as_millis();
    u64::try_from(millis).expect("Travelling in time machine")
}
2921
2922 #[instrument(level = "trace", skip_all)]
2923 pub async fn handle_transaction_info_request(
2924 &self,
2925 request: TransactionInfoRequest,
2926 ) -> IotaResult<TransactionInfoResponse> {
2927 let epoch_store = self.load_epoch_store_one_call_per_task();
2928 let (transaction, status) = self
2929 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2930 .ok_or(IotaError::TransactionNotFound {
2931 digest: request.transaction_digest,
2932 })?;
2933 Ok(TransactionInfoResponse {
2934 transaction,
2935 status,
2936 })
2937 }
2938
2939 #[instrument(level = "trace", skip_all)]
2940 pub async fn handle_object_info_request(
2941 &self,
2942 request: ObjectInfoRequest,
2943 ) -> IotaResult<ObjectInfoResponse> {
2944 let epoch_store = self.load_epoch_store_one_call_per_task();
2945
2946 let requested_object_seq = match request.request_kind {
2947 ObjectInfoRequestKind::LatestObjectInfo => {
2948 let (_, seq, _) = self
2949 .try_get_object_or_tombstone(request.object_id)
2950 .await?
2951 .ok_or_else(|| {
2952 IotaError::from(UserInputError::ObjectNotFound {
2953 object_id: request.object_id,
2954 version: None,
2955 })
2956 })?;
2957 seq
2958 }
2959 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2960 };
2961
2962 let object = self
2963 .get_object_store()
2964 .try_get_object_by_key(&request.object_id, requested_object_seq)?
2965 .ok_or_else(|| {
2966 IotaError::from(UserInputError::ObjectNotFound {
2967 object_id: request.object_id,
2968 version: Some(requested_object_seq),
2969 })
2970 })?;
2971
2972 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2973 (request.generate_layout, object.data.try_as_move())
2974 {
2975 Some(into_struct_layout(
2976 epoch_store
2977 .executor()
2978 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2979 .get_annotated_layout(&move_obj.type_().clone().into())?,
2980 )?)
2981 } else {
2982 None
2983 };
2984
2985 let lock = if !object.is_address_owned() {
2986 None
2988 } else {
2989 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2990 .await?
2991 .map(|s| s.into_inner())
2992 };
2993
2994 Ok(ObjectInfoResponse {
2995 object,
2996 layout,
2997 lock_for_debugging: lock,
2998 })
2999 }
3000
3001 #[instrument(level = "trace", skip_all)]
3002 pub fn handle_checkpoint_request(
3003 &self,
3004 request: &CheckpointRequest,
3005 ) -> IotaResult<CheckpointResponse> {
3006 let summary = if request.certified {
3007 let summary = match request.sequence_number {
3008 Some(seq) => self
3009 .checkpoint_store
3010 .get_checkpoint_by_sequence_number(seq)?,
3011 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3012 }
3013 .map(|v| v.into_inner());
3014 summary.map(CheckpointSummaryResponse::Certified)
3015 } else {
3016 let summary = match request.sequence_number {
3017 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3018 None => self
3019 .checkpoint_store
3020 .get_latest_locally_computed_checkpoint()?,
3021 };
3022 summary.map(CheckpointSummaryResponse::Pending)
3023 };
3024 let contents = match &summary {
3025 Some(s) => self
3026 .checkpoint_store
3027 .get_checkpoint_contents(&s.content_digest())?,
3028 None => None,
3029 };
3030 Ok(CheckpointResponse {
3031 checkpoint: summary,
3032 contents,
3033 })
3034 }
3035
3036 fn check_protocol_version(
3037 supported_protocol_versions: SupportedProtocolVersions,
3038 current_version: ProtocolVersion,
3039 ) {
3040 info!("current protocol version is now {:?}", current_version);
3041 info!("supported versions are: {:?}", supported_protocol_versions);
3042 if !supported_protocol_versions.is_version_supported(current_version) {
3043 let msg = format!(
3044 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
3045 );
3046
3047 error!("{}", msg);
3048 eprintln!("{msg}");
3049
3050 #[cfg(not(msim))]
3051 std::process::exit(1);
3052
3053 #[cfg(msim)]
3054 iota_simulator::task::shutdown_current_node();
3055 }
3056 }
3057
3058 #[expect(clippy::disallowed_methods)] pub async fn new(
3060 name: AuthorityName,
3061 secret: StableSyncAuthoritySigner,
3062 supported_protocol_versions: SupportedProtocolVersions,
3063 store: Arc<AuthorityStore>,
3064 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3065 epoch_store: Arc<AuthorityPerEpochStore>,
3066 committee_store: Arc<CommitteeStore>,
3067 indexes: Option<Arc<IndexStore>>,
3068 grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
3069 checkpoint_store: Arc<CheckpointStore>,
3070 prometheus_registry: &Registry,
3071 genesis_objects: &[Object],
3072 db_checkpoint_config: &DBCheckpointConfig,
3073 config: NodeConfig,
3074 archive_readers: ArchiveReaderBalancer,
3075 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
3076 chain_identifier: ChainIdentifier,
3077 pruner_db: Option<Arc<AuthorityPrunerTables>>,
3078 ) -> Arc<Self> {
3079 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3080
3081 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3082 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3083 let transaction_manager = Arc::new(TransactionManager::new(
3084 execution_cache_trait_pointers.object_cache_reader.clone(),
3085 execution_cache_trait_pointers
3086 .transaction_cache_reader
3087 .clone(),
3088 &epoch_store,
3089 tx_ready_certificates,
3090 metrics.clone(),
3091 ));
3092 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3093
3094 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3095 epoch_store.get_parent_path(),
3096 &config.authority_store_pruning_config,
3097 );
3098 let _pruner = AuthorityStorePruner::new(
3099 store.perpetual_tables.clone(),
3100 checkpoint_store.clone(),
3101 grpc_indexes_store.clone(),
3102 indexes.clone(),
3103 config.authority_store_pruning_config.clone(),
3104 epoch_store.committee().authority_exists(&name),
3105 epoch_store.epoch_start_state().epoch_duration_ms(),
3106 prometheus_registry,
3107 archive_readers,
3108 pruner_db,
3109 );
3110 let input_loader =
3111 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3112 let epoch = epoch_store.epoch();
3113 let rgp = epoch_store.reference_gas_price();
3114 let state = Arc::new(AuthorityState {
3115 name,
3116 secret,
3117 execution_lock: RwLock::new(epoch),
3118 epoch_store: ArcSwap::new(epoch_store.clone()),
3119 input_loader,
3120 execution_cache_trait_pointers,
3121 indexes,
3122 grpc_indexes_store,
3123 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3124 checkpoint_store,
3125 committee_store,
3126 transaction_manager,
3127 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3128 metrics,
3129 _pruner,
3130 _authority_per_epoch_pruner,
3131 db_checkpoint_config: db_checkpoint_config.clone(),
3132 config,
3133 overload_info: AuthorityOverloadInfo::default(),
3134 validator_tx_finalizer,
3135 chain_identifier,
3136 congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
3137 });
3138
3139 let authority_state = Arc::downgrade(&state);
3141 spawn_monitored_task!(execution_process(
3142 authority_state,
3143 rx_ready_certificates,
3144 rx_execution_shutdown,
3145 ));
3146 spawn_monitored_task!(authority_store_migrations::migrate_events(store));
3147
3148 state
3150 .create_owner_index_if_empty(genesis_objects, &epoch_store)
3151 .expect("Error indexing genesis objects.");
3152
3153 state
3154 }
3155
3156 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3158 &self.execution_cache_trait_pointers.object_cache_reader
3159 }
3160
3161 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3162 &self.execution_cache_trait_pointers.transaction_cache_reader
3163 }
3164
3165 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3166 &self.execution_cache_trait_pointers.cache_writer
3167 }
3168
3169 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3170 &self.execution_cache_trait_pointers.backing_store
3171 }
3172
3173 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3174 &self.execution_cache_trait_pointers.backing_package_store
3175 }
3176
3177 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3178 &self.execution_cache_trait_pointers.object_store
3179 }
3180
3181 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3182 &self.execution_cache_trait_pointers.reconfig_api
3183 }
3184
3185 pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
3186 &self.execution_cache_trait_pointers.accumulator_store
3187 }
3188
3189 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3190 &self.execution_cache_trait_pointers.checkpoint_cache
3191 }
3192
3193 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3194 &self.execution_cache_trait_pointers.state_sync_store
3195 }
3196
3197 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3198 &self.execution_cache_trait_pointers.cache_commit
3199 }
3200
3201 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3202 self.execution_cache_trait_pointers
3203 .testing_api
3204 .database_for_testing()
3205 }
3206
3207 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3208 &self,
3209 config: NodeConfig,
3210 metrics: Arc<AuthorityStorePruningMetrics>,
3211 ) -> anyhow::Result<()> {
3212 let archive_readers =
3213 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3214 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3215 &self.database_for_testing().perpetual_tables,
3216 &self.checkpoint_store,
3217 self.grpc_indexes_store.as_deref(),
3218 None,
3219 config.authority_store_pruning_config,
3220 metrics,
3221 archive_readers,
3222 EPOCH_DURATION_MS_FOR_TESTING,
3223 )
3224 .await
3225 }
3226
3227 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3228 &self.transaction_manager
3229 }
3230
3231 pub fn enqueue_transactions_for_execution(
3234 &self,
3235 txns: Vec<VerifiedExecutableTransaction>,
3236 epoch_store: &Arc<AuthorityPerEpochStore>,
3237 ) {
3238 self.transaction_manager.enqueue(txns, epoch_store)
3239 }
3240
3241 pub fn enqueue_certificates_for_execution(
3243 &self,
3244 certs: Vec<VerifiedCertificate>,
3245 epoch_store: &Arc<AuthorityPerEpochStore>,
3246 ) {
3247 self.transaction_manager
3248 .enqueue_certificates(certs, epoch_store)
3249 }
3250
3251 pub fn enqueue_with_expected_effects_digest(
3252 &self,
3253 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3254 epoch_store: &AuthorityPerEpochStore,
3255 ) {
3256 self.transaction_manager
3257 .enqueue_with_expected_effects_digest(certs, epoch_store)
3258 }
3259
3260 fn create_owner_index_if_empty(
3261 &self,
3262 genesis_objects: &[Object],
3263 epoch_store: &Arc<AuthorityPerEpochStore>,
3264 ) -> IotaResult {
3265 let Some(index_store) = &self.indexes else {
3266 return Ok(());
3267 };
3268 if !index_store.is_empty() {
3269 return Ok(());
3270 }
3271
3272 let mut new_owners = vec![];
3273 let mut new_dynamic_fields = vec![];
3274 let mut layout_resolver = epoch_store
3275 .executor()
3276 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3277 for o in genesis_objects.iter() {
3278 match o.owner {
3279 Owner::AddressOwner(addr) => new_owners.push((
3280 (addr, o.id()),
3281 ObjectInfo::new(&o.compute_object_reference(), o),
3282 )),
3283 Owner::ObjectOwner(object_id) => {
3284 let id = o.id();
3285 let info = match self.try_create_dynamic_field_info(
3286 o,
3287 &BTreeMap::new(),
3288 layout_resolver.as_mut(),
3289 ) {
3290 Ok(Some(info)) => info,
3291 Ok(None) => continue,
3292 Err(IotaError::UserInput {
3293 error:
3294 UserInputError::ObjectNotFound {
3295 object_id: not_found_id,
3296 version,
3297 },
3298 }) => {
3299 warn!(
3300 ?not_found_id,
3301 ?version,
3302 object_owner=?object_id,
3303 field=?id,
3304 "Skipping dynamic field: referenced genesis object not found"
3305 );
3306 continue;
3307 }
3308 Err(e) => return Err(e),
3309 };
3310 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3311 }
3312 _ => {}
3313 }
3314 }
3315
3316 index_store.insert_genesis_objects(ObjectIndexChanges {
3317 deleted_owners: vec![],
3318 deleted_dynamic_fields: vec![],
3319 new_owners,
3320 new_dynamic_fields,
3321 })
3322 }
3323
3324 pub fn execution_lock_for_executable_transaction(
3328 &self,
3329 transaction: &VerifiedExecutableTransaction,
3330 ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3331 let lock = self
3332 .execution_lock
3333 .try_read()
3334 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3335 if *lock == transaction.auth_sig().epoch() {
3336 Ok(lock)
3337 } else {
3338 Err(IotaError::WrongEpoch {
3339 expected_epoch: *lock,
3340 actual_epoch: transaction.auth_sig().epoch(),
3341 })
3342 }
3343 }
3344
3345 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3351 self.execution_lock
3352 .try_read()
3353 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3354 }
3355
3356 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3357 self.execution_lock.write().await
3358 }
3359
/// Moves this authority from the epoch of `cur_epoch_store` to the epoch of `new_committee`
/// and returns the epoch store opened for the new epoch.
///
/// The steps run in this order:
/// 1. Check the new epoch's protocol version against `supported_protocol_versions`, reset
///    metrics, and persist the new committee.
/// 2. Take the execution write lock (held until this function returns) and signal epoch
///    termination on the current epoch store.
/// 3. Sanity-check the last checkpoint of the epoch against the latest locally computed one.
/// 4. Revert uncommitted epoch transactions, clear end-of-epoch cache state, run the system
///    consistency checks, and store the new epoch start configuration.
/// 5. Optionally snapshot the databases, if configured in `db_checkpoint_config`.
/// 6. Reconfigure the cache, reopen the epoch database, reconfigure the transaction manager,
///    and store the new epoch in the execution lock.
///
/// # Errors
///
/// Propagates any error returned by the fallible steps above.
///
/// # Panics
///
/// Panics if `epoch_last_checkpoint` is lower than the latest locally computed checkpoint
/// sequence number, or if the reopened epoch store's epoch differs from `new_committee.epoch`.
#[instrument(level = "error", skip_all)]
pub async fn reconfigure(
    &self,
    cur_epoch_store: &AuthorityPerEpochStore,
    supported_protocol_versions: SupportedProtocolVersions,
    new_committee: Committee,
    epoch_start_configuration: EpochStartConfiguration,
    accumulator: Arc<StateAccumulator>,
    expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
    epoch_supply_change: i64,
    epoch_last_checkpoint: CheckpointSequenceNumber,
) -> IotaResult<Arc<AuthorityPerEpochStore>> {
    Self::check_protocol_version(
        supported_protocol_versions,
        epoch_start_configuration
            .epoch_start_state()
            .protocol_version(),
    );
    self.metrics.reset_on_reconfigure();
    self.committee_store.insert_new_committee(&new_committee)?;

    // Acquire the write guard; it is only released when this function returns.
    let mut execution_lock = self.execution_lock_for_reconfiguration().await;

    // Signal (and wait for) termination of the current epoch on its store.
    cur_epoch_store.epoch_terminated().await;

    // Defaults to 0 when no locally computed checkpoint is recorded.
    let highest_locally_built_checkpoint_seq = self
        .checkpoint_store
        .get_latest_locally_computed_checkpoint()?
        .map(|c| *c.sequence_number())
        .unwrap_or(0);

    assert!(
        epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
        "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
    );
    // Only when the epoch's last checkpoint is also the latest locally computed one do we
    // check how many shared version assignments are left.
    if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
        let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
        // Up to one remaining assignment is tolerated; more than one is reported.
        // NOTE(review): presumably the tolerated one belongs to the end-of-epoch
        // transaction — confirm.
        if num_shared_version_assignments > 1 {
            debug_fatal!(
                "all shared_version_assignments should have been removed \
                (num_shared_version_assignments: {num_shared_version_assignments})"
            );
        }
    }

    self.revert_uncommitted_epoch_transactions(cur_epoch_store)
        .await?;
    self.get_reconfig_api()
        .clear_state_end_of_epoch(&execution_lock);
    self.check_system_consistency(
        cur_epoch_store,
        accumulator,
        expensive_safety_check_config,
        epoch_supply_change,
    )?;
    self.get_reconfig_api()
        .try_set_epoch_start_configuration(&epoch_start_configuration)?;
    // Snapshot the databases only when a checkpoint path is configured and end-of-epoch
    // checkpoints are enabled.
    if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
        if self
            .db_checkpoint_config
            .perform_db_checkpoints_at_epoch_end
        {
            // Index snapshots are opt-in; an unset option means "no".
            let checkpoint_indexes = self
                .db_checkpoint_config
                .perform_index_db_checkpoints_at_epoch_end
                .unwrap_or(false);
            let current_epoch = cur_epoch_store.epoch();
            let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
            self.checkpoint_all_dbs(
                &epoch_checkpoint_path,
                cur_epoch_store,
                checkpoint_indexes,
            )?;
        }
    }

    self.get_reconfig_api()
        .reconfigure_cache(&epoch_start_configuration)
        .await;

    // Read the epoch before `new_committee` is moved into `reopen_epoch_db`.
    let new_epoch = new_committee.epoch;
    let new_epoch_store = self
        .reopen_epoch_db(
            cur_epoch_store,
            new_committee,
            epoch_start_configuration,
            expensive_safety_check_config,
            epoch_last_checkpoint,
        )
        .await?;
    assert_eq!(new_epoch_store.epoch(), new_epoch);
    self.transaction_manager.reconfigure(new_epoch);
    // Record the new epoch in the lock; the guard is dropped on return.
    *execution_lock = new_epoch;
    Ok(new_epoch_store)
}
3471
/// Test helper that advances this authority to the next epoch.
///
/// Takes the execution write lock, builds the next epoch's store from the current one via
/// `new_at_next_epoch_for_testing`, reconfigures the transaction manager, installs the new
/// epoch store, signals termination of the old epoch, and stores the new epoch in the lock.
///
/// # Panics
///
/// Panics if reading the last checkpoint of the current epoch from the checkpoint store
/// fails.
pub async fn reconfigure_for_testing(&self) {
    let mut execution_lock = self.execution_lock_for_reconfiguration().await;
    let epoch_store = self.epoch_store_for_testing().clone();
    let protocol_config = epoch_store.protocol_config().clone();
    // Install an override that always returns the current protocol config; `_guard` is kept
    // alive until the end of this function.
    // NOTE(review): presumably this keeps the new epoch on the same protocol config —
    // confirm against `apply_overrides_for_testing`.
    let _guard =
        ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
    let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
        self.get_backing_package_store().clone(),
        self.get_object_store().clone(),
        &self.config.expensive_safety_check_config,
        // Last checkpoint sequence number of the current epoch, or the default value when
        // none is recorded.
        self.checkpoint_store
            .get_epoch_last_checkpoint(epoch_store.epoch())
            .unwrap()
            .map(|c| *c.sequence_number())
            .unwrap_or_default(),
    );
    let new_epoch = new_epoch_store.epoch();
    self.transaction_manager.reconfigure(new_epoch);
    self.epoch_store.store(new_epoch_store);
    epoch_store.epoch_terminated().await;
    *execution_lock = new_epoch;
}
3505
3506 #[instrument(level = "error", skip_all)]
3507 fn check_system_consistency(
3508 &self,
3509 cur_epoch_store: &AuthorityPerEpochStore,
3510 accumulator: Arc<StateAccumulator>,
3511 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3512 epoch_supply_change: i64,
3513 ) -> IotaResult<()> {
3514 info!(
3515 "Performing iota conservation consistency check for epoch {}",
3516 cur_epoch_store.epoch()
3517 );
3518
3519 if cfg!(debug_assertions) {
3520 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3521 }
3522
3523 self.get_reconfig_api()
3524 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3525
3526 if expensive_safety_check_config.enable_state_consistency_check() {
3528 info!(
3529 "Performing state consistency check for epoch {}",
3530 cur_epoch_store.epoch()
3531 );
3532 self.expensive_check_is_consistent_state(
3533 accumulator,
3534 cur_epoch_store,
3535 cfg!(debug_assertions), );
3537 }
3538
3539 if expensive_safety_check_config.enable_secondary_index_checks() {
3540 if let Some(indexes) = self.indexes.clone() {
3541 verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3542 .expect("secondary indexes are inconsistent");
3543 }
3544 }
3545
3546 Ok(())
3547 }
3548
3549 fn expensive_check_is_consistent_state(
3550 &self,
3551 accumulator: Arc<StateAccumulator>,
3552 cur_epoch_store: &AuthorityPerEpochStore,
3553 panic: bool,
3554 ) {
3555 let live_object_set_hash = accumulator.digest_live_object_set();
3556
3557 let root_state_hash: ECMHLiveObjectSetDigest = self
3558 .get_accumulator_store()
3559 .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3560 .expect("Retrieving root state hash cannot fail")
3561 .expect("Root state hash for epoch must exist")
3562 .1
3563 .digest()
3564 .into();
3565
3566 let is_inconsistent = root_state_hash != live_object_set_hash;
3567 if is_inconsistent {
3568 if panic {
3569 panic!(
3570 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3571 );
3572 } else {
3573 error!(
3574 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3575 root_state_hash, live_object_set_hash
3576 );
3577 }
3578 } else {
3579 info!("State consistency check passed");
3580 }
3581
3582 if !panic {
3583 accumulator.set_inconsistent_state(is_inconsistent);
3584 }
3585 }
3586
3587 pub fn current_epoch_for_testing(&self) -> EpochId {
3588 self.epoch_store_for_testing().epoch()
3589 }
3590
/// Snapshots the node's databases into `checkpoint_path`.
///
/// The snapshot is built in a sibling directory obtained by setting the extension of
/// `checkpoint_path` to `tmp`, and is renamed to `checkpoint_path` once every snapshot has
/// succeeded. If `checkpoint_path` already exists, nothing is done.
///
/// Snapshots taken: the checkpoint store (`checkpoints`), the perpetual store
/// (`store/perpetual`), the committee store (`epochs`) and, when `checkpoint_indexes` is
/// set, whichever of the index stores are configured.
///
/// # Errors
///
/// Returns `IotaError::FileIO` for filesystem failures and propagates any error from the
/// individual database snapshots.
#[instrument(level = "error", skip_all)]
pub fn checkpoint_all_dbs(
    &self,
    checkpoint_path: &Path,
    cur_epoch_store: &AuthorityPerEpochStore,
    checkpoint_indexes: bool,
) -> IotaResult {
    // Timer guard: lives until the function returns.
    let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
    let current_epoch = cur_epoch_store.epoch();

    if checkpoint_path.exists() {
        info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
        return Ok(());
    }

    let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
    let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");

    // Remove any leftover temporary directory before starting over.
    if checkpoint_path_tmp.exists() {
        fs::remove_dir_all(&checkpoint_path_tmp)
            .map_err(|e| IotaError::FileIO(e.to_string()))?;
    }

    fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
    fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;

    // NOTE(review): the checkpoint store is snapshotted before the other stores; confirm
    // whether this ordering is required before reordering these calls.
    self.checkpoint_store
        .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;

    self.get_reconfig_api()
        .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;

    self.committee_store
        .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;

    if checkpoint_indexes {
        if let Some(indexes) = self.indexes.as_ref() {
            indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
        }
        if let Some(grpc_indexes_store) = self.grpc_indexes_store.as_ref() {
            grpc_indexes_store.checkpoint_db(&checkpoint_path_tmp.join(GRPC_INDEXES_DIR))?;
        }
    }

    // Publish the finished snapshot under its final name.
    fs::rename(checkpoint_path_tmp, checkpoint_path)
        .map_err(|e| IotaError::FileIO(e.to_string()))?;
    Ok(())
}
3641
3642 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3649 self.epoch_store.load()
3650 }
3651
3652 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3654 self.load_epoch_store_one_call_per_task()
3655 }
3656
3657 pub fn clone_committee_for_testing(&self) -> Committee {
3658 Committee::clone(self.epoch_store_for_testing().committee())
3659 }
3660
3661 #[instrument(level = "trace", skip_all)]
3662 pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3663 self.get_object_store()
3664 .try_get_object(object_id)
3665 .map_err(Into::into)
3666 }
3667
3668 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3670 self.try_get_object(object_id)
3671 .await
3672 .expect("storage access failed")
3673 }
3674
3675 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3676 Ok(self
3677 .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3678 .await?
3679 .expect("framework object should always exist")
3680 .compute_object_reference())
3681 }
3682
3683 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3685 self.get_object_cache_reader()
3686 .try_get_iota_system_state_object_unsafe()
3687 }
3688
3689 #[instrument(level = "trace", skip_all)]
3690 pub fn get_checkpoint_by_sequence_number(
3691 &self,
3692 sequence_number: CheckpointSequenceNumber,
3693 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3694 Ok(self
3695 .checkpoint_store
3696 .get_checkpoint_by_sequence_number(sequence_number)?)
3697 }
3698
3699 pub async fn wait_for_checkpoint_inclusion(
3706 &self,
3707 digests: &[TransactionDigest],
3708 timeout: Duration,
3709 ) -> IotaResult<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>> {
3710 let epoch_store = self.load_epoch_store_one_call_per_task();
3711
3712 let mut checkpoint_timestamp_cache = HashMap::<CheckpointSequenceNumber, u64>::new();
3715
3716 let results = epoch_store
3717 .wait_for_transactions_in_checkpoint_with_timeout(digests, timeout, |seq| {
3718 *checkpoint_timestamp_cache.entry(seq).or_insert_with(|| {
3719 self.get_checkpoint_by_sequence_number(seq)
3720 .ok()
3721 .flatten()
3722 .map(|c| c.timestamp_ms)
3723 .unwrap_or(0)
3724 })
3725 })
3726 .await?;
3727
3728 Ok(digests
3729 .iter()
3730 .copied()
3731 .zip(results)
3732 .filter_map(|(digest, opt)| opt.map(|seq_and_ts| (digest, seq_and_ts)))
3733 .collect())
3734 }
3735
3736 #[instrument(level = "trace", skip_all)]
3737 pub fn get_transaction_checkpoint_for_tests(
3738 &self,
3739 digest: &TransactionDigest,
3740 epoch_store: &AuthorityPerEpochStore,
3741 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3742 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3743 let Some(checkpoint) = checkpoint else {
3744 return Ok(None);
3745 };
3746 let checkpoint = self
3747 .checkpoint_store
3748 .get_checkpoint_by_sequence_number(checkpoint)?;
3749 Ok(checkpoint)
3750 }
3751
3752 #[instrument(level = "trace", skip_all)]
3753 pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3754 Ok(
3755 match self
3756 .get_object_cache_reader()
3757 .try_get_latest_object_or_tombstone(*object_id)?
3758 {
3759 Some((_, ObjectOrTombstone::Object(object))) => {
3760 let layout = self.get_object_layout(&object)?;
3761 ObjectRead::Exists(object.compute_object_reference(), object, layout)
3762 }
3763 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3764 None => ObjectRead::NotExists(*object_id),
3765 },
3766 )
3767 }
3768
3769 pub fn get_chain_identifier(&self) -> ChainIdentifier {
3771 self.chain_identifier
3772 }
3773
3774 #[instrument(level = "trace", skip_all)]
3775 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3776 where
3777 T: DeserializeOwned,
3778 {
3779 let o = self.get_object_read(object_id)?.into_object()?;
3780 if let Some(move_object) = o.data.try_as_move() {
3781 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3782 IotaError::ObjectDeserialization {
3783 error: format!("{e}"),
3784 }
3785 })?)
3786 } else {
3787 Err(IotaError::ObjectDeserialization {
3788 error: format!("Provided object : [{object_id}] is not a Move object."),
3789 })
3790 }
3791 }
3792
3793 #[instrument(level = "trace", skip_all)]
3799 pub fn get_past_object_read(
3800 &self,
3801 object_id: &ObjectID,
3802 version: SequenceNumber,
3803 ) -> IotaResult<PastObjectRead> {
3804 let Some(obj_ref) = self
3806 .get_object_cache_reader()
3807 .try_get_latest_object_ref_or_tombstone(*object_id)?
3808 else {
3809 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3810 };
3811
3812 if version > obj_ref.1 {
3813 return Ok(PastObjectRead::VersionTooHigh {
3814 object_id: *object_id,
3815 asked_version: version,
3816 latest_version: obj_ref.1,
3817 });
3818 }
3819
3820 if version < obj_ref.1 {
3821 return Ok(match self.read_object_at_version(object_id, version)? {
3823 Some((object, layout)) => {
3824 let obj_ref = object.compute_object_reference();
3825 PastObjectRead::VersionFound(obj_ref, object, layout)
3826 }
3827
3828 None => PastObjectRead::VersionNotFound(*object_id, version),
3829 });
3830 }
3831
3832 if !obj_ref.2.is_alive() {
3833 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3834 }
3835
3836 match self.read_object_at_version(object_id, obj_ref.1)? {
3837 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3838 None => {
3839 error!(
3840 "Object with in parent_entry is missing from object store, datastore is \
3841 inconsistent",
3842 );
3843 Err(UserInputError::ObjectNotFound {
3844 object_id: *object_id,
3845 version: Some(obj_ref.1),
3846 }
3847 .into())
3848 }
3849 }
3850 }
3851
3852 #[instrument(level = "trace", skip_all)]
3853 fn read_object_at_version(
3854 &self,
3855 object_id: &ObjectID,
3856 version: SequenceNumber,
3857 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3858 let Some(object) = self
3859 .get_object_cache_reader()
3860 .try_get_object_by_key(object_id, version)?
3861 else {
3862 return Ok(None);
3863 };
3864
3865 let layout = self.get_object_layout(&object)?;
3866 Ok(Some((object, layout)))
3867 }
3868
3869 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3870 let layout = object
3871 .data
3872 .try_as_move()
3873 .map(|object| {
3874 into_struct_layout(
3875 self.load_epoch_store_one_call_per_task()
3876 .executor()
3877 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3879 .get_annotated_layout(&object.type_().clone().into())?,
3880 )
3881 })
3882 .transpose()?;
3883 Ok(layout)
3884 }
3885
3886 fn get_owner_at_version(
3887 &self,
3888 object_id: &ObjectID,
3889 version: SequenceNumber,
3890 ) -> IotaResult<Owner> {
3891 self.get_object_store()
3892 .try_get_object_by_key(object_id, version)?
3893 .ok_or_else(|| {
3894 IotaError::from(UserInputError::ObjectNotFound {
3895 object_id: *object_id,
3896 version: Some(version),
3897 })
3898 })
3899 .map(|o| o.owner)
3900 }
3901
3902 #[instrument(level = "trace", skip_all)]
3903 pub fn get_owner_objects(
3904 &self,
3905 owner: IotaAddress,
3906 cursor: Option<ObjectID>,
3908 limit: usize,
3909 filter: Option<IotaObjectDataFilter>,
3910 ) -> IotaResult<Vec<ObjectInfo>> {
3911 if let Some(indexes) = &self.indexes {
3912 indexes.get_owner_objects(owner, cursor, limit, filter)
3913 } else {
3914 Err(IotaError::IndexStoreNotAvailable)
3915 }
3916 }
3917
3918 #[instrument(level = "trace", skip_all)]
3919 pub fn get_owned_coins_iterator_with_cursor(
3920 &self,
3921 owner: IotaAddress,
3922 cursor: (String, ObjectID),
3924 limit: usize,
3925 one_coin_type_only: bool,
3926 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3927 if let Some(indexes) = &self.indexes {
3928 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3929 } else {
3930 Err(IotaError::IndexStoreNotAvailable)
3931 }
3932 }
3933
3934 #[instrument(level = "trace", skip_all)]
3935 pub fn get_owner_objects_iterator(
3936 &self,
3937 owner: IotaAddress,
3938 cursor: Option<ObjectID>,
3940 filter: Option<IotaObjectDataFilter>,
3941 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3942 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3943 if let Some(indexes) = &self.indexes {
3944 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3945 } else {
3946 Err(IotaError::IndexStoreNotAvailable)
3947 }
3948 }
3949
3950 #[instrument(level = "trace", skip_all)]
3951 pub async fn get_move_objects<T>(
3952 &self,
3953 owner: IotaAddress,
3954 type_: MoveObjectType,
3955 ) -> IotaResult<Vec<T>>
3956 where
3957 T: DeserializeOwned,
3958 {
3959 let object_ids = self
3960 .get_owner_objects_iterator(owner, None, None)?
3961 .filter(|o| match &o.type_ {
3962 ObjectType::Struct(s) => &type_ == s,
3963 ObjectType::Package => false,
3964 })
3965 .map(|info| ObjectKey(info.object_id, info.version))
3966 .collect::<Vec<_>>();
3967 let mut move_objects = vec![];
3968
3969 let objects = self
3970 .get_object_store()
3971 .try_multi_get_objects_by_key(&object_ids)?;
3972
3973 for (o, id) in objects.into_iter().zip(object_ids) {
3974 let object = o.ok_or_else(|| {
3975 IotaError::from(UserInputError::ObjectNotFound {
3976 object_id: id.0,
3977 version: Some(id.1),
3978 })
3979 })?;
3980 let move_object = object.data.try_as_move().ok_or_else(|| {
3981 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3982 })?;
3983 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3984 IotaError::ObjectDeserialization {
3985 error: format!("{e}"),
3986 }
3987 })?);
3988 }
3989 Ok(move_objects)
3990 }
3991
3992 #[instrument(level = "trace", skip_all)]
3993 pub fn get_dynamic_fields(
3994 &self,
3995 owner: ObjectID,
3996 cursor: Option<ObjectID>,
3998 limit: usize,
3999 ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4000 Ok(self
4001 .get_dynamic_fields_iterator(owner, cursor)?
4002 .take(limit)
4003 .collect::<Result<Vec<_>, _>>()?)
4004 }
4005
4006 fn get_dynamic_fields_iterator(
4007 &self,
4008 owner: ObjectID,
4009 cursor: Option<ObjectID>,
4011 ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4012 {
4013 if let Some(indexes) = &self.indexes {
4014 indexes.get_dynamic_fields_iterator(owner, cursor)
4015 } else {
4016 Err(IotaError::IndexStoreNotAvailable)
4017 }
4018 }
4019
4020 #[instrument(level = "trace", skip_all)]
4021 pub fn get_dynamic_field_object_id(
4022 &self,
4023 owner: ObjectID,
4024 name_type: TypeTag,
4025 name_bcs_bytes: &[u8],
4026 ) -> IotaResult<Option<ObjectID>> {
4027 if let Some(indexes) = &self.indexes {
4028 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4029 } else {
4030 Err(IotaError::IndexStoreNotAvailable)
4031 }
4032 }
4033
4034 #[instrument(level = "trace", skip_all)]
4035 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
4036 Ok(self.get_indexes()?.next_sequence_number())
4037 }
4038
4039 #[instrument(level = "trace", skip_all)]
4040 pub async fn get_executed_transaction_and_effects(
4041 &self,
4042 digest: TransactionDigest,
4043 kv_store: Arc<TransactionKeyValueStore>,
4044 ) -> IotaResult<(Transaction, TransactionEffects)> {
4045 let transaction = kv_store.get_tx(digest).await?;
4046 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4047 Ok((transaction, effects))
4048 }
4049
4050 #[instrument(level = "trace", skip_all)]
4051 pub fn multi_get_checkpoint_by_sequence_number(
4052 &self,
4053 sequence_numbers: &[CheckpointSequenceNumber],
4054 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
4055 Ok(self
4056 .checkpoint_store
4057 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4058 }
4059
4060 #[instrument(level = "trace", skip_all)]
4061 pub fn get_transaction_events(
4062 &self,
4063 digest: &TransactionDigest,
4064 ) -> IotaResult<TransactionEvents> {
4065 self.get_transaction_cache_reader()
4066 .try_get_events(digest)?
4067 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
4068 }
4069
4070 pub fn get_transaction_input_objects(
4071 &self,
4072 effects: &TransactionEffects,
4073 ) -> anyhow::Result<Vec<Object>> {
4074 iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4075 .map_err(Into::into)
4076 }
4077
4078 pub fn get_transaction_output_objects(
4079 &self,
4080 effects: &TransactionEffects,
4081 ) -> anyhow::Result<Vec<Object>> {
4082 iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4083 .map_err(Into::into)
4084 }
4085
4086 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
4087 match &self.indexes {
4088 Some(i) => Ok(i.clone()),
4089 None => Err(IotaError::UnsupportedFeature {
4090 error: "extended object indexing is not enabled on this server".into(),
4091 }),
4092 }
4093 }
4094
4095 pub async fn get_transactions_for_tests(
4096 self: &Arc<Self>,
4097 filter: Option<TransactionFilter>,
4098 cursor: Option<TransactionDigest>,
4099 limit: Option<usize>,
4100 reverse: bool,
4101 ) -> IotaResult<Vec<TransactionDigest>> {
4102 let metrics = KeyValueStoreMetrics::new_for_tests();
4103 let kv_store = Arc::new(TransactionKeyValueStore::new(
4104 "rocksdb",
4105 metrics,
4106 self.clone(),
4107 ));
4108 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4109 .await
4110 }
4111
4112 #[instrument(level = "trace", skip_all)]
4113 pub async fn get_transactions(
4114 &self,
4115 kv_store: &Arc<TransactionKeyValueStore>,
4116 filter: Option<TransactionFilter>,
4117 cursor: Option<TransactionDigest>,
4119 limit: Option<usize>,
4120 reverse: bool,
4121 ) -> IotaResult<Vec<TransactionDigest>> {
4122 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4123 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4124 let iter = checkpoint_contents.iter().map(|c| c.transaction);
4125 if reverse {
4126 let iter = iter
4127 .rev()
4128 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4129 .skip(usize::from(cursor.is_some()));
4130 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4131 } else {
4132 let iter = iter
4133 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4134 .skip(usize::from(cursor.is_some()));
4135 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4136 }
4137 }
4138 self.get_indexes()?
4139 .get_transactions(filter, cursor, limit, reverse)
4140 }
4141
4142 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4143 &self.checkpoint_store
4144 }
4145
4146 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4147 self.get_checkpoint_store()
4148 .get_highest_executed_checkpoint_seq_number()?
4149 .ok_or(IotaError::UserInput {
4150 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4151 })
4152 }
4153
/// Returns the highest pruned checkpoint recorded in the perpetual tables, or `0` when none
/// is recorded (simulator-only test helper).
#[cfg(msim)]
pub fn get_highest_pruned_checkpoint_for_testing(
    &self,
) -> IotaResult<CheckpointSequenceNumber> {
    let database = self.database_for_testing();
    match database.perpetual_tables.get_highest_pruned_checkpoint() {
        Ok(highest) => Ok(highest.unwrap_or(0)),
        Err(err) => Err(err.into()),
    }
}
4164
4165 #[instrument(level = "trace", skip_all)]
4166 pub fn get_checkpoint_summary_by_sequence_number(
4167 &self,
4168 sequence_number: CheckpointSequenceNumber,
4169 ) -> IotaResult<CheckpointSummary> {
4170 let verified_checkpoint = self
4171 .get_checkpoint_store()
4172 .get_checkpoint_by_sequence_number(sequence_number)?;
4173 match verified_checkpoint {
4174 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4175 None => Err(IotaError::UserInput {
4176 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4177 }),
4178 }
4179 }
4180
4181 #[instrument(level = "trace", skip_all)]
4182 pub fn get_checkpoint_summary_by_digest(
4183 &self,
4184 digest: CheckpointDigest,
4185 ) -> IotaResult<CheckpointSummary> {
4186 let verified_checkpoint = self
4187 .get_checkpoint_store()
4188 .get_checkpoint_by_digest(&digest)?;
4189 match verified_checkpoint {
4190 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4191 None => Err(IotaError::UserInput {
4192 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4193 }),
4194 }
4195 }
4196
4197 #[instrument(level = "trace", skip_all)]
4198 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
4199 if is_system_package(package_id) {
4200 return self.find_genesis_txn_digest();
4201 }
4202 Ok(self
4203 .get_object_read(&package_id)?
4204 .into_object()?
4205 .previous_transaction)
4206 }
4207
4208 #[instrument(level = "trace", skip_all)]
4209 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4210 let summary = self
4211 .get_verified_checkpoint_by_sequence_number(0)?
4212 .into_message();
4213 let content = self.get_checkpoint_contents(summary.content_digest)?;
4214 let genesis_transaction = content.enumerate_transactions(&summary).next();
4215 Ok(genesis_transaction
4216 .ok_or(IotaError::UserInput {
4217 error: UserInputError::GenesisTransactionNotFound,
4218 })?
4219 .1
4220 .transaction)
4221 }
4222
4223 #[instrument(level = "trace", skip_all)]
4224 pub fn get_verified_checkpoint_by_sequence_number(
4225 &self,
4226 sequence_number: CheckpointSequenceNumber,
4227 ) -> IotaResult<VerifiedCheckpoint> {
4228 let verified_checkpoint = self
4229 .get_checkpoint_store()
4230 .get_checkpoint_by_sequence_number(sequence_number)?;
4231 match verified_checkpoint {
4232 Some(verified_checkpoint) => Ok(verified_checkpoint),
4233 None => Err(IotaError::UserInput {
4234 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4235 }),
4236 }
4237 }
4238
4239 #[instrument(level = "trace", skip_all)]
4240 pub fn get_verified_checkpoint_summary_by_digest(
4241 &self,
4242 digest: CheckpointDigest,
4243 ) -> IotaResult<VerifiedCheckpoint> {
4244 let verified_checkpoint = self
4245 .get_checkpoint_store()
4246 .get_checkpoint_by_digest(&digest)?;
4247 match verified_checkpoint {
4248 Some(verified_checkpoint) => Ok(verified_checkpoint),
4249 None => Err(IotaError::UserInput {
4250 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4251 }),
4252 }
4253 }
4254
4255 #[instrument(level = "trace", skip_all)]
4256 pub fn get_checkpoint_contents(
4257 &self,
4258 digest: CheckpointContentsDigest,
4259 ) -> IotaResult<CheckpointContents> {
4260 self.get_checkpoint_store()
4261 .get_checkpoint_contents(&digest)?
4262 .ok_or(IotaError::UserInput {
4263 error: UserInputError::CheckpointContentsNotFound(digest),
4264 })
4265 }
4266
4267 #[instrument(level = "trace", skip_all)]
4268 pub fn get_checkpoint_contents_by_sequence_number(
4269 &self,
4270 sequence_number: CheckpointSequenceNumber,
4271 ) -> IotaResult<CheckpointContents> {
4272 let verified_checkpoint = self
4273 .get_checkpoint_store()
4274 .get_checkpoint_by_sequence_number(sequence_number)?;
4275 match verified_checkpoint {
4276 Some(verified_checkpoint) => {
4277 let content_digest = verified_checkpoint.into_inner().content_digest;
4278 self.get_checkpoint_contents(content_digest)
4279 }
4280 None => Err(IotaError::UserInput {
4281 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4282 }),
4283 }
4284 }
4285
4286 #[instrument(level = "trace", skip_all)]
4287 pub async fn query_events(
4288 &self,
4289 kv_store: &Arc<TransactionKeyValueStore>,
4290 query: EventFilter,
4291 cursor: Option<EventID>,
4293 limit: usize,
4294 descending: bool,
4295 ) -> IotaResult<Vec<IotaEvent>> {
4296 let index_store = self.get_indexes()?;
4297
4298 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4300 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4301 IotaError::TransactionNotFound {
4302 digest: cursor.tx_digest,
4303 },
4304 )?;
4305 (tx_seq, cursor.event_seq as usize)
4306 } else if descending {
4307 (u64::MAX, usize::MAX)
4308 } else {
4309 (0, 0)
4310 };
4311
4312 let limit = limit + 1;
4313 let mut event_keys = match query {
4314 EventFilter::All(filters) => {
4315 if filters.is_empty() {
4316 index_store.all_events(tx_num, event_num, limit, descending)?
4317 } else {
4318 return Err(IotaError::UserInput {
4319 error: UserInputError::Unsupported(
4320 "This query type does not currently support filter combinations"
4321 .to_string(),
4322 ),
4323 });
4324 }
4325 }
4326 EventFilter::Transaction(digest) => {
4327 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4328 }
4329 EventFilter::MoveModule { package, module } => {
4330 let module_id = ModuleId::new(package.into(), module);
4331 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4332 }
4333 EventFilter::MoveEventType(struct_name) => index_store
4334 .events_by_move_event_struct_name(
4335 &struct_name,
4336 tx_num,
4337 event_num,
4338 limit,
4339 descending,
4340 )?,
4341 EventFilter::Sender(sender) => {
4342 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4343 }
4344 EventFilter::TimeRange {
4345 start_time,
4346 end_time,
4347 } => index_store
4348 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4349 EventFilter::MoveEventModule { package, module } => index_store
4350 .events_by_move_event_module(
4351 &ModuleId::new(package.into(), module),
4352 tx_num,
4353 event_num,
4354 limit,
4355 descending,
4356 )?,
4357 EventFilter::Package(_)
4359 | EventFilter::MoveEventField { .. }
4360 | EventFilter::Any(_)
4361 | EventFilter::And(_, _)
4362 | EventFilter::Or(_, _) => {
4363 return Err(IotaError::UserInput {
4364 error: UserInputError::Unsupported(
4365 "This query type is not supported by the full node.".to_string(),
4366 ),
4367 });
4368 }
4369 };
4370
4371 if cursor.is_some() {
4374 if !event_keys.is_empty() {
4375 event_keys.remove(0);
4376 }
4377 } else {
4378 event_keys.truncate(limit - 1);
4379 }
4380
4381 let transaction_digests = event_keys
4383 .iter()
4384 .map(|(_, digest, _, _)| *digest)
4385 .collect::<HashSet<_>>()
4386 .into_iter()
4387 .collect::<Vec<_>>();
4388
4389 let events = kv_store
4390 .multi_get_events_by_tx_digests(&transaction_digests)
4391 .await?;
4392
4393 let events_map: HashMap<_, _> =
4394 transaction_digests.iter().zip(events.into_iter()).collect();
4395
4396 let stored_events = event_keys
4397 .into_iter()
4398 .map(|k| {
4399 (
4400 k,
4401 events_map
4402 .get(&k.1)
4403 .expect("fetched digest is missing")
4404 .clone()
4405 .and_then(|e| e.data.get(k.2).cloned()),
4406 )
4407 })
4408 .map(
4409 |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4410 event
4411 .map(|e| (e, tx_digest, event_seq, timestamp))
4412 .ok_or(IotaError::TransactionEventsNotFound { digest: tx_digest })
4413 },
4414 )
4415 .collect::<Result<Vec<_>, _>>()?;
4416
4417 let epoch_store = self.load_epoch_store_one_call_per_task();
4418 let backing_store = self.get_backing_package_store().as_ref();
4419 let mut layout_resolver = epoch_store
4420 .executor()
4421 .type_layout_resolver(Box::new(backing_store));
4422 let mut events = vec![];
4423 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4424 events.push(IotaEvent::try_from(
4425 e.clone(),
4426 tx_digest,
4427 event_seq as u64,
4428 Some(timestamp),
4429 layout_resolver.get_annotated_layout(&e.type_)?,
4430 )?)
4431 }
4432 Ok(events)
4433 }
4434
4435 pub async fn insert_genesis_object(&self, object: Object) {
4436 self.get_reconfig_api()
4437 .try_insert_genesis_object(object)
4438 .expect("Cannot insert genesis object")
4439 }
4440
4441 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4442 futures::future::join_all(
4443 objects
4444 .iter()
4445 .map(|o| self.insert_genesis_object(o.clone())),
4446 )
4447 .await;
4448 }
4449
4450 #[instrument(level = "trace", skip_all)]
4452 pub fn get_transaction_status(
4453 &self,
4454 transaction_digest: &TransactionDigest,
4455 epoch_store: &Arc<AuthorityPerEpochStore>,
4456 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4457 if let Some(effects) =
4459 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4460 {
4461 if let Some(transaction) = self
4462 .get_transaction_cache_reader()
4463 .try_get_transaction_block(transaction_digest)?
4464 {
4465 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4466 let events = if effects.events_digest().is_some() {
4467 self.get_transaction_events(effects.transaction_digest())?
4468 } else {
4469 TransactionEvents::default()
4470 };
4471 return Ok(Some((
4472 (*transaction).clone().into_message(),
4473 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4474 )));
4475 } else {
4476 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4481 }
4482 }
4483 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4484 self.metrics.tx_already_processed.inc();
4485 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4486 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4487 } else {
4488 Ok(None)
4489 }
4490 }
4491
4492 #[instrument(level = "trace", skip_all)]
4496 pub fn get_signed_effects_and_maybe_resign(
4497 &self,
4498 transaction_digest: &TransactionDigest,
4499 epoch_store: &Arc<AuthorityPerEpochStore>,
4500 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4501 let effects = self
4502 .get_transaction_cache_reader()
4503 .try_get_executed_effects(transaction_digest)?;
4504 match effects {
4505 Some(effects) => {
4506 if effects.executed_epoch() != epoch_store.epoch() {
4529 debug!(
4530 tx_digest=?transaction_digest,
4531 effects_epoch=?effects.executed_epoch(),
4532 epoch=?epoch_store.epoch(),
4533 "Re-signing the effects with the current epoch"
4534 );
4535 }
4536 Ok(Some(self.sign_effects(effects, epoch_store)?))
4537 }
4538 None => Ok(None),
4539 }
4540 }
4541
4542 #[instrument(level = "trace", skip_all)]
4543 pub(crate) fn sign_effects(
4544 &self,
4545 effects: TransactionEffects,
4546 epoch_store: &Arc<AuthorityPerEpochStore>,
4547 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4548 let tx_digest = *effects.transaction_digest();
4549 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4550 Some(sig) => {
4551 debug_assert!(sig.epoch == epoch_store.epoch());
4552 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4553 }
4554 _ => {
4555 let sig = AuthoritySignInfo::new(
4556 epoch_store.epoch(),
4557 &effects,
4558 Intent::iota_app(IntentScope::TransactionEffects),
4559 self.name,
4560 &*self.secret,
4561 );
4562
4563 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4564
4565 epoch_store.insert_effects_digest_and_signature(
4566 &tx_digest,
4567 effects.digest(),
4568 &sig,
4569 )?;
4570
4571 effects
4572 }
4573 };
4574
4575 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4576 signed_effects,
4577 ))
4578 }
4579
4580 #[instrument(level = "trace", skip_all)]
4582 fn fullnode_only_get_tx_coins_for_indexing(
4583 &self,
4584 effects: &TransactionEffects,
4585 inner_temporary_store: &InnerTemporaryStore,
4586 epoch_store: &Arc<AuthorityPerEpochStore>,
4587 ) -> Option<TxCoins> {
4588 if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4589 return None;
4590 }
4591 let written_coin_objects = inner_temporary_store
4592 .written
4593 .iter()
4594 .filter_map(|(k, v)| {
4595 if v.is_coin() {
4596 Some((*k, v.clone()))
4597 } else {
4598 None
4599 }
4600 })
4601 .collect();
4602 let mut input_coin_objects = inner_temporary_store
4603 .input_objects
4604 .iter()
4605 .filter_map(|(k, v)| {
4606 if v.is_coin() {
4607 Some((*k, v.clone()))
4608 } else {
4609 None
4610 }
4611 })
4612 .collect::<ObjectMap>();
4613
4614 for (object_id, version) in effects.modified_at_versions() {
4619 if inner_temporary_store
4620 .loaded_runtime_objects
4621 .contains_key(&object_id)
4622 {
4623 if let Some(object) = self
4624 .get_object_store()
4625 .get_object_by_key(&object_id, version)
4626 {
4627 if object.is_coin() {
4628 input_coin_objects.insert(object_id, object);
4629 }
4630 }
4631 }
4632 }
4633
4634 Some((input_coin_objects, written_coin_objects))
4635 }
4636
4637 #[instrument(level = "trace", skip_all)]
4649 pub async fn get_transaction_lock(
4650 &self,
4651 object_ref: &ObjectRef,
4652 epoch_store: &AuthorityPerEpochStore,
4653 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4654 let lock_info = self
4655 .get_object_cache_reader()
4656 .try_get_lock(*object_ref, epoch_store)?;
4657 let lock_info = match lock_info {
4658 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4659 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4660 provided_obj_ref: *object_ref,
4661 current_version: locked_ref.1,
4662 }
4663 .into());
4664 }
4665 ObjectLockStatus::Initialized => {
4666 return Ok(None);
4667 }
4668 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4669 };
4670
4671 epoch_store.get_signed_transaction(&lock_info)
4672 }
4673
4674 pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4675 self.get_object_cache_reader().try_get_objects(objects)
4676 }
4677
4678 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4680 self.try_get_objects(objects)
4681 .await
4682 .expect("storage access failed")
4683 }
4684
4685 pub async fn try_get_object_or_tombstone(
4686 &self,
4687 object_id: ObjectID,
4688 ) -> IotaResult<Option<ObjectRef>> {
4689 self.get_object_cache_reader()
4690 .try_get_latest_object_ref_or_tombstone(object_id)
4691 }
4692
4693 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4695 self.try_get_object_or_tombstone(object_id)
4696 .await
4697 .expect("storage access failed")
4698 }
4699
4700 pub fn set_override_protocol_upgrade_buffer_stake(
4710 &self,
4711 expected_epoch: EpochId,
4712 buffer_stake_bps: u64,
4713 ) -> IotaResult {
4714 let epoch_store = self.load_epoch_store_one_call_per_task();
4715 let actual_epoch = epoch_store.epoch();
4716 if actual_epoch != expected_epoch {
4717 return Err(IotaError::WrongEpoch {
4718 expected_epoch,
4719 actual_epoch,
4720 });
4721 }
4722
4723 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4724 }
4725
4726 pub fn clear_override_protocol_upgrade_buffer_stake(
4727 &self,
4728 expected_epoch: EpochId,
4729 ) -> IotaResult {
4730 let epoch_store = self.load_epoch_store_one_call_per_task();
4731 let actual_epoch = epoch_store.epoch();
4732 if actual_epoch != expected_epoch {
4733 return Err(IotaError::WrongEpoch {
4734 expected_epoch,
4735 actual_epoch,
4736 });
4737 }
4738
4739 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4740 }
4741
4742 pub async fn get_available_system_packages(
4746 &self,
4747 binary_config: &BinaryConfig,
4748 ) -> Vec<ObjectRef> {
4749 let mut results = vec![];
4750
4751 let system_packages = BuiltInFramework::iter_system_packages();
4752
4753 #[cfg(msim)]
4755 let extra_packages = framework_injection::get_extra_packages(self.name);
4756 #[cfg(msim)]
4757 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4758
4759 for system_package in system_packages {
4760 let modules = system_package.modules().to_vec();
4761 #[cfg(msim)]
4763 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4764 .unwrap_or(modules);
4765
4766 let Some(obj_ref) = iota_framework::compare_system_package(
4767 &self.get_object_store(),
4768 &system_package.id,
4769 &modules,
4770 system_package.dependencies.to_vec(),
4771 binary_config,
4772 )
4773 .await
4774 else {
4775 return vec![];
4776 };
4777 results.push(obj_ref);
4778 }
4779
4780 results
4781 }
4782
4783 async fn get_system_package_bytes(
4800 &self,
4801 system_packages: Vec<ObjectRef>,
4802 binary_config: &BinaryConfig,
4803 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4804 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4805 let objects = self.get_objects(&ids).await;
4806
4807 let mut res = Vec::with_capacity(system_packages.len());
4808 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4809 let prev_transaction = match object {
4810 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4811 info!("Framework {} does not need updating", system_package_ref.0);
4813 continue;
4814 }
4815
4816 Some(cur_object) => cur_object.previous_transaction,
4817 None => TransactionDigest::genesis_marker(),
4818 };
4819
4820 #[cfg(msim)]
4821 let SystemPackage {
4822 id: _,
4823 bytes,
4824 dependencies,
4825 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4826 .unwrap_or_else(|| {
4827 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4828 });
4829
4830 #[cfg(not(msim))]
4831 let SystemPackage {
4832 id: _,
4833 bytes,
4834 dependencies,
4835 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4836
4837 let modules: Vec<_> = bytes
4838 .iter()
4839 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4840 .collect();
4841
4842 let new_object = Object::new_system_package(
4843 &modules,
4844 system_package_ref.1,
4845 dependencies.clone(),
4846 prev_transaction,
4847 );
4848
4849 let new_ref = new_object.compute_object_reference();
4850 if new_ref != system_package_ref {
4851 error!(
4852 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
4853 );
4854 return None;
4855 }
4856
4857 res.push((system_package_ref.1, bytes, dependencies));
4858 }
4859
4860 Some(res)
4861 }
4862
4863 fn is_protocol_version_supported_v1(
4867 proposed_protocol_version: ProtocolVersion,
4868 committee: &Committee,
4869 capabilities: Vec<AuthorityCapabilitiesV1>,
4870 mut buffer_stake_bps: u64,
4871 ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4872 if buffer_stake_bps > 10000 {
4873 warn!("clamping buffer_stake_bps to 10000");
4874 buffer_stake_bps = 10000;
4875 }
4876
4877 let mut desired_upgrades: Vec<_> = capabilities
4880 .into_iter()
4881 .filter_map(|mut cap| {
4882 if cap.available_system_packages.is_empty() {
4884 return None;
4885 }
4886
4887 cap.available_system_packages.sort();
4888
4889 info!(
4890 "validator {:?} supports {:?} with system packages: {:?}",
4891 cap.authority.concise(),
4892 cap.supported_protocol_versions,
4893 cap.available_system_packages,
4894 );
4895
4896 cap.supported_protocol_versions
4900 .get_version_digest(proposed_protocol_version)
4901 .map(|digest| (digest, cap.available_system_packages, cap.authority))
4902 })
4903 .collect();
4904
4905 desired_upgrades.sort();
4908 desired_upgrades
4909 .into_iter()
4910 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4911 .into_iter()
4912 .find_map(|((digest, packages), group)| {
4913 assert!(!packages.is_empty());
4915
4916 let mut stake_aggregator: StakeAggregator<(), true> =
4917 StakeAggregator::new(Arc::new(committee.clone()));
4918
4919 for (_, _, authority) in group {
4920 stake_aggregator.insert_generic(authority, ());
4921 }
4922
4923 let total_votes = stake_aggregator.total_votes();
4924 let quorum_threshold = committee.quorum_threshold();
4925 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4926
4927 info!(
4928 protocol_config_digest = ?digest,
4929 ?total_votes,
4930 ?quorum_threshold,
4931 ?buffer_stake_bps,
4932 ?effective_threshold,
4933 ?proposed_protocol_version,
4934 ?packages,
4935 "support for upgrade"
4936 );
4937
4938 let has_support = total_votes >= effective_threshold;
4939 has_support.then_some((proposed_protocol_version, digest, packages))
4940 })
4941 }
4942
4943 fn choose_protocol_version_and_system_packages_v1(
4947 current_protocol_version: ProtocolVersion,
4948 current_protocol_digest: Digest,
4949 committee: &Committee,
4950 capabilities: Vec<AuthorityCapabilitiesV1>,
4951 buffer_stake_bps: u64,
4952 ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4953 let mut next_protocol_version = current_protocol_version;
4954 let mut system_packages = vec![];
4955 let mut protocol_version_digest = current_protocol_digest;
4956
4957 while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4961 next_protocol_version + 1,
4962 committee,
4963 capabilities.clone(),
4964 buffer_stake_bps,
4965 ) {
4966 next_protocol_version = version;
4967 protocol_version_digest = digest;
4968 system_packages = packages;
4969 }
4970
4971 (
4972 next_protocol_version,
4973 protocol_version_digest,
4974 system_packages,
4975 )
4976 }
4977
4978 fn get_validators_supporting_protocol_version(
4983 target_protocol_version: ProtocolVersion,
4984 target_digest: Digest,
4985 active_validators: &[AuthorityPublicKey],
4986 capabilities: &[AuthorityCapabilitiesV1],
4987 ) -> Vec<u64> {
4988 let mut eligible_validators = Vec::new();
4989
4990 for capability in capabilities {
4991 if let Some(digest) = capability
4993 .supported_protocol_versions
4994 .get_version_digest(target_protocol_version)
4995 {
4996 if digest == target_digest {
4997 if let Some(index) = active_validators
4999 .iter()
5000 .position(|name| AuthorityName::from(name) == capability.authority)
5001 {
5002 eligible_validators.push(index as u64);
5003 }
5004 }
5005 }
5006 }
5007
5008 eligible_validators.sort();
5010 eligible_validators
5011 }
5012
5013 fn calculate_eligible_validators_weight(
5018 eligible_validator_indices: &[u64],
5019 active_validators: &[AuthorityPublicKey],
5020 committee: &Committee,
5021 ) -> u64 {
5022 let mut total_weight = 0u64;
5023
5024 for &index in eligible_validator_indices {
5025 let authority_pubkey = &active_validators[index as usize];
5026 if let Some((_, weight)) = committee
5028 .members()
5029 .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
5030 {
5031 total_weight += weight;
5032 }
5033 }
5034
5035 total_weight
5036 }
5037
5038 #[instrument(level = "debug", skip_all)]
5039 fn create_authenticator_state_tx(
5040 &self,
5041 epoch_store: &Arc<AuthorityPerEpochStore>,
5042 ) -> Option<EndOfEpochTransactionKind> {
5043 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
5044 info!("authenticator state transactions not enabled");
5045 return None;
5046 }
5047
5048 let authenticator_state_exists = epoch_store.authenticator_state_exists();
5049 let tx = if authenticator_state_exists {
5050 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
5051 let min_epoch =
5052 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
5053 let authenticator_obj_initial_shared_version = epoch_store
5054 .epoch_start_config()
5055 .authenticator_obj_initial_shared_version()
5056 .expect("initial version must exist");
5057
5058 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
5059 min_epoch,
5060 authenticator_obj_initial_shared_version,
5061 );
5062
5063 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
5064
5065 tx
5066 } else {
5067 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
5068 info!("Creating AuthenticatorStateCreate tx");
5069 tx
5070 };
5071 Some(tx)
5072 }
5073
    /// Builds the end-of-epoch transaction for the current epoch, executes it
    /// locally and stores the transaction together with its effects.
    ///
    /// The end-of-epoch transaction contains, in order, an optional
    /// authenticator-state transaction and one change-epoch transaction. The
    /// change-epoch variant (v1–v4) is selected from protocol config flags.
    ///
    /// # Parameters
    /// - `epoch_store`: store of the epoch being closed.
    /// - `gas_cost_summary`: gas totals forwarded into the change-epoch
    ///   transaction.
    /// - `checkpoint`: checkpoint sequence number used to build the executable
    ///   transaction.
    /// - `epoch_start_timestamp_ms`: timestamp forwarded into the change-epoch
    ///   transaction.
    /// - `scores`: validator scores, only forwarded by the v4 variant.
    ///
    /// # Returns
    /// The system state written by the transaction, the system epoch info
    /// event if one was emitted, and the transaction effects.
    ///
    /// # Errors
    /// Fails when the chosen system packages are not available locally, when
    /// the transaction was already executed (per the log message, via state
    /// sync), or when any storage/execution step returns an error.
    ///
    /// # Panics
    /// Panics if capabilities cannot be read, if the transaction does not
    /// write the system state object, if no epoch info event is present while
    /// the system is not in safe mode, or if the effects status is not OK.
    #[instrument(level = "error", skip_all)]
    pub async fn create_and_execute_advance_epoch_tx(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        gas_cost_summary: &GasCostSummary,
        checkpoint: CheckpointSequenceNumber,
        epoch_start_timestamp_ms: CheckpointTimestamp,
        scores: Vec<u64>,
    ) -> anyhow::Result<(
        IotaSystemState,
        Option<SystemEpochInfoEvent>,
        TransactionEffects,
    )> {
        let mut txns = Vec::new();

        // The authenticator-state transaction, when present, goes first.
        if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
            txns.push(tx);
        }

        let next_epoch = epoch_store.epoch() + 1;

        // Decide the next protocol version and system packages from the
        // capabilities recorded in the epoch store.
        let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
        let authority_capabilities = epoch_store
            .get_capabilities_v1()
            .expect("read capabilities from db cannot fail");
        let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
            Self::choose_protocol_version_and_system_packages_v1(
                epoch_store.protocol_version(),
                SupportedProtocolVersionsWithHashes::protocol_config_digest(
                    epoch_store.protocol_config(),
                ),
                epoch_store.committee(),
                authority_capabilities.clone(),
                buffer_stake_bps,
            );

        // Note: the binary config is derived from the CURRENT epoch's protocol
        // config, not from the next epoch's.
        let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);
        let Some(next_epoch_system_package_bytes) = self
            .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
            .await
        else {
            error!(
                "upgraded system packages {:?} are not locally available, cannot create \
                ChangeEpochTx. validator binary must be upgraded to the correct version!",
                next_epoch_system_packages
            );
            bail!("missing system packages: cannot form ChangeEpochTx");
        };

        // Select the change-epoch transaction variant from protocol flags.
        if config.select_committee_from_eligible_validators() {
            let active_validators = epoch_store.epoch_start_state().get_active_validators();

            // By default every active validator is eligible.
            let mut eligible_active_validators = (0..active_validators.len() as u64).collect();

            if config.select_committee_supporting_next_epoch_version() {
                // Restrict eligibility to validators that advertised support
                // for the chosen version and digest.
                eligible_active_validators = Self::get_validators_supporting_protocol_version(
                    next_epoch_protocol_version,
                    next_epoch_protocol_digest,
                    &active_validators,
                    &authority_capabilities,
                );

                let eligible_validators_weight = Self::calculate_eligible_validators_weight(
                    &eligible_active_validators,
                    &active_validators,
                    epoch_store.committee(),
                );

                let committee = epoch_store.committee();
                let effective_threshold = committee.effective_threshold(buffer_stake_bps);

                // If the eligible set does not reach the effective threshold,
                // log an error and fall back to all active validators.
                if eligible_validators_weight < effective_threshold {
                    error!(
                        "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
                        This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
                    );
                    eligible_active_validators = (0..active_validators.len() as u64).collect();
                }
            }

            if config.pass_validator_scores_to_advance_epoch() {
                // v4: additionally carries the scores and the reward
                // adjustment flag.
                txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
                    next_epoch,
                    next_epoch_protocol_version,
                    gas_cost_summary.storage_cost,
                    gas_cost_summary.computation_cost,
                    gas_cost_summary.computation_cost_burned,
                    gas_cost_summary.storage_rebate,
                    gas_cost_summary.non_refundable_storage_fee,
                    epoch_start_timestamp_ms,
                    next_epoch_system_package_bytes,
                    eligible_active_validators,
                    scores,
                    config.adjust_rewards_by_score(),
                ));
            } else {
                // v3: carries the eligible validator indices.
                txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
                    next_epoch,
                    next_epoch_protocol_version,
                    gas_cost_summary.storage_cost,
                    gas_cost_summary.computation_cost,
                    gas_cost_summary.computation_cost_burned,
                    gas_cost_summary.storage_rebate,
                    gas_cost_summary.non_refundable_storage_fee,
                    epoch_start_timestamp_ms,
                    next_epoch_system_package_bytes,
                    eligible_active_validators,
                ));
            }
        } else if config.protocol_defined_base_fee()
            && config.max_committee_members_count_as_option().is_some()
        {
            // v2: carries the burned computation cost.
            txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.computation_cost_burned,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        } else {
            // v1: the original change-epoch transaction.
            txns.push(EndOfEpochTransactionKind::new_change_epoch(
                next_epoch,
                next_epoch_protocol_version,
                gas_cost_summary.storage_cost,
                gas_cost_summary.computation_cost,
                gas_cost_summary.storage_rebate,
                gas_cost_summary.non_refundable_storage_fee,
                epoch_start_timestamp_ms,
                next_epoch_system_package_bytes,
            ));
        }

        let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);

        let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
            tx.clone(),
            epoch_store.epoch(),
            checkpoint,
        );

        let tx_digest = executable_tx.digest();

        info!(
            ?next_epoch,
            ?next_epoch_protocol_version,
            ?next_epoch_system_packages,
            computation_cost=?gas_cost_summary.computation_cost,
            computation_cost_burned=?gas_cost_summary.computation_cost_burned,
            storage_cost=?gas_cost_summary.storage_cost,
            storage_rebate=?gas_cost_summary.storage_rebate,
            non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
            ?tx_digest,
            "Creating advance epoch transaction"
        );

        fail_point_async!("change_epoch_tx_delay");
        // The transaction lock is acquired before the already-executed check.
        let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

        // Bail out if the transaction was already executed; per the message
        // below this is expected to happen via state sync.
        if self
            .get_transaction_cache_reader()
            .try_is_tx_already_executed(tx_digest)?
        {
            warn!("change epoch tx has already been executed via state sync");
            bail!("change epoch tx has already been executed via state sync",);
        }

        let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;

        // Shared object versions are assigned before the inputs are read.
        epoch_store.assign_shared_object_versions_idempotent(
            self.get_object_cache_reader().as_ref(),
            std::slice::from_ref(&executable_tx),
        )?;

        let (input_objects, _) =
            self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;

        let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
            &execution_guard,
            &executable_tx,
            input_objects,
            vec![],
            epoch_store,
        )?;
        let system_obj = get_iota_system_state(&temporary_store.written)
            .expect("change epoch tx must write to system object");
        let system_epoch_info_event = temporary_store
            .events
            .data
            .into_iter()
            .find(|event| event.is_system_epoch_info_event())
            .map(SystemEpochInfoEvent::from);
        // The epoch info event may only be missing in safe mode.
        assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());

        // Store the transaction and its effects in the state sync store.
        self.get_state_sync_store()
            .try_insert_transaction_and_effects(&tx, &effects)
            .map_err(|err| {
                let err: anyhow::Error = err.into();
                err
            })?;

        info!(
            "Effects summary of the change epoch transaction: {:?}",
            effects.summary_for_debug()
        );
        epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
        assert!(effects.status().is_ok());
        Ok((system_obj, system_epoch_info_event, effects))
    }
5338
5339 #[instrument(level = "error", skip_all)]
5343 async fn revert_uncommitted_epoch_transactions(
5344 &self,
5345 epoch_store: &AuthorityPerEpochStore,
5346 ) -> IotaResult {
5347 {
5348 let state = epoch_store.get_reconfig_state_write_lock_guard();
5349 if state.should_accept_user_certs() {
5350 epoch_store.close_user_certs(state);
5359 }
5360 }
5362 let pending_certificates = epoch_store.pending_consensus_certificates();
5363 info!(
5364 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5365 pending_certificates.len(),
5366 pending_certificates,
5367 );
5368 for digest in pending_certificates {
5369 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5370 info!(
5371 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5372 digest
5373 );
5374 continue;
5375 }
5376 info!("Reverting {:?} at the end of epoch", digest);
5377 epoch_store.revert_executed_transaction(&digest)?;
5378 self.get_reconfig_api().try_revert_state_update(&digest)?;
5379 }
5380 info!("All uncommitted local transactions reverted");
5381 Ok(())
5382 }
5383
5384 #[instrument(level = "error", skip_all)]
5385 async fn reopen_epoch_db(
5386 &self,
5387 cur_epoch_store: &AuthorityPerEpochStore,
5388 new_committee: Committee,
5389 epoch_start_configuration: EpochStartConfiguration,
5390 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5391 epoch_last_checkpoint: CheckpointSequenceNumber,
5392 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5393 let new_epoch = new_committee.epoch;
5394 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5395 assert_eq!(
5396 epoch_start_configuration.epoch_start_state().epoch(),
5397 new_committee.epoch
5398 );
5399 fail_point!("before-open-new-epoch-store");
5400 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5401 self.name,
5402 new_committee,
5403 epoch_start_configuration,
5404 self.get_backing_package_store().clone(),
5405 self.get_object_store().clone(),
5406 expensive_safety_check_config,
5407 epoch_last_checkpoint,
5408 )?;
5409 self.epoch_store.store(new_epoch_store.clone());
5410 Ok(new_epoch_store)
5411 }
5412
5413 fn check_move_account(
5416 &self,
5417 auth_account_object_id: ObjectID,
5418 auth_account_object_seq_number: Option<SequenceNumber>,
5419 auth_account_object_digest: Option<ObjectDigest>,
5420 account_object: ObjectReadResult,
5421 signer: &IotaAddress,
5422 ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5423 let account_object = match account_object.object {
5424 ObjectReadResultKind::Object(object) => Ok(object),
5425 ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5426 Err(UserInputError::AccountObjectDeleted {
5427 account_id: account_object.id(),
5428 account_version: version,
5429 transaction_digest: digest,
5430 })
5431 }
5432 ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5435 Err(UserInputError::AccountObjectInCanceledTransaction {
5436 account_id: account_object.id(),
5437 account_version: version,
5438 })
5439 }
5440 }?;
5441
5442 let account_object_addr = IotaAddress::from(auth_account_object_id);
5443
5444 fp_ensure!(
5445 signer == &account_object_addr,
5446 UserInputError::IncorrectUserSignature {
5447 error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5448 }
5449 .into()
5450 );
5451
5452 fp_ensure!(
5453 account_object.is_shared() || account_object.is_immutable(),
5454 UserInputError::AccountObjectNotSupported {
5455 object_id: auth_account_object_id
5456 }
5457 .into()
5458 );
5459
5460 let auth_account_object_seq_number =
5461 if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5462 let account_object_version = account_object.version();
5463
5464 fp_ensure!(
5465 account_object_version == auth_account_object_seq_number,
5466 UserInputError::AccountObjectVersionMismatch {
5467 object_id: auth_account_object_id,
5468 expected_version: auth_account_object_seq_number,
5469 actual_version: account_object_version,
5470 }
5471 .into()
5472 );
5473
5474 auth_account_object_seq_number
5475 } else {
5476 account_object.version()
5477 };
5478
5479 if let Some(auth_account_object_digest) = auth_account_object_digest {
5480 let expected_digest = account_object.digest();
5481 fp_ensure!(
5482 expected_digest == auth_account_object_digest,
5483 UserInputError::InvalidAccountObjectDigest {
5484 object_id: auth_account_object_id,
5485 expected_digest,
5486 actual_digest: auth_account_object_digest,
5487 }
5488 .into()
5489 );
5490 }
5491
5492 let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5493 auth_account_object_id,
5494 &AuthenticatorFunctionRefV1Key::tag().into(),
5495 &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5496 )
5497 .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5498 account_object_id: auth_account_object_id,
5499 })?;
5500
5501 let authenticator_function_ref_field = self
5502 .get_object_cache_reader()
5503 .try_find_object_lt_or_eq_version(
5504 authenticator_function_ref_field_id,
5505 auth_account_object_seq_number,
5506 )?;
5507
5508 if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5509 let field_move_object = authenticator_function_ref_field_obj
5510 .data
5511 .try_as_move()
5512 .expect("dynamic field should never be a package object");
5513
5514 let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5515 field_move_object.to_rust().ok_or(
5516 UserInputError::InvalidAuthenticatorFunctionRefField {
5517 account_object_id: auth_account_object_id,
5518 },
5519 )?;
5520
5521 Ok(AuthenticatorFunctionRefForExecution::new_v1(
5522 field.value,
5523 authenticator_function_ref_field_obj.compute_object_reference(),
5524 authenticator_function_ref_field_obj.owner,
5525 authenticator_function_ref_field_obj.storage_rebate,
5526 authenticator_function_ref_field_obj.previous_transaction,
5527 ))
5528 } else {
5529 Err(UserInputError::MoveAuthenticatorNotFound {
5530 authenticator_function_ref_id: authenticator_function_ref_field_id,
5531 account_object_id: auth_account_object_id,
5532 account_object_version: auth_account_object_seq_number,
5533 }
5534 .into())
5535 }
5536 }
5537
5538 #[allow(clippy::type_complexity)]
5539 fn read_objects_for_signing(
5540 &self,
5541 transaction: &VerifiedTransaction,
5542 epoch: u64,
5543 ) -> IotaResult<(
5544 InputObjects,
5545 ReceivingObjects,
5546 Vec<(InputObjects, ObjectReadResult)>,
5547 )> {
5548 let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5549 Some(transaction.digest()),
5550 &transaction.collect_all_input_object_kind_for_reading()?,
5551 &transaction.data().transaction_data().receiving_objects(),
5552 epoch,
5553 )?;
5554
5555 transaction
5556 .split_input_objects_into_groups_for_reading(input_objects)
5557 .map(|(tx_input_objects, per_authenticator_inputs)| {
5558 (
5559 tx_input_objects,
5560 tx_receiving_objects,
5561 per_authenticator_inputs,
5562 )
5563 })
5564 }
5565
5566 #[allow(clippy::type_complexity)]
5567 fn check_transaction_inputs_for_signing(
5568 &self,
5569 protocol_config: &ProtocolConfig,
5570 reference_gas_price: u64,
5571 tx_data: &TransactionData,
5572 tx_input_objects: InputObjects,
5573 tx_receiving_objects: &ReceivingObjects,
5574 move_authenticators: &Vec<&MoveAuthenticator>,
5575 per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
5576 ) -> IotaResult<(
5577 IotaGasStatus,
5578 CheckedInputObjects,
5579 Vec<(CheckedInputObjects, AuthenticatorFunctionRef)>,
5580 )> {
5581 let authenticator_gas_budget = if move_authenticators.is_empty() {
5582 0
5583 } else {
5584 protocol_config.max_auth_gas()
5587 };
5588
5589 debug_assert_eq!(
5590 move_authenticators.len(),
5591 per_authenticator_inputs.len(),
5592 "Move authenticators amount must match the number of authenticator inputs"
5593 );
5594
5595 let per_authenticator_checked_inputs = move_authenticators
5596 .iter()
5597 .zip(per_authenticator_inputs)
5598 .map(
5599 |(move_authenticator, (authenticator_input_objects, account_object))| {
5600 let (
5602 auth_account_object_id,
5603 auth_account_object_seq_number,
5604 auth_account_object_digest,
5605 ) = move_authenticator.object_to_authenticate_components()?;
5606
5607 let signer = move_authenticator.address()?;
5608
5609 let AuthenticatorFunctionRefForExecution {
5611 authenticator_function_ref,
5612 ..
5613 } = self.check_move_account(
5614 auth_account_object_id,
5615 auth_account_object_seq_number,
5616 auth_account_object_digest,
5617 account_object,
5618 &signer,
5619 )?;
5620
5621 let authenticator_checked_input_objects =
5623 iota_transaction_checks::check_move_authenticator_input_for_signing(
5624 authenticator_input_objects,
5625 )?;
5626
5627 Ok((
5628 authenticator_checked_input_objects,
5629 authenticator_function_ref,
5630 ))
5631 },
5632 )
5633 .collect::<IotaResult<Vec<_>>>()?;
5634
5635 let (gas_status, tx_checked_input_objects) =
5637 iota_transaction_checks::check_transaction_input(
5638 protocol_config,
5639 reference_gas_price,
5640 tx_data,
5641 tx_input_objects,
5642 tx_receiving_objects,
5643 &self.metrics.bytecode_verifier_metrics,
5644 &self.config.verifier_signing_config,
5645 authenticator_gas_budget,
5646 )?;
5647
5648 Ok((
5649 gas_status,
5650 tx_checked_input_objects,
5651 per_authenticator_checked_inputs,
5652 ))
5653 }
5654
/// Test-only helper returning the iterator over the cached live object set
/// provided by the accumulator store.
#[cfg(test)]
pub(crate) fn iter_live_object_set_for_testing(
    &self,
) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
    let accumulator_store = self.get_accumulator_store();
    accumulator_store.iter_cached_live_object_set_for_testing()
}
5662
/// Test-only helper that takes the stored execution shutdown sender and
/// sends the shutdown signal through it.
///
/// # Panics
///
/// Panics if the sender has already been taken or if sending fails.
#[cfg(test)]
pub(crate) fn shutdown_execution_for_test(&self) {
    // The guard stays alive until the end of the function, so the lock is
    // held while the signal is sent.
    let mut shutdown_slot = self.tx_execution_shutdown.lock();
    let shutdown_sender = shutdown_slot.take().unwrap();
    shutdown_sender.send(()).unwrap();
}
5672
5673 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5676 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5677 self.get_object_cache_reader()
5678 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5679 self.get_reconfig_api()
5680 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5681 }
5682}
5683
/// Background worker that consumes generated randomness from a channel and
/// turns every item into a randomness state update transaction for the
/// authority.
pub struct RandomnessRoundReceiver {
    /// Authority state used to persist, enqueue and await the update
    /// transactions.
    authority_state: Arc<AuthorityState>,
    /// Channel yielding `(epoch, round, randomness bytes)` tuples.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
5688
5689impl RandomnessRoundReceiver {
5690 pub fn spawn(
5691 authority_state: Arc<AuthorityState>,
5692 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5693 ) -> JoinHandle<()> {
5694 let rrr = RandomnessRoundReceiver {
5695 authority_state,
5696 randomness_rx,
5697 };
5698 spawn_monitored_task!(rrr.run())
5699 }
5700
5701 async fn run(mut self) {
5702 info!("RandomnessRoundReceiver event loop started");
5703
5704 loop {
5705 tokio::select! {
5706 maybe_recv = self.randomness_rx.recv() => {
5707 if let Some((epoch, round, bytes)) = maybe_recv {
5708 self.handle_new_randomness(epoch, round, bytes);
5709 } else {
5710 break;
5711 }
5712 },
5713 }
5714 }
5715
5716 info!("RandomnessRoundReceiver event loop ended");
5717 }
5718
5719 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5720 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5721 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5722 if epoch_store.epoch() != epoch {
5723 warn!(
5724 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5725 epoch_store.epoch()
5726 );
5727 return;
5728 }
5729 let transaction = VerifiedTransaction::new_randomness_state_update(
5730 epoch,
5731 round,
5732 bytes,
5733 epoch_store
5734 .epoch_start_config()
5735 .randomness_obj_initial_shared_version(),
5736 );
5737 debug!(
5738 "created randomness state update transaction with digest: {:?}",
5739 transaction.digest()
5740 );
5741 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5742 let digest = *transaction.digest();
5743
5744 self.authority_state
5749 .get_cache_commit()
5750 .persist_transaction(&transaction);
5751
5752 self.authority_state
5754 .transaction_manager()
5755 .enqueue(vec![transaction], &epoch_store);
5756
5757 let authority_state = self.authority_state.clone();
5758 spawn_monitored_task!(async move {
5759 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5768 let result = tokio::time::timeout(
5769 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5770 authority_state
5771 .get_transaction_cache_reader()
5772 .try_notify_read_executed_effects(&[digest]),
5773 )
5774 .await;
5775 let result = match result {
5776 Ok(result) => result,
5777 Err(_) => {
5778 if cfg!(debug_assertions) {
5779 panic!(
5781 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5782 );
5783 }
5784 warn!(
5785 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5786 );
5787 authority_state
5789 .get_transaction_cache_reader()
5790 .try_notify_read_executed_effects(&[digest])
5791 .await
5792 }
5793 };
5794
5795 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5796 let effects = effects.pop().expect("should return effects");
5797 if *effects.status() != ExecutionStatus::Success {
5798 fatal!(
5799 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5800 );
5801 }
5802 debug!(
5803 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5804 );
5805 });
5806 }
5807}
5808
5809#[async_trait]
5810impl TransactionKeyValueStoreTrait for AuthorityState {
5811 async fn multi_get(
5812 &self,
5813 transaction_keys: &[TransactionDigest],
5814 effects_keys: &[TransactionDigest],
5815 ) -> IotaResult<KVStoreTransactionData> {
5816 let txns = if !transaction_keys.is_empty() {
5817 self.get_transaction_cache_reader()
5818 .try_multi_get_transaction_blocks(transaction_keys)?
5819 .into_iter()
5820 .map(|t| t.map(|t| (*t).clone().into_inner()))
5821 .collect()
5822 } else {
5823 vec![]
5824 };
5825
5826 let fx = if !effects_keys.is_empty() {
5827 self.get_transaction_cache_reader()
5828 .try_multi_get_executed_effects(effects_keys)?
5829 } else {
5830 vec![]
5831 };
5832
5833 Ok((txns, fx))
5834 }
5835
5836 async fn multi_get_checkpoints(
5837 &self,
5838 checkpoint_summaries: &[CheckpointSequenceNumber],
5839 checkpoint_contents: &[CheckpointSequenceNumber],
5840 checkpoint_summaries_by_digest: &[CheckpointDigest],
5841 ) -> IotaResult<(
5842 Vec<Option<CertifiedCheckpointSummary>>,
5843 Vec<Option<CheckpointContents>>,
5844 Vec<Option<CertifiedCheckpointSummary>>,
5845 )> {
5846 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5848 let store = self.get_checkpoint_store();
5849 for seq in checkpoint_summaries {
5850 let checkpoint = store
5851 .get_checkpoint_by_sequence_number(*seq)?
5852 .map(|c| c.into_inner());
5853
5854 summaries.push(checkpoint);
5855 }
5856
5857 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5858 for seq in checkpoint_contents {
5859 let checkpoint = store
5860 .get_checkpoint_by_sequence_number(*seq)?
5861 .and_then(|summary| {
5862 store
5863 .get_checkpoint_contents(&summary.content_digest)
5864 .expect("db read cannot fail")
5865 });
5866 contents.push(checkpoint);
5867 }
5868
5869 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5870 for digest in checkpoint_summaries_by_digest {
5871 let checkpoint = store
5872 .get_checkpoint_by_digest(digest)?
5873 .map(|c| c.into_inner());
5874 summaries_by_digest.push(checkpoint);
5875 }
5876
5877 Ok((summaries, contents, summaries_by_digest))
5878 }
5879
5880 async fn get_transaction_perpetual_checkpoint(
5881 &self,
5882 digest: TransactionDigest,
5883 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5884 self.get_checkpoint_cache()
5885 .try_get_transaction_perpetual_checkpoint(&digest)
5886 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5887 }
5888
5889 async fn get_object(
5890 &self,
5891 object_id: ObjectID,
5892 version: VersionNumber,
5893 ) -> IotaResult<Option<Object>> {
5894 self.get_object_cache_reader()
5895 .try_get_object_by_key(&object_id, version)
5896 }
5897
5898 #[instrument(skip_all)]
5899 async fn multi_get_objects(
5900 &self,
5901 object_keys: &[ObjectKey],
5902 ) -> IotaResult<Vec<Option<Object>>> {
5903 Ok(self
5904 .get_object_cache_reader()
5905 .multi_get_objects_by_key(object_keys))
5906 }
5907
5908 async fn multi_get_transactions_perpetual_checkpoints(
5909 &self,
5910 digests: &[TransactionDigest],
5911 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5912 let res = self
5913 .get_checkpoint_cache()
5914 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5915
5916 Ok(res
5917 .into_iter()
5918 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5919 .collect())
5920 }
5921
5922 #[instrument(skip(self))]
5923 async fn multi_get_events_by_tx_digests(
5924 &self,
5925 digests: &[TransactionDigest],
5926 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5927 if digests.is_empty() {
5928 return Ok(vec![]);
5929 }
5930
5931 Ok(self
5932 .get_transaction_cache_reader()
5933 .multi_get_events(digests))
5934 }
5935}
5936
/// Simulator-only (`cfg(msim)`) registry of package overrides, stored per
/// thread and keyed by package id.
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_types::{
        base_types::{AuthorityName, ObjectID},
        is_system_package,
    };
    use move_binary_format::CompiledModule;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per authority, which modules (if any) override a
    /// package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    /// How a package override is supplied.
    enum PackageOverrideConfig {
        /// The same modules for every authority.
        Global(Framework),
        /// Modules chosen by a callback that receives the authority name.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes every module with its own version into a separate byte
    /// buffer. Panics if serialization fails.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut bytes = Vec::new();
            module
                .serialize_with_version(module.version, &mut bytes)
                .unwrap();
            serialized.push(bytes);
        }
        serialized
    }

    /// Registers `modules` as the override of `package_id` for all
    /// authorities, replacing any previous entry for that id.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        let entry = PackageOverrideConfig::Global(modules);
        OVERRIDE.with(|overrides| {
            overrides.borrow_mut().insert(package_id, entry);
        });
    }

    /// Registers a per-authority callback as the override of `package_id`,
    /// replacing any previous entry for that id.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        let entry = PackageOverrideConfig::PerValidator(func);
        OVERRIDE.with(|overrides| {
            overrides.borrow_mut().insert(package_id, entry);
        });
    }

    /// Returns the serialized override modules of `package_id` for authority
    /// `name`, or `None` when no override applies.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            let overrides = cfg.borrow();
            match overrides.get(package_id)? {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(callback) => {
                    let framework = callback(name)?;
                    Some(compiled_modules_to_bytes(&framework))
                }
            }
        })
    }

    /// Returns the override modules of `package_id` for authority `name`, or
    /// `None` when no override applies.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            let overrides = cfg.borrow();
            match overrides.get(package_id)? {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(callback) => callback(name),
            }
        })
    }

    /// Builds a `SystemPackage` from the override of `package_id`, if any.
    ///
    /// System packages keep the dependencies of the built-in package with the
    /// same id; any other package depends on all built-in package ids.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;

        let dependencies;
        if is_system_package(*package_id) {
            let built_in_package = BuiltInFramework::get_package_by_id(package_id);
            dependencies = built_in_package.dependencies.to_vec();
        } else {
            dependencies = BuiltInFramework::all_package_ids();
        }

        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns a `SystemPackage` for every registered override whose id is
    /// not a built-in package id, each depending on all built-in packages.
    ///
    /// Panics if such an override yields no modules for authority `name`.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();

        // Collect the ids first so the registry borrow ends before the
        // per-package lookups below.
        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .filter(|id| !built_in.contains(*id))
                .copied()
                .collect()
        });

        let mut packages = Vec::with_capacity(extra.len());
        for package in extra {
            packages.push(SystemPackage {
                id: package,
                bytes: get_override_bytes(&package, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        packages
    }
}
6058
/// Serializable dump entry pairing an object with the components of its
/// object reference.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// Object id (first component of the object reference).
    pub id: ObjectID,
    /// Object version (second component of the object reference).
    pub version: VersionNumber,
    /// Object digest (third component of the object reference).
    pub digest: ObjectDigest,
    /// The full object.
    pub object: Object,
}
6066
6067impl ObjDumpFormat {
6068 fn new(object: Object) -> Self {
6069 let oref = object.compute_object_reference();
6070 Self {
6071 id: oref.0,
6072 version: oref.1,
6073 digest: oref.2,
6074 object,
6075 }
6076 }
6077}
6078
/// Serializable snapshot of a transaction's execution context: the
/// transaction, epoch parameters, the computed and expected effects, and the
/// objects gathered around its execution.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the dumped transaction.
    pub tx_digest: TransactionDigest,
    /// Sender-signed data extracted from the executed certificate.
    pub sender_signed_data: SenderSignedData,
    /// Epoch of the epoch store the dump was taken with.
    pub executed_epoch: u64,
    /// Reference gas price reported by that epoch store.
    pub reference_gas_price: u64,
    /// Protocol version reported by that epoch store, as a `u64`.
    pub protocol_version: u64,
    /// Epoch start timestamp taken from the epoch start configuration.
    pub epoch_start_timestamp_ms: u64,
    /// Effects that were computed for the transaction.
    pub computed_effects: TransactionEffects,
    /// Digest of the effects that were expected.
    pub expected_effects_digest: TransactionEffectsDigest,
    /// Built-in framework packages found in the object store.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Mutated or read-only shared input objects listed in the effects.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Objects recorded as loaded at runtime by the temporary store.
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects at the versions the effects report as modified.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages the temporary store recorded as loaded from the database at
    /// runtime.
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// Input objects held by the temporary store.
    pub input_objects: Vec<ObjDumpFormat>,
}
6096
6097impl NodeStateDump {
6098 pub fn new(
6099 tx_digest: &TransactionDigest,
6100 effects: &TransactionEffects,
6101 expected_effects_digest: TransactionEffectsDigest,
6102 object_store: &dyn ObjectStore,
6103 epoch_store: &Arc<AuthorityPerEpochStore>,
6104 inner_temporary_store: &InnerTemporaryStore,
6105 certificate: &VerifiedExecutableTransaction,
6106 ) -> IotaResult<Self> {
6107 let executed_epoch = epoch_store.epoch();
6109 let reference_gas_price = epoch_store.reference_gas_price();
6110 let epoch_start_config = epoch_store.epoch_start_config();
6111 let protocol_version = epoch_store.protocol_version().as_u64();
6112 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6113
6114 let mut relevant_system_packages = Vec::new();
6116 for sys_package_id in BuiltInFramework::all_package_ids() {
6117 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
6118 relevant_system_packages.push(ObjDumpFormat::new(w))
6119 }
6120 }
6121
6122 let mut shared_objects = Vec::new();
6124 for kind in effects.input_shared_objects() {
6125 match kind {
6126 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
6127 if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
6128 shared_objects.push(ObjDumpFormat::new(w))
6129 }
6130 }
6131 InputSharedObject::ReadDeleted(..)
6132 | InputSharedObject::MutateDeleted(..)
6133 | InputSharedObject::Cancelled(..) => (), }
6136 }
6137
6138 let mut loaded_child_objects = Vec::new();
6141 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6142 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
6143 loaded_child_objects.push(ObjDumpFormat::new(w))
6144 }
6145 }
6146
6147 let mut modified_at_versions = Vec::new();
6149 for (id, ver) in effects.modified_at_versions() {
6150 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
6151 modified_at_versions.push(ObjDumpFormat::new(w))
6152 }
6153 }
6154
6155 let mut runtime_reads = Vec::new();
6159 for obj in inner_temporary_store
6160 .runtime_packages_loaded_from_db
6161 .values()
6162 {
6163 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6164 }
6165
6166 Ok(Self {
6169 tx_digest: *tx_digest,
6170 executed_epoch,
6171 reference_gas_price,
6172 epoch_start_timestamp_ms,
6173 protocol_version,
6174 relevant_system_packages,
6175 shared_objects,
6176 loaded_child_objects,
6177 modified_at_versions,
6178 runtime_reads,
6179 sender_signed_data: certificate.clone().into_message(),
6180 input_objects: inner_temporary_store
6181 .input_objects
6182 .values()
6183 .map(|o| ObjDumpFormat::new(o.clone()))
6184 .collect(),
6185 computed_effects: effects.clone(),
6186 expected_effects_digest,
6187 })
6188 }
6189
6190 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6191 let mut objects = Vec::new();
6192 objects.extend(self.relevant_system_packages.clone());
6193 objects.extend(self.shared_objects.clone());
6194 objects.extend(self.loaded_child_objects.clone());
6195 objects.extend(self.modified_at_versions.clone());
6196 objects.extend(self.runtime_reads.clone());
6197 objects.extend(self.input_objects.clone());
6198 objects
6199 }
6200
6201 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6202 let file_name = format!(
6203 "{}_{}_NODE_DUMP.json",
6204 self.tx_digest,
6205 AuthorityState::unixtime_now_ms()
6206 );
6207 let mut path = path.to_path_buf();
6208 path.push(&file_name);
6209 let mut file = File::create(path.clone())?;
6210 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6211 Ok(path)
6212 }
6213
6214 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6215 let file = File::open(path)?;
6216 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6217 }
6218}