1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage as FrameworkSystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49 key_value_store::{
50 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51 },
52 key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57 account_abstraction::{
58 account::AuthenticatorFunctionRefV1Key,
59 authenticator_function::{
60 AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
61 AuthenticatorFunctionRefV1,
62 },
63 },
64 base_types::{
65 AuthorityName, ConciseableName, IotaAddress, ObjectID, ObjectInfo, ObjectRef, ObjectType,
66 SequenceNumber, StructTag, TypeTag, VersionNumber,
67 },
68 committee::{Committee, EpochId, ProtocolVersion},
69 crypto::{AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer},
70 deny_list_v1::check_coin_deny_list_v1_during_signing,
71 digests::{ChainIdentifier, Digest, ObjectDigest, TransactionDigest, TransactionEffectsDigest},
72 dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
73 effects::{
74 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
75 TransactionEffectsExt, TransactionEvents, VerifiedSignedTransactionEffects,
76 },
77 error::{ExecutionError, IotaError, IotaResult, UserInputError},
78 event::{Event, EventID, SystemEpochInfoEvent},
79 executable_transaction::VerifiedExecutableTransaction,
80 execution_config_utils::to_binary_config,
81 execution_status::ExecutionStatus,
82 fp_ensure,
83 gas::{GasCostSummary, IotaGasStatus},
84 gas_coin::NANOS_PER_IOTA,
85 inner_temporary_store::{
86 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
87 WrittenObjects,
88 },
89 iota_sdk_types_conversions::type_tag_core_to_sdk,
90 iota_system_state::{
91 IotaSystemState, IotaSystemStateTrait,
92 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
93 },
94 layout_resolver::{LayoutResolver, into_struct_layout},
95 message_envelope::Message,
96 messages_checkpoint::{
97 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
98 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
99 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
100 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
101 },
102 messages_consensus::AuthorityCapabilitiesV1,
103 messages_grpc::{
104 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
105 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
106 TransactionStatus,
107 },
108 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
109 move_authenticator::MoveAuthenticator,
110 object::{
111 MoveObject, MoveObjectExt, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
112 bounded_visitor::BoundedVisitor,
113 },
114 storage::{
115 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
116 },
117 supported_protocol_versions::{
118 ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
119 },
120 transaction::*,
121 transaction_executor::{SimulateTransactionResult, VmChecks},
122};
123use itertools::Itertools;
124use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
125use move_core_types::{
126 account_address::AccountAddress, annotated_value::MoveStructLayout, language_storage::ModuleId,
127};
128use parking_lot::Mutex;
129use prometheus::{
130 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
131 register_histogram_vec_with_registry, register_histogram_with_registry,
132 register_int_counter_vec_with_registry, register_int_counter_with_registry,
133 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
134};
135use serde::{Deserialize, Serialize, de::DeserializeOwned};
136use tap::TapFallible;
137use tokio::{
138 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
139 task::JoinHandle,
140};
141use tracing::{debug, error, info, instrument, trace, warn};
142use typed_store::TypedStoreError;
143
144use self::{
145 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
146};
147#[cfg(msim)]
148pub use crate::checkpoints::checkpoint_executor::utils::{
149 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
150};
151use crate::{
152 authority::{
153 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
154 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
155 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
156 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
157 authority_store_tables::AuthorityPrunerTables,
158 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
159 },
160 authority_client::NetworkAuthorityClient,
161 checkpoint_progress_tracker::CheckpointProgressTracker,
162 checkpoints::CheckpointStore,
163 congestion_tracker::CongestionTracker,
164 consensus_adapter::ConsensusAdapter,
165 epoch::committee_store::CommitteeStore,
166 execution_cache::{
167 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
168 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
169 TransactionCacheRead,
170 },
171 execution_driver::execution_process,
172 global_state_hasher::{GlobalStateHashStore, GlobalStateHasher},
173 grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
174 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
175 metrics::{LatencyObserver, RateTracker},
176 module_cache_metrics::ResolverMetrics,
177 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
178 stake_aggregator::StakeAggregator,
179 subscription_handler::SubscriptionHandler,
180 transaction_input_loader::TransactionInputLoader,
181 transaction_manager::TransactionManager,
182 transaction_outputs::TransactionOutputs,
183 validator_tx_finalizer::ValidatorTxFinalizer,
184 verify_indexes::verify_indexes,
185};
186
187#[cfg(test)]
188#[path = "unit_tests/authority_tests.rs"]
189pub mod authority_tests;
190
191#[cfg(test)]
192#[path = "unit_tests/transaction_tests.rs"]
193pub mod transaction_tests;
194
195#[cfg(test)]
196#[path = "unit_tests/batch_transaction_tests.rs"]
197mod batch_transaction_tests;
198
199#[cfg(test)]
200#[path = "unit_tests/move_integration_tests.rs"]
201pub mod move_integration_tests;
202
203#[cfg(test)]
204#[path = "unit_tests/gas_tests.rs"]
205mod gas_tests;
206
207#[cfg(test)]
208#[path = "unit_tests/batch_verification_tests.rs"]
209mod batch_verification_tests;
210
211#[cfg(test)]
212#[path = "unit_tests/coin_deny_list_tests.rs"]
213mod coin_deny_list_tests;
214
215#[cfg(test)]
216#[path = "unit_tests/auth_unit_test_utils.rs"]
217pub mod auth_unit_test_utils;
218
219pub mod authority_test_utils;
220
221pub mod authority_per_epoch_store;
222pub mod authority_per_epoch_store_pruner;
223
224mod authority_store_migrations;
225pub mod authority_store_pruner;
226pub mod authority_store_tables;
227pub mod authority_store_types;
228pub mod epoch_start_configuration;
229pub mod shared_object_congestion_tracker;
230pub mod shared_object_version_manager;
231pub mod suggested_gas_price_calculator;
232pub mod test_authority_builder;
233pub mod transaction_deferral;
234
235pub(crate) mod authority_store;
236pub mod backpressure;
237
238pub struct AuthorityMetrics {
240 tx_orders: IntCounter,
241 total_certs: IntCounter,
242 total_cert_attempts: IntCounter,
243 total_effects: IntCounter,
244 pub shared_obj_tx: IntCounter,
245 sponsored_tx: IntCounter,
246 tx_already_processed: IntCounter,
247 num_input_objs: Histogram,
248 num_shared_objects: Histogram,
249 batch_size: Histogram,
250
251 authority_state_handle_transaction_latency: Histogram,
252
253 execute_certificate_latency_single_writer: Histogram,
254 execute_certificate_latency_shared_object: Histogram,
255
256 internal_execution_latency: Histogram,
257 execution_load_input_objects_latency: Histogram,
258 prepare_certificate_latency: Histogram,
259 commit_certificate_latency: Histogram,
260 db_checkpoint_latency: Histogram,
261
262 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
263 pub(crate) transaction_manager_num_missing_objects: IntGauge,
264 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
265 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
266 pub(crate) transaction_manager_num_ready: IntGauge,
267 pub(crate) transaction_manager_object_cache_size: IntGauge,
268 pub(crate) transaction_manager_object_cache_hits: IntCounter,
269 pub(crate) transaction_manager_object_cache_misses: IntCounter,
270 pub(crate) transaction_manager_object_cache_evictions: IntCounter,
271 pub(crate) transaction_manager_package_cache_size: IntGauge,
272 pub(crate) transaction_manager_package_cache_hits: IntCounter,
273 pub(crate) transaction_manager_package_cache_misses: IntCounter,
274 pub(crate) transaction_manager_package_cache_evictions: IntCounter,
275 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
276
277 pub(crate) execution_driver_executed_transactions: IntCounter,
278 pub(crate) execution_driver_dispatch_queue: IntGauge,
279 pub(crate) execution_queueing_delay_s: Histogram,
280 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
281 pub(crate) execution_gas_latency_ratio: Histogram,
282
283 pub(crate) skipped_consensus_txns: IntCounter,
284 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
285
286 pub(crate) authority_overload_status: IntGauge,
287 pub(crate) authority_load_shedding_percentage: IntGauge,
288
289 pub(crate) transaction_overload_sources: IntCounterVec,
290
291 post_processing_total_events_emitted: IntCounter,
293 post_processing_total_tx_indexed: IntCounter,
294 post_processing_total_tx_had_event_processed: IntCounter,
295 post_processing_total_failures: IntCounter,
296
297 pub consensus_handler_processed: IntCounterVec,
299 pub consensus_handler_transaction_sizes: HistogramVec,
300 pub consensus_handler_num_low_scoring_authorities: IntGauge,
301 pub consensus_handler_scores: IntGaugeVec,
302 pub consensus_handler_deferred_transactions: IntCounter,
303 pub consensus_handler_congested_transactions: IntCounter,
304 pub consensus_handler_cancelled_transactions: IntCounter,
305 pub consensus_handler_max_object_costs: IntGaugeVec,
306 pub consensus_committed_subdags: IntCounterVec,
307 pub consensus_committed_messages: IntGaugeVec,
308 pub consensus_committed_user_transactions: IntGaugeVec,
309 pub consensus_handler_leader_round: IntGauge,
310 pub consensus_calculated_throughput: IntGauge,
311 pub consensus_calculated_throughput_profile: IntGauge,
312
313 pub limits_metrics: Arc<LimitsMetrics>,
314
315 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
317
318 pub multisig_sig_count: IntCounter,
320
321 pub execution_queueing_latency: LatencyObserver,
324
325 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
332
333 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
336}
337
338const POSITIVE_INT_BUCKETS: &[f64] = &[
340 1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
341 20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
342 7000000., 10000000.,
343];
344
345const LATENCY_SEC_BUCKETS: &[f64] = &[
346 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
347 10., 20., 30., 60., 90.,
348];
349
350const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
352 0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
353 0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
354];
355
356const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
357 10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
358 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
359];
360
361pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
365 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
366 let execute_certificate_latency = register_histogram_vec_with_registry!(
367 "authority_state_execute_certificate_latency",
368 "Latency of executing certificates, including waiting for inputs",
369 &["tx_type"],
370 LATENCY_SEC_BUCKETS.to_vec(),
371 registry,
372 )
373 .unwrap();
374
375 let execute_certificate_latency_single_writer =
376 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
377 let execute_certificate_latency_shared_object =
378 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
379
380 Self {
381 tx_orders: register_int_counter_with_registry!(
382 "total_transaction_orders",
383 "Total number of transaction orders",
384 registry,
385 )
386 .unwrap(),
387 total_certs: register_int_counter_with_registry!(
388 "total_transaction_certificates",
389 "Total number of transaction certificates handled",
390 registry,
391 )
392 .unwrap(),
393 total_cert_attempts: register_int_counter_with_registry!(
394 "total_handle_certificate_attempts",
395 "Number of calls to handle_certificate",
396 registry,
397 )
398 .unwrap(),
399 total_effects: register_int_counter_with_registry!(
401 "total_transaction_effects",
402 "Total number of transaction effects produced",
403 registry,
404 )
405 .unwrap(),
406
407 shared_obj_tx: register_int_counter_with_registry!(
408 "num_shared_obj_tx",
409 "Number of transactions involving shared objects",
410 registry,
411 )
412 .unwrap(),
413
414 sponsored_tx: register_int_counter_with_registry!(
415 "num_sponsored_tx",
416 "Number of sponsored transactions",
417 registry,
418 )
419 .unwrap(),
420
421 tx_already_processed: register_int_counter_with_registry!(
422 "num_tx_already_processed",
423 "Number of transaction orders already processed previously",
424 registry,
425 )
426 .unwrap(),
427 num_input_objs: register_histogram_with_registry!(
428 "num_input_objects",
429 "Distribution of number of input TX objects per TX",
430 POSITIVE_INT_BUCKETS.to_vec(),
431 registry,
432 )
433 .unwrap(),
434 num_shared_objects: register_histogram_with_registry!(
435 "num_shared_objects",
436 "Number of shared input objects per TX",
437 POSITIVE_INT_BUCKETS.to_vec(),
438 registry,
439 )
440 .unwrap(),
441 batch_size: register_histogram_with_registry!(
442 "batch_size",
443 "Distribution of size of transaction batch",
444 POSITIVE_INT_BUCKETS.to_vec(),
445 registry,
446 )
447 .unwrap(),
448 authority_state_handle_transaction_latency: register_histogram_with_registry!(
449 "authority_state_handle_transaction_latency",
450 "Latency of handling transactions",
451 LATENCY_SEC_BUCKETS.to_vec(),
452 registry,
453 )
454 .unwrap(),
455 execute_certificate_latency_single_writer,
456 execute_certificate_latency_shared_object,
457 internal_execution_latency: register_histogram_with_registry!(
458 "authority_state_internal_execution_latency",
459 "Latency of actual certificate executions",
460 LATENCY_SEC_BUCKETS.to_vec(),
461 registry,
462 )
463 .unwrap(),
464 execution_load_input_objects_latency: register_histogram_with_registry!(
465 "authority_state_execution_load_input_objects_latency",
466 "Latency of loading input objects for execution",
467 LOW_LATENCY_SEC_BUCKETS.to_vec(),
468 registry,
469 )
470 .unwrap(),
471 prepare_certificate_latency: register_histogram_with_registry!(
472 "authority_state_prepare_certificate_latency",
473 "Latency of executing certificates, before committing the results",
474 LATENCY_SEC_BUCKETS.to_vec(),
475 registry,
476 )
477 .unwrap(),
478 commit_certificate_latency: register_histogram_with_registry!(
479 "authority_state_commit_certificate_latency",
480 "Latency of committing certificate execution results",
481 LATENCY_SEC_BUCKETS.to_vec(),
482 registry,
483 )
484 .unwrap(),
485 db_checkpoint_latency: register_histogram_with_registry!(
486 "db_checkpoint_latency",
487 "Latency of checkpointing dbs",
488 LATENCY_SEC_BUCKETS.to_vec(),
489 registry,
490 ).unwrap(),
491 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
492 "transaction_manager_num_enqueued_certificates",
493 "Current number of certificates enqueued to TransactionManager",
494 &["result"],
495 registry,
496 )
497 .unwrap(),
498 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
499 "transaction_manager_num_missing_objects",
500 "Current number of missing objects in TransactionManager",
501 registry,
502 )
503 .unwrap(),
504 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
505 "transaction_manager_num_pending_certificates",
506 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
507 registry,
508 )
509 .unwrap(),
510 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
511 "transaction_manager_num_executing_certificates",
512 "Number of executing certificates, including queued and actually running certificates",
513 registry,
514 )
515 .unwrap(),
516 transaction_manager_num_ready: register_int_gauge_with_registry!(
517 "transaction_manager_num_ready",
518 "Number of ready transactions in TransactionManager",
519 registry,
520 )
521 .unwrap(),
522 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
523 "transaction_manager_object_cache_size",
524 "Current size of object-availability cache in TransactionManager",
525 registry,
526 )
527 .unwrap(),
528 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
529 "transaction_manager_object_cache_hits",
530 "Number of object-availability cache hits in TransactionManager",
531 registry,
532 )
533 .unwrap(),
534 authority_overload_status: register_int_gauge_with_registry!(
535 "authority_overload_status",
536 "Whether authority is current experiencing overload and enters load shedding mode.",
537 registry)
538 .unwrap(),
539 authority_load_shedding_percentage: register_int_gauge_with_registry!(
540 "authority_load_shedding_percentage",
541 "The percentage of transactions is shed when the authority is in load shedding mode.",
542 registry)
543 .unwrap(),
544 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
545 "transaction_manager_object_cache_misses",
546 "Number of object-availability cache misses in TransactionManager",
547 registry,
548 )
549 .unwrap(),
550 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
551 "transaction_manager_object_cache_evictions",
552 "Number of object-availability cache evictions in TransactionManager",
553 registry,
554 )
555 .unwrap(),
556 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
557 "transaction_manager_package_cache_size",
558 "Current size of package-availability cache in TransactionManager",
559 registry,
560 )
561 .unwrap(),
562 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
563 "transaction_manager_package_cache_hits",
564 "Number of package-availability cache hits in TransactionManager",
565 registry,
566 )
567 .unwrap(),
568 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
569 "transaction_manager_package_cache_misses",
570 "Number of package-availability cache misses in TransactionManager",
571 registry,
572 )
573 .unwrap(),
574 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
575 "transaction_manager_package_cache_evictions",
576 "Number of package-availability cache evictions in TransactionManager",
577 registry,
578 )
579 .unwrap(),
580 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
581 "transaction_manager_transaction_queue_age_s",
582 "Time spent in waiting for transaction in the queue",
583 LATENCY_SEC_BUCKETS.to_vec(),
584 registry,
585 )
586 .unwrap(),
587 transaction_overload_sources: register_int_counter_vec_with_registry!(
588 "transaction_overload_sources",
589 "Number of times each source indicates transaction overload.",
590 &["source"],
591 registry)
592 .unwrap(),
593 execution_driver_executed_transactions: register_int_counter_with_registry!(
594 "execution_driver_executed_transactions",
595 "Cumulative number of transaction executed by execution driver",
596 registry,
597 )
598 .unwrap(),
599 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
600 "execution_driver_dispatch_queue",
601 "Number of transaction pending in execution driver dispatch queue",
602 registry,
603 )
604 .unwrap(),
605 execution_queueing_delay_s: register_histogram_with_registry!(
606 "execution_queueing_delay_s",
607 "Queueing delay between a transaction is ready for execution until it starts executing.",
608 LATENCY_SEC_BUCKETS.to_vec(),
609 registry
610 )
611 .unwrap(),
612 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
613 "prepare_cert_gas_latency_ratio",
614 "The ratio of computation gas divided by VM execution latency.",
615 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
616 registry
617 )
618 .unwrap(),
619 execution_gas_latency_ratio: register_histogram_with_registry!(
620 "execution_gas_latency_ratio",
621 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
622 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
623 registry
624 )
625 .unwrap(),
626 skipped_consensus_txns: register_int_counter_with_registry!(
627 "skipped_consensus_txns",
628 "Total number of consensus transactions skipped",
629 registry,
630 )
631 .unwrap(),
632 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
633 "skipped_consensus_txns_cache_hit",
634 "Total number of consensus transactions skipped because of local cache hit",
635 registry,
636 )
637 .unwrap(),
638 post_processing_total_events_emitted: register_int_counter_with_registry!(
639 "post_processing_total_events_emitted",
640 "Total number of events emitted in post processing",
641 registry,
642 )
643 .unwrap(),
644 post_processing_total_tx_indexed: register_int_counter_with_registry!(
645 "post_processing_total_tx_indexed",
646 "Total number of txes indexed in post processing",
647 registry,
648 )
649 .unwrap(),
650 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
651 "post_processing_total_tx_had_event_processed",
652 "Total number of txes finished event processing in post processing",
653 registry,
654 )
655 .unwrap(),
656 post_processing_total_failures: register_int_counter_with_registry!(
657 "post_processing_total_failures",
658 "Total number of failure in post processing",
659 registry,
660 )
661 .unwrap(),
662 consensus_handler_processed: register_int_counter_vec_with_registry!(
663 "consensus_handler_processed",
664 "Number of transactions processed by consensus handler",
665 &["class"],
666 registry
667 ).unwrap(),
668 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
669 "consensus_handler_transaction_sizes",
670 "Sizes of each type of transactions processed by consensus handler",
671 &["class"],
672 POSITIVE_INT_BUCKETS.to_vec(),
673 registry
674 ).unwrap(),
675 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
676 "consensus_handler_num_low_scoring_authorities",
677 "Number of low scoring authorities based on reputation scores from consensus",
678 registry
679 ).unwrap(),
680 consensus_handler_scores: register_int_gauge_vec_with_registry!(
681 "consensus_handler_scores",
682 "scores from consensus for each authority",
683 &["authority"],
684 registry,
685 ).unwrap(),
686 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
687 "consensus_handler_deferred_transactions",
688 "Number of transactions deferred by consensus handler",
689 registry,
690 ).unwrap(),
691 consensus_handler_congested_transactions: register_int_counter_with_registry!(
692 "consensus_handler_congested_transactions",
693 "Number of transactions deferred by consensus handler due to congestion",
694 registry,
695 ).unwrap(),
696 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
697 "consensus_handler_cancelled_transactions",
698 "Number of transactions cancelled by consensus handler",
699 registry,
700 ).unwrap(),
701 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
702 "consensus_handler_max_congestion_control_object_costs",
703 "Max object costs for congestion control in the current consensus commit",
704 &["commit_type"],
705 registry,
706 ).unwrap(),
707 consensus_committed_subdags: register_int_counter_vec_with_registry!(
708 "consensus_committed_subdags",
709 "Number of committed subdags, sliced by leader",
710 &["authority"],
711 registry,
712 ).unwrap(),
713 consensus_committed_messages: register_int_gauge_vec_with_registry!(
714 "consensus_committed_messages",
715 "Total number of committed consensus messages, sliced by author",
716 &["authority"],
717 registry,
718 ).unwrap(),
719 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
720 "consensus_committed_user_transactions",
721 "Number of committed user transactions, sliced by submitter",
722 &["authority"],
723 registry,
724 ).unwrap(),
725 consensus_handler_leader_round: register_int_gauge_with_registry!(
726 "consensus_handler_leader_round",
727 "The leader round of the current consensus output being processed in the consensus handler",
728 registry,
729 ).unwrap(),
730 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
731 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
732 multisig_sig_count: register_int_counter_with_registry!(
733 "multisig_sig_count",
734 "Count of multisig signatures",
735 registry,
736 )
737 .unwrap(),
738 consensus_calculated_throughput: register_int_gauge_with_registry!(
739 "consensus_calculated_throughput",
740 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
741 registry,
742 ).unwrap(),
743 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
744 "consensus_calculated_throughput_profile",
745 "The current active calculated throughput profile",
746 registry
747 ).unwrap(),
748 execution_queueing_latency: LatencyObserver::new(),
749 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
750 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
751 }
752 }
753
754 pub fn reset_on_reconfigure(&self) {
758 self.consensus_committed_messages.reset();
759 self.consensus_handler_scores.reset();
760 self.consensus_committed_user_transactions.reset();
761 }
762}
763
764pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
771
772pub struct AuthorityState {
773 pub name: AuthorityName,
776 pub secret: StableSyncAuthoritySigner,
778
779 input_loader: TransactionInputLoader,
781 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
782
783 epoch_store: ArcSwap<AuthorityPerEpochStore>,
784
785 execution_lock: RwLock<EpochId>,
791
792 pub indexes: Option<Arc<IndexStore>>,
793 pub grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
794
795 pub subscription_handler: Arc<SubscriptionHandler>,
796 pub checkpoint_store: Arc<CheckpointStore>,
797
798 committee_store: Arc<CommitteeStore>,
799
800 transaction_manager: Arc<TransactionManager>,
802
803 #[cfg_attr(not(test), expect(unused))]
805 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
806
807 pub metrics: Arc<AuthorityMetrics>,
808 _pruner: AuthorityStorePruner,
809 authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
810 checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
811
812 db_checkpoint_config: DBCheckpointConfig,
814
815 pub config: NodeConfig,
816
817 pub overload_info: AuthorityOverloadInfo,
819
820 pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
821
822 chain_identifier: ChainIdentifier,
825
826 pub(crate) congestion_tracker: Arc<CongestionTracker>,
827}
828
829impl AuthorityState {
838 pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
839 epoch_store.committee().authority_exists(&self.name)
840 }
841
842 pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
843 epoch_store
844 .active_validators()
845 .iter()
846 .any(|a| AuthorityName::from(a) == self.name)
847 }
848
849 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
850 !self.is_committee_validator(epoch_store)
851 }
852
853 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
854 &self.committee_store
855 }
856
857 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
858 self.committee_store.clone()
859 }
860
861 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
862 &self.config.authority_overload_config
863 }
864
865 pub fn get_epoch_state_commitments(
866 &self,
867 epoch: EpochId,
868 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
869 self.checkpoint_store.get_epoch_state_commitments(epoch)
870 }
871
872 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
876 async fn handle_transaction_impl(
877 &self,
878 transaction: VerifiedTransaction,
879 epoch_store: &Arc<AuthorityPerEpochStore>,
880 ) -> IotaResult<VerifiedSignedTransaction> {
881 let _execution_lock = self.execution_lock_for_signing()?;
883
884 let protocol_config = epoch_store.protocol_config();
885 let reference_gas_price = epoch_store.reference_gas_price();
886
887 let epoch = epoch_store.epoch();
888
889 let tx_data = transaction.data().transaction_data();
890
891 iota_transaction_checks::deny::check_transaction_for_signing(
895 tx_data,
896 transaction.tx_signatures(),
897 &transaction.input_objects()?,
898 &tx_data.receiving_objects(),
899 &self.config.transaction_deny_config,
900 self.get_backing_package_store().as_ref(),
901 )?;
902
903 let (tx_input_objects, tx_receiving_objects, per_authenticator_inputs) =
908 self.read_objects_for_signing(&transaction, epoch)?;
909
910 let move_authenticators = transaction.move_authenticators();
911
912 let (gas_status, tx_checked_input_objects, per_authenticator_checked_inputs) = self
918 .check_transaction_inputs_for_signing(
919 protocol_config,
920 reference_gas_price,
921 tx_data,
922 tx_input_objects,
923 &tx_receiving_objects,
924 &move_authenticators,
925 per_authenticator_inputs,
926 )?;
927
928 let per_authenticator_checked_input_objects = per_authenticator_checked_inputs
931 .iter()
932 .map(|i| &i.0)
933 .collect();
934
935 check_coin_deny_list_v1_during_signing(
939 tx_data.sender(),
940 &tx_checked_input_objects,
941 &tx_receiving_objects,
942 &per_authenticator_checked_input_objects,
943 &self.get_object_store(),
944 )?;
945
946 let pre_consensus_move_authenticators =
951 pre_consensus_move_authenticators(&transaction, protocol_config);
952 let (move_authenticators, per_authenticator_checked_inputs): (Vec<_>, Vec<_>) =
953 move_authenticators
954 .into_iter()
955 .zip(per_authenticator_checked_inputs)
956 .filter(|(a, _)| pre_consensus_move_authenticators.contains(a))
957 .unzip();
958 let per_authenticator_checked_input_objects: Vec<_> = per_authenticator_checked_inputs
959 .iter()
960 .map(|i| &i.0)
961 .collect();
962
963 if !move_authenticators.is_empty() {
966 let aggregated_authenticator_input_objects =
967 iota_transaction_checks::aggregate_authenticator_input_objects(
968 &per_authenticator_checked_input_objects,
969 )?;
970
971 debug_assert_eq!(
972 move_authenticators.len(),
973 per_authenticator_checked_inputs.len(),
974 "Move authenticators amount must match the number of checked authenticator inputs"
975 );
976
977 let move_authenticators = move_authenticators
978 .into_iter()
979 .zip(per_authenticator_checked_inputs)
980 .map(
981 |(
982 move_authenticator,
983 (authenticator_checked_input_objects, authenticator_function_ref),
984 )| {
985 (
986 move_authenticator.to_owned(),
987 authenticator_function_ref,
988 authenticator_checked_input_objects,
989 )
990 },
991 )
992 .collect();
993
994 let tx_data_bytes =
999 bcs::to_bytes(&tx_data).expect("TransactionData serialization cannot fail");
1000
1001 let (sender_auth_digest, sponsor_auth_digest) =
1002 transaction.data().compute_auth_digests()?;
1003
1004 let (kind, signer, gas_data) = tx_data.execution_parts();
1005
1006 let validation_result = epoch_store.executor().authenticate_transaction(
1008 self.get_backing_store().as_ref(),
1009 protocol_config,
1010 self.metrics.limits_metrics.clone(),
1011 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1012 epoch_store
1013 .epoch_start_config()
1014 .epoch_data()
1015 .epoch_start_timestamp(),
1016 gas_data,
1017 gas_status,
1018 move_authenticators,
1019 aggregated_authenticator_input_objects,
1020 kind,
1021 signer,
1022 transaction.digest().to_owned(),
1023 tx_data_bytes,
1024 sender_auth_digest,
1025 sponsor_auth_digest,
1026 &mut None,
1027 );
1028
1029 if let Err(validation_error) = validation_result {
1030 return Err(IotaError::MoveAuthenticatorExecutionFailure {
1031 error: validation_error.to_string(),
1032 });
1033 }
1034 }
1035
1036 let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();
1037
1038 let signed_transaction =
1039 VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);
1040
1041 self.get_cache_writer().try_acquire_transaction_locks(
1046 epoch_store,
1047 &owned_objects,
1048 signed_transaction.clone(),
1049 )?;
1050
1051 Ok(signed_transaction)
1052 }
1053
1054 #[instrument(name = "handle_transaction", level = "trace", skip_all, fields(tx_digest = ?transaction.digest(), sender = transaction.data().transaction_data().gas_owner().to_string()))]
1056 pub async fn handle_transaction(
1057 &self,
1058 epoch_store: &Arc<AuthorityPerEpochStore>,
1059 transaction: VerifiedTransaction,
1060 ) -> IotaResult<HandleTransactionResponse> {
1061 let tx_digest = *transaction.digest();
1062 debug!("handle_transaction");
1063
1064 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1066 return Ok(HandleTransactionResponse { status });
1067 }
1068
1069 let _metrics_guard = self
1070 .metrics
1071 .authority_state_handle_transaction_latency
1072 .start_timer();
1073 self.metrics.tx_orders.inc();
1074
1075 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1076 match signed {
1077 Ok(s) => {
1078 if self.is_committee_validator(epoch_store) {
1079 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1080 let tx = s.clone();
1081 let validator_tx_finalizer = validator_tx_finalizer.clone();
1082 let cache_reader = self.get_transaction_cache_reader().clone();
1083 let epoch_store = epoch_store.clone();
1084 spawn_monitored_task!(epoch_store.within_alive_epoch(
1085 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1086 ));
1087 }
1088 }
1089 Ok(HandleTransactionResponse {
1090 status: TransactionStatus::Signed(s.into_inner().into_sig()),
1091 })
1092 }
1093 Err(err) => Ok(HandleTransactionResponse {
1097 status: self
1098 .get_transaction_status(&tx_digest, epoch_store)?
1099 .ok_or(err)?
1100 .1,
1101 }),
1102 }
1103 }
1104
1105 pub fn check_system_overload_at_signing(&self) -> bool {
1106 self.config
1107 .authority_overload_config
1108 .check_system_overload_at_signing
1109 }
1110
1111 pub fn check_system_overload_at_execution(&self) -> bool {
1112 self.config
1113 .authority_overload_config
1114 .check_system_overload_at_execution
1115 }
1116
1117 pub(crate) fn check_system_overload(
1118 &self,
1119 consensus_adapter: &Arc<ConsensusAdapter>,
1120 tx_data: &SenderSignedData,
1121 do_authority_overload_check: bool,
1122 ) -> IotaResult {
1123 if do_authority_overload_check {
1124 self.check_authority_overload(tx_data).tap_err(|_| {
1125 self.update_overload_metrics("execution_queue");
1126 })?;
1127 }
1128 self.transaction_manager
1129 .check_execution_overload(self.overload_config(), tx_data)
1130 .tap_err(|_| {
1131 self.update_overload_metrics("execution_pending");
1132 })?;
1133 consensus_adapter.check_consensus_overload().tap_err(|_| {
1134 self.update_overload_metrics("consensus");
1135 })?;
1136
1137 let pending_tx_count = self
1138 .get_cache_commit()
1139 .approximate_pending_transaction_count();
1140 if pending_tx_count
1141 > self
1142 .config
1143 .execution_cache_config
1144 .writeback_cache
1145 .backpressure_threshold_for_rpc()
1146 {
1147 return Err(IotaError::ValidatorOverloadedRetryAfter {
1148 retry_after_secs: 10,
1149 });
1150 }
1151
1152 Ok(())
1153 }
1154
1155 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1156 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1157 return Ok(());
1158 }
1159
1160 let load_shedding_percentage = self
1161 .overload_info
1162 .load_shedding_percentage
1163 .load(Ordering::Relaxed);
1164 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1165 }
1166
1167 fn update_overload_metrics(&self, source: &str) {
1168 self.metrics
1169 .transaction_overload_sources
1170 .with_label_values(&[source])
1171 .inc();
1172 }
1173
1174 #[instrument(level = "trace", skip_all)]
1176 pub async fn execute_certificate(
1177 &self,
1178 certificate: &VerifiedCertificate,
1179 epoch_store: &Arc<AuthorityPerEpochStore>,
1180 ) -> IotaResult<TransactionEffects> {
1181 let _metrics_guard = if certificate.contains_shared_object() {
1182 self.metrics
1183 .execute_certificate_latency_shared_object
1184 .start_timer()
1185 } else {
1186 self.metrics
1187 .execute_certificate_latency_single_writer
1188 .start_timer()
1189 };
1190 trace!("execute_certificate");
1191
1192 self.metrics.total_cert_attempts.inc();
1193
1194 if !certificate.contains_shared_object() {
1195 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1200 }
1201
1202 epoch_store
1205 .within_alive_epoch(self.notify_read_effects(certificate))
1206 .await
1207 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1208 .and_then(|r| r)
1209 }
1210
1211 #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1226 pub fn try_execute_immediately(
1227 &self,
1228 certificate: &VerifiedExecutableTransaction,
1229 expected_effects_digest: Option<TransactionEffectsDigest>,
1230 epoch_store: &Arc<AuthorityPerEpochStore>,
1231 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1232 let _scope = monitored_scope("Execution::try_execute_immediately");
1233 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1234
1235 let tx_digest = certificate.digest();
1236
1237 let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1239
1240 if let Some(effects) = self
1243 .get_transaction_cache_reader()
1244 .try_get_executed_effects(tx_digest)?
1245 {
1246 if let Some(expected_effects_digest_inner) = expected_effects_digest {
1247 assert_eq!(
1248 effects.digest(),
1249 expected_effects_digest_inner,
1250 "Unexpected effects digest for transaction {tx_digest}"
1251 );
1252 }
1253 tx_guard.release();
1254 return Ok((effects, None));
1255 }
1256
1257 let (tx_input_objects, per_authenticator_inputs) =
1258 self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1259
1260 let expected_effects_digest =
1266 expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1267
1268 self.process_certificate(
1269 tx_guard,
1270 certificate,
1271 tx_input_objects,
1272 per_authenticator_inputs,
1273 expected_effects_digest,
1274 epoch_store,
1275 )
1276 .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1277 .tap_ok(
1278 |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1279 )
1280 }
1281
1282 pub fn read_objects_for_execution(
1283 &self,
1284 tx_lock: &CertLockGuard,
1285 certificate: &VerifiedExecutableTransaction,
1286 epoch_store: &Arc<AuthorityPerEpochStore>,
1287 ) -> IotaResult<(InputObjects, Vec<(InputObjects, ObjectReadResult)>)> {
1288 let _scope = monitored_scope("Execution::load_input_objects");
1289 let _metrics_guard = self
1290 .metrics
1291 .execution_load_input_objects_latency
1292 .start_timer();
1293
1294 let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1295
1296 let input_objects = self.input_loader.read_objects_for_execution(
1297 epoch_store,
1298 &certificate.key(),
1299 tx_lock,
1300 &input_objects,
1301 epoch_store.epoch(),
1302 )?;
1303
1304 certificate.split_input_objects_into_groups_for_reading(input_objects)
1305 }
1306
1307 pub fn try_execute_for_test(
1311 &self,
1312 certificate: &VerifiedCertificate,
1313 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1314 let epoch_store = self.epoch_store_for_testing();
1315 let (effects, execution_error_opt) = self.try_execute_immediately(
1316 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1317 None,
1318 &epoch_store,
1319 )?;
1320 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1321 Ok((signed_effects, execution_error_opt))
1322 }
1323
1324 pub fn execute_for_test(
1326 &self,
1327 certificate: &VerifiedCertificate,
1328 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1329 self.try_execute_for_test(certificate)
1330 .expect("try_execute_for_test should not fail")
1331 }
1332
1333 pub async fn notify_read_effects(
1334 &self,
1335 certificate: &VerifiedCertificate,
1336 ) -> IotaResult<TransactionEffects> {
1337 self.get_transaction_cache_reader()
1338 .try_notify_read_executed_effects(&[*certificate.digest()])
1339 .await
1340 .map(|mut r| r.pop().expect("must return correct number of effects"))
1341 }
1342
1343 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1344 self.get_object_cache_reader()
1345 .try_check_owned_objects_are_live(owned_object_refs)
1346 }
1347
1348 pub(crate) fn debug_dump_transaction_state(
1353 &self,
1354 tx_digest: &TransactionDigest,
1355 effects: &TransactionEffects,
1356 expected_effects_digest: TransactionEffectsDigest,
1357 inner_temporary_store: &InnerTemporaryStore,
1358 certificate: &VerifiedExecutableTransaction,
1359 debug_dump_config: &StateDebugDumpConfig,
1360 ) -> IotaResult<PathBuf> {
1361 let dump_dir = debug_dump_config
1364 .dump_file_directory
1365 .as_ref()
1366 .cloned()
1367 .unwrap_or(std::env::temp_dir());
1368 let epoch_store = self.load_epoch_store_one_call_per_task();
1369
1370 NodeStateDump::new(
1371 tx_digest,
1372 effects,
1373 expected_effects_digest,
1374 self.get_object_store().as_ref(),
1375 &epoch_store,
1376 inner_temporary_store,
1377 certificate,
1378 )?
1379 .write_to_file(&dump_dir)
1380 .map_err(|e| IotaError::FileIO(e.to_string()))
1381 }
1382
1383 #[instrument(name = "process_certificate", level = "trace", skip_all, fields(tx_digest = ?certificate.digest(), sender = ?certificate.data().transaction_data().gas_owner().to_string()))]
1384 pub(crate) fn process_certificate(
1385 &self,
1386 tx_guard: CertTxGuard,
1387 certificate: &VerifiedExecutableTransaction,
1388 tx_input_objects: InputObjects,
1389 per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1390 expected_effects_digest: Option<TransactionEffectsDigest>,
1391 epoch_store: &Arc<AuthorityPerEpochStore>,
1392 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1393 let process_certificate_start_time = tokio::time::Instant::now();
1394 let digest = *certificate.digest();
1395
1396 let _scope = monitored_scope("Execution::process_certificate");
1397
1398 fail_point_if!("correlated-crash-process-certificate", || {
1399 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1400 iota_simulator::task::kill_current_node(None);
1401 }
1402 });
1403
1404 let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1405 let execution_guard = match execution_guard {
1411 Ok(execution_guard) => execution_guard,
1412 Err(err) => {
1413 tx_guard.release();
1414 return Err(err);
1415 }
1416 };
1417 if *execution_guard != epoch_store.epoch() {
1421 tx_guard.release();
1422 info!("The epoch of the execution_guard doesn't match the epoch store");
1423 return Err(IotaError::WrongEpoch {
1424 expected_epoch: epoch_store.epoch(),
1425 actual_epoch: *execution_guard,
1426 });
1427 }
1428
1429 let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1435 &execution_guard,
1436 certificate,
1437 tx_input_objects,
1438 per_authenticator_inputs,
1439 epoch_store,
1440 ) {
1441 Err(e) => {
1442 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1443 tx_guard.release();
1444 return Err(e);
1445 }
1446 Ok(res) => res,
1447 };
1448
1449 if let Some(expected_effects_digest) = expected_effects_digest {
1450 if effects.digest() != expected_effects_digest {
1451 match self.debug_dump_transaction_state(
1453 &digest,
1454 &effects,
1455 expected_effects_digest,
1456 &inner_temporary_store,
1457 certificate,
1458 &self.config.state_debug_dump_config,
1459 ) {
1460 Ok(out_path) => {
1461 info!(
1462 "Dumped node state for transaction {} to {}",
1463 digest,
1464 out_path.as_path().display().to_string()
1465 );
1466 }
1467 Err(e) => {
1468 error!("Error dumping state for transaction {}: {e}", digest);
1469 }
1470 }
1471 error!(
1472 tx_digest = ?digest,
1473 ?expected_effects_digest,
1474 actual_effects = ?effects,
1475 "fork detected!"
1476 );
1477 panic!(
1478 "Transaction {} is expected to have effects digest {}, but got {}!",
1479 digest,
1480 expected_effects_digest,
1481 effects.digest(),
1482 );
1483 }
1484 }
1485
1486 fail_point!("crash");
1487
1488 self.commit_certificate(
1489 certificate,
1490 inner_temporary_store,
1491 &effects,
1492 tx_guard,
1493 execution_guard,
1494 epoch_store,
1495 )?;
1496
1497 let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1498 if elapsed > 0.0 {
1499 self.metrics
1500 .execution_gas_latency_ratio
1501 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1502 };
1503 Ok((effects, execution_error_opt))
1504 }
1505
1506 #[instrument(level = "trace", skip_all)]
1507 fn commit_certificate(
1508 &self,
1509 certificate: &VerifiedExecutableTransaction,
1510 inner_temporary_store: InnerTemporaryStore,
1511 effects: &TransactionEffects,
1512 tx_guard: CertTxGuard,
1513 _execution_guard: ExecutionLockReadGuard<'_>,
1514 epoch_store: &Arc<AuthorityPerEpochStore>,
1515 ) -> IotaResult {
1516 let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1517 monitored_scope("Execution::commit_certificate");
1518 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1519
1520 let tx_key = certificate.key();
1521 let tx_digest = certificate.digest();
1522 let input_object_count = inner_temporary_store.input_objects.len();
1523 let shared_object_count = effects.input_shared_objects().len();
1524
1525 let output_keys = inner_temporary_store.get_output_keys(effects);
1526
1527 let _ = self
1529 .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1530 .tap_err(|e| {
1531 self.metrics.post_processing_total_failures.inc();
1532 error!(?tx_digest, "tx post processing failed: {e}");
1533 });
1534
1535 epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1539
1540 fail_point!("crash");
1542
1543 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1544 certificate.clone().into_unsigned(),
1545 effects.clone(),
1546 inner_temporary_store,
1547 );
1548 self.get_cache_writer()
1549 .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1550
1551 if certificate.transaction_data().is_end_of_epoch_tx() {
1552 self.get_object_cache_reader()
1555 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1556 }
1557
1558 tx_guard.commit_tx();
1560
1561 self.transaction_manager
1565 .notify_commit(tx_digest, output_keys, epoch_store);
1566
1567 self.update_metrics(certificate, input_object_count, shared_object_count);
1568
1569 Ok(())
1570 }
1571
1572 fn update_metrics(
1573 &self,
1574 certificate: &VerifiedExecutableTransaction,
1575 input_object_count: usize,
1576 shared_object_count: usize,
1577 ) {
1578 if certificate.has_upgraded_multisig() {
1580 self.metrics.multisig_sig_count.inc();
1581 }
1582
1583 self.metrics.total_effects.inc();
1584 self.metrics.total_certs.inc();
1585
1586 if shared_object_count > 0 {
1587 self.metrics.shared_obj_tx.inc();
1588 }
1589
1590 if certificate.is_sponsored_tx() {
1591 self.metrics.sponsored_tx.inc();
1592 }
1593
1594 self.metrics
1595 .num_input_objs
1596 .observe(input_object_count as f64);
1597 self.metrics
1598 .num_shared_objects
1599 .observe(shared_object_count as f64);
1600 self.metrics.batch_size.observe(
1601 certificate
1602 .data()
1603 .intent_message()
1604 .value
1605 .kind()
1606 .num_commands() as f64,
1607 );
1608 }
1609
1610 #[instrument(level = "trace", skip_all)]
1622 fn prepare_certificate(
1623 &self,
1624 _execution_guard: &ExecutionLockReadGuard<'_>,
1625 certificate: &VerifiedExecutableTransaction,
1626 tx_input_objects: InputObjects,
1627 per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
1628 epoch_store: &Arc<AuthorityPerEpochStore>,
1629 ) -> IotaResult<(
1630 InnerTemporaryStore,
1631 TransactionEffects,
1632 Option<ExecutionError>,
1633 )> {
1634 let _scope = monitored_scope("Execution::prepare_certificate");
1635 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1636 let prepare_certificate_start_time = tokio::time::Instant::now();
1637
1638 let protocol_config = epoch_store.protocol_config();
1639
1640 let reference_gas_price = epoch_store.reference_gas_price();
1641
1642 let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
1643 let epoch_start_timestamp = epoch_store
1644 .epoch_start_config()
1645 .epoch_data()
1646 .epoch_start_timestamp();
1647
1648 let backing_store = self.get_backing_store().as_ref();
1649
1650 let tx_digest = *certificate.digest();
1651
1652 let tx_data = certificate.data().transaction_data();
1655 tx_data.validity_check(protocol_config)?;
1656
1657 let (kind, signer, gas_data) = tx_data.execution_parts();
1658
1659 let move_authenticators = certificate.move_authenticators();
1660
1661 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1662 let (inner_temp_store, _, mut effects, execution_error_opt) = if move_authenticators
1663 .is_empty()
1664 {
1665 let (tx_gas_status, tx_checked_input_objects) =
1670 iota_transaction_checks::check_certificate_input(
1671 certificate,
1672 tx_input_objects,
1673 protocol_config,
1674 reference_gas_price,
1675 )?;
1676
1677 let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
1678 self.check_owned_locks(&owned_object_refs)?;
1679 epoch_store.executor().execute_transaction_to_effects(
1680 backing_store,
1681 protocol_config,
1682 self.metrics.limits_metrics.clone(),
1683 self.config
1686 .expensive_safety_check_config
1687 .enable_deep_per_tx_iota_conservation_check(),
1688 self.config.certificate_deny_config.certificate_deny_set(),
1689 &epoch_id,
1690 epoch_start_timestamp,
1691 tx_checked_input_objects,
1692 gas_data,
1693 tx_gas_status,
1694 kind,
1695 signer,
1696 tx_digest,
1697 &mut None,
1698 )
1699 } else {
1700 debug_assert_eq!(
1706 move_authenticators.len(),
1707 per_authenticator_inputs.len(),
1708 "Move authenticators amount must match the number of authenticator inputs"
1709 );
1710
1711 let per_authenticator_inputs = move_authenticators
1712 .iter()
1713 .zip(per_authenticator_inputs)
1714 .map(
1715 |(move_authenticator, (authenticator_input_objects, account_object))| {
1716 let (
1719 auth_account_object_id,
1720 auth_account_object_seq_number,
1721 auth_account_object_digest,
1722 ) = move_authenticator.object_to_authenticate_components()?;
1723
1724 let signer = move_authenticator.address()?;
1725
1726 let authenticator_function_ref_for_execution = self.check_move_account(
1727 auth_account_object_id,
1728 auth_account_object_seq_number,
1729 auth_account_object_digest,
1730 account_object,
1731 &signer,
1732 )?;
1733
1734 Ok((
1735 authenticator_input_objects,
1736 authenticator_function_ref_for_execution,
1737 ))
1738 },
1739 )
1740 .collect::<IotaResult<Vec<_>>>()?;
1741
1742 let per_authenticator_input_objects = per_authenticator_inputs
1743 .iter()
1744 .map(|(authenticator_input_objects, _)| authenticator_input_objects.clone())
1745 .collect::<Vec<_>>();
1746
1747 let tx_data_bytes =
1749 bcs::to_bytes(tx_data).expect("TransactionData serialization cannot fail");
1750
1751 let (sender_auth_digest, sponsor_auth_digest) =
1752 certificate.data().compute_auth_digests()?;
1753
1754 let authenticator_gas_budget = protocol_config.max_auth_gas();
1759 let (
1760 gas_status,
1761 per_authenticator_checked_input_objects,
1762 authenticator_and_tx_checked_input_objects,
1763 ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
1764 certificate,
1765 tx_input_objects,
1766 per_authenticator_input_objects,
1767 authenticator_gas_budget,
1768 protocol_config,
1769 reference_gas_price,
1770 )?;
1771
1772 debug_assert_eq!(
1773 move_authenticators.len(),
1774 per_authenticator_checked_input_objects.len(),
1775 "Move authenticators amount must match the number of checked authenticator inputs"
1776 );
1777
1778 let move_authenticators = move_authenticators
1779 .into_iter()
1780 .zip(per_authenticator_inputs)
1781 .zip(per_authenticator_checked_input_objects)
1782 .map(
1783 |(
1784 (move_authenticator, (_, authenticator_function_ref_for_execution)),
1785 authenticator_checked_input_objects,
1786 )| {
1787 (
1788 move_authenticator.to_owned(),
1789 authenticator_function_ref_for_execution,
1790 authenticator_checked_input_objects,
1791 )
1792 },
1793 )
1794 .collect::<Vec<_>>();
1795
1796 let owned_object_refs = authenticator_and_tx_checked_input_objects
1797 .inner()
1798 .filter_owned_objects();
1799 self.check_owned_locks(&owned_object_refs)?;
1800
1801 epoch_store
1802 .executor()
1803 .authenticate_then_execute_transaction_to_effects(
1804 backing_store,
1805 protocol_config,
1806 self.metrics.limits_metrics.clone(),
1807 self.config
1808 .expensive_safety_check_config
1809 .enable_deep_per_tx_iota_conservation_check(),
1810 self.config.certificate_deny_config.certificate_deny_set(),
1811 &epoch_id,
1812 epoch_start_timestamp,
1813 gas_data,
1814 gas_status,
1815 move_authenticators,
1816 authenticator_and_tx_checked_input_objects,
1817 kind,
1818 signer,
1819 tx_digest,
1820 tx_data_bytes,
1821 sender_auth_digest,
1822 sponsor_auth_digest,
1823 &mut None,
1824 )
1825 };
1826
1827 fail_point_if!("cp_execution_nondeterminism", || {
1828 #[cfg(msim)]
1829 self.create_fail_state(certificate, epoch_store, &mut effects);
1830 });
1831
1832 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1833 if elapsed > 0.0 {
1834 self.metrics
1835 .prepare_cert_gas_latency_ratio
1836 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1837 }
1838
1839 Ok((inner_temp_store, effects, execution_error_opt.err()))
1840 }
1841
1842 pub fn prepare_certificate_for_benchmark(
1843 &self,
1844 certificate: &VerifiedExecutableTransaction,
1845 input_objects: InputObjects,
1846 epoch_store: &Arc<AuthorityPerEpochStore>,
1847 ) -> IotaResult<(
1848 InnerTemporaryStore,
1849 TransactionEffects,
1850 Option<ExecutionError>,
1851 )> {
1852 let lock = RwLock::new(epoch_store.epoch());
1853 let execution_guard = lock.try_read().unwrap();
1854
1855 self.prepare_certificate(
1856 &execution_guard,
1857 certificate,
1858 input_objects,
1859 vec![],
1860 epoch_store,
1861 )
1862 }
1863
1864 #[instrument("dry_exec_tx", level = "trace", skip_all)]
1867 #[allow(clippy::type_complexity)]
1868 pub fn dry_exec_transaction(
1869 &self,
1870 transaction: TransactionData,
1871 transaction_digest: TransactionDigest,
1872 ) -> IotaResult<(
1873 DryRunTransactionBlockResponse,
1874 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1875 TransactionEffects,
1876 Option<ObjectID>,
1877 )> {
1878 let epoch_store = self.load_epoch_store_one_call_per_task();
1879 if !self.is_fullnode(&epoch_store) {
1880 return Err(IotaError::UnsupportedFeature {
1881 error: "dry-exec is only supported on fullnodes".to_string(),
1882 });
1883 }
1884
1885 if transaction.kind().is_system() {
1886 return Err(IotaError::UnsupportedFeature {
1887 error: "dry-exec does not support system transactions".to_string(),
1888 });
1889 }
1890
1891 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1892 }
1893
1894 #[allow(clippy::type_complexity)]
1895 pub fn dry_exec_transaction_for_benchmark(
1896 &self,
1897 transaction: TransactionData,
1898 transaction_digest: TransactionDigest,
1899 ) -> IotaResult<(
1900 DryRunTransactionBlockResponse,
1901 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1902 TransactionEffects,
1903 Option<ObjectID>,
1904 )> {
1905 let epoch_store = self.load_epoch_store_one_call_per_task();
1906 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1907 }
1908
1909 #[instrument(level = "trace", skip_all)]
1910 #[allow(clippy::type_complexity)]
1911 fn dry_exec_transaction_impl(
1912 &self,
1913 epoch_store: &AuthorityPerEpochStore,
1914 transaction: TransactionData,
1915 transaction_digest: TransactionDigest,
1916 ) -> IotaResult<(
1917 DryRunTransactionBlockResponse,
1918 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1919 TransactionEffects,
1920 Option<ObjectID>,
1921 )> {
1922 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1924
1925 let input_object_kinds = transaction.input_objects()?;
1926 let receiving_object_refs = transaction.receiving_objects();
1927
1928 iota_transaction_checks::deny::check_transaction_for_signing(
1929 &transaction,
1930 &[],
1931 &input_object_kinds,
1932 &receiving_object_refs,
1933 &self.config.transaction_deny_config,
1934 self.get_backing_package_store().as_ref(),
1935 )?;
1936
1937 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1938 None,
1940 &input_object_kinds,
1941 &receiving_object_refs,
1942 epoch_store.epoch(),
1943 )?;
1944
1945 let mut transaction = transaction;
1947 let reference_gas_price = epoch_store.reference_gas_price();
1948 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1949 let sender = transaction.gas_owner();
1950 let gas_object_id = ObjectID::random();
1951 let gas_object = Object::new_move(
1952 MoveObject::new_gas_coin(
1953 OBJECT_START_VERSION,
1954 gas_object_id,
1955 SIMULATION_GAS_COIN_VALUE,
1956 ),
1957 Owner::Address(sender),
1958 TransactionDigest::GENESIS_MARKER,
1959 );
1960 let gas_object_ref = gas_object.compute_object_reference();
1961 transaction.gas_data_mut().objects = vec![gas_object_ref];
1963 (
1964 iota_transaction_checks::check_transaction_input_with_given_gas(
1965 epoch_store.protocol_config(),
1966 reference_gas_price,
1967 &transaction,
1968 input_objects,
1969 receiving_objects,
1970 gas_object,
1971 &self.metrics.bytecode_verifier_metrics,
1972 &self.config.verifier_signing_config,
1973 )?,
1974 Some(gas_object_id),
1975 )
1976 } else {
1977 let authenticator_gas_budget = 0;
1980
1981 (
1982 iota_transaction_checks::check_transaction_input(
1983 epoch_store.protocol_config(),
1984 reference_gas_price,
1985 &transaction,
1986 input_objects,
1987 &receiving_objects,
1988 &self.metrics.bytecode_verifier_metrics,
1989 &self.config.verifier_signing_config,
1990 authenticator_gas_budget,
1991 )?,
1992 None,
1993 )
1994 };
1995
1996 let protocol_config = epoch_store.protocol_config();
1997 let (kind, signer, gas_data) = transaction.execution_parts();
1998
1999 let silent = true;
2000 let executor = iota_execution::executor(protocol_config, silent, None)
2001 .expect("Creating an executor should not fail here");
2002
2003 let expensive_checks = false;
2004 let (inner_temp_store, _, effects, execution_error) = executor
2005 .execute_transaction_to_effects(
2006 self.get_backing_store().as_ref(),
2007 protocol_config,
2008 self.metrics.limits_metrics.clone(),
2009 expensive_checks,
2010 self.config.certificate_deny_config.certificate_deny_set(),
2011 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2012 epoch_store
2013 .epoch_start_config()
2014 .epoch_data()
2015 .epoch_start_timestamp(),
2016 checked_input_objects,
2017 gas_data,
2018 gas_status,
2019 kind,
2020 signer,
2021 transaction_digest,
2022 &mut None,
2023 );
2024 let tx_digest = *effects.transaction_digest();
2025
2026 let module_cache =
2027 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
2028
2029 let mut layout_resolver =
2030 epoch_store
2031 .executor()
2032 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2033 &inner_temp_store,
2034 self.get_backing_package_store(),
2035 )));
2036 let object_changes = Vec::new();
2038
2039 let balance_changes = Vec::new();
2041
2042 let written_with_kind = effects
2043 .created()
2044 .into_iter()
2045 .map(|(oref, _)| (oref, WriteKind::Create))
2046 .chain(
2047 effects
2048 .unwrapped()
2049 .into_iter()
2050 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
2051 )
2052 .chain(
2053 effects
2054 .mutated()
2055 .into_iter()
2056 .map(|(oref, _)| (oref, WriteKind::Mutate)),
2057 )
2058 .map(|(oref, kind)| {
2059 let obj = inner_temp_store.written.get(&oref.object_id).unwrap();
2060 (oref.object_id, (oref, obj.clone(), kind))
2062 })
2063 .collect();
2064
2065 let execution_error_source = execution_error
2066 .as_ref()
2067 .err()
2068 .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2069
2070 Ok((
2071 DryRunTransactionBlockResponse {
2072 suggested_gas_price: self
2074 .congestion_tracker
2075 .get_prediction_suggested_gas_price(&transaction),
2076 input: IotaTransactionBlockData::try_from_with_module_cache(
2077 transaction,
2078 &module_cache,
2079 tx_digest,
2080 )
2081 .map_err(|e| IotaError::TransactionSerialization {
2082 error: format!(
2083 "Failed to convert transaction to IotaTransactionBlockData: {e}",
2084 ),
2085 })?, effects: effects.clone().try_into()?,
2087 events: IotaTransactionBlockEvents::try_from(
2088 inner_temp_store.events.clone(),
2089 tx_digest,
2090 None,
2091 layout_resolver.as_mut(),
2092 )?,
2093 object_changes,
2094 balance_changes,
2095 execution_error_source,
2096 },
2097 written_with_kind,
2098 effects,
2099 mock_gas,
2100 ))
2101 }
2102
2103 pub fn simulate_transaction(
2104 &self,
2105 mut transaction: TransactionData,
2106 checks: VmChecks,
2107 ) -> IotaResult<SimulateTransactionResult> {
2108 if transaction.kind().is_system() {
2109 return Err(IotaError::UnsupportedFeature {
2110 error: "simulate does not support system transactions".to_string(),
2111 });
2112 }
2113
2114 let epoch_store = self.load_epoch_store_one_call_per_task();
2115 if !self.is_fullnode(&epoch_store) {
2116 return Err(IotaError::UnsupportedFeature {
2117 error: "simulate is only supported on fullnodes".to_string(),
2118 });
2119 }
2120
2121 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2125
2126 let input_object_kinds = transaction.input_objects()?;
2127 let receiving_object_refs = transaction.receiving_objects();
2128
2129 iota_transaction_checks::deny::check_transaction_for_signing(
2132 &transaction,
2133 &[],
2134 &input_object_kinds,
2135 &receiving_object_refs,
2136 &self.config.transaction_deny_config,
2137 self.get_backing_package_store().as_ref(),
2138 )?;
2139
2140 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2142 None,
2144 &input_object_kinds,
2145 &receiving_object_refs,
2146 epoch_store.epoch(),
2147 )?;
2148
2149 let mock_gas_id = if transaction.gas().is_empty() {
2151 let mock_gas_object = Object::new_move(
2152 MoveObject::new_gas_coin(
2153 OBJECT_START_VERSION,
2154 ObjectID::MAX,
2155 SIMULATION_GAS_COIN_VALUE,
2156 ),
2157 Owner::Address(transaction.gas_data().owner),
2158 TransactionDigest::GENESIS_MARKER,
2159 );
2160 let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2161 transaction.gas_data_mut().objects = vec![mock_gas_object_ref];
2162 input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2163 Some(mock_gas_object.id())
2164 } else {
2165 None
2166 };
2167
2168 let protocol_config = epoch_store.protocol_config();
2169
2170 let authenticator_gas_budget = 0;
2173
2174 let (gas_status, checked_input_objects) = if checks.enabled() {
2177 iota_transaction_checks::check_transaction_input(
2178 protocol_config,
2179 epoch_store.reference_gas_price(),
2180 &transaction,
2181 input_objects,
2182 &receiving_objects,
2183 &self.metrics.bytecode_verifier_metrics,
2184 &self.config.verifier_signing_config,
2185 authenticator_gas_budget,
2186 )?
2187 } else {
2188 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2189 protocol_config,
2190 transaction.kind(),
2191 input_objects,
2192 receiving_objects,
2193 )?;
2194 let gas_status = IotaGasStatus::new(
2195 transaction.gas_budget(),
2196 transaction.gas_price(),
2197 epoch_store.reference_gas_price(),
2198 protocol_config,
2199 )?;
2200
2201 (gas_status, checked_input_objects)
2202 };
2203
2204 let executor = iota_execution::executor(
2206 protocol_config,
2207 true, None,
2209 )
2210 .expect("Creating an executor should not fail here");
2211
2212 let (kind, signer, gas_data) = transaction.execution_parts();
2214 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2215 self.get_backing_store().as_ref(),
2216 protocol_config,
2217 self.metrics.limits_metrics.clone(),
2218 false, self.config.certificate_deny_config.certificate_deny_set(),
2220 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2221 epoch_store
2222 .epoch_start_config()
2223 .epoch_data()
2224 .epoch_start_timestamp(),
2225 checked_input_objects,
2226 gas_data,
2227 gas_status,
2228 kind,
2229 signer,
2230 transaction.digest(),
2231 checks.disabled(),
2232 );
2233
2234 Ok(SimulateTransactionResult {
2237 input_objects: inner_temp_store.input_objects,
2238 output_objects: inner_temp_store.written,
2239 events: effects.events_digest().map(|_| inner_temp_store.events),
2240 effects,
2241 execution_result,
2242 suggested_gas_price: self
2243 .congestion_tracker
2244 .get_prediction_suggested_gas_price(&transaction),
2245 mock_gas_id,
2246 })
2247 }
2248
2249 #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2254 pub async fn dev_inspect_transaction_block(
2255 &self,
2256 sender: IotaAddress,
2257 transaction_kind: TransactionKind,
2258 gas_price: Option<u64>,
2259 gas_budget: Option<u64>,
2260 gas_sponsor: Option<IotaAddress>,
2261 gas_objects: Option<Vec<ObjectRef>>,
2262 show_raw_txn_data_and_effects: Option<bool>,
2263 skip_checks: Option<bool>,
2264 ) -> IotaResult<DevInspectResults> {
2265 let epoch_store = self.load_epoch_store_one_call_per_task();
2266
2267 if !self.is_fullnode(&epoch_store) {
2268 return Err(IotaError::UnsupportedFeature {
2269 error: "dev-inspect is only supported on fullnodes".to_string(),
2270 });
2271 }
2272
2273 if transaction_kind.is_system() {
2274 return Err(IotaError::UnsupportedFeature {
2275 error: "system transactions are not supported".to_string(),
2276 });
2277 }
2278
2279 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2280 let skip_checks = skip_checks.unwrap_or(true);
2281 let reference_gas_price = epoch_store.reference_gas_price();
2282 let protocol_config = epoch_store.protocol_config();
2283 let max_tx_gas = protocol_config.max_tx_gas();
2284
2285 let price = gas_price.unwrap_or(reference_gas_price);
2286 let budget = gas_budget.unwrap_or(max_tx_gas);
2287 let owner = gas_sponsor.unwrap_or(sender);
2288 let payment = gas_objects.unwrap_or_default();
2291 let mut transaction = TransactionData::V1(TransactionDataV1 {
2292 kind: transaction_kind.clone(),
2293 sender,
2294 gas_payment: GasData {
2295 objects: payment,
2296 owner,
2297 price,
2298 budget,
2299 },
2300 expiration: TransactionExpiration::None,
2301 });
2302
2303 let raw_txn_data = if show_raw_txn_data_and_effects {
2304 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2305 error: "Failed to serialize transaction during dev inspect".to_string(),
2306 })?
2307 } else {
2308 vec![]
2309 };
2310
2311 transaction.validity_check_no_gas_check(protocol_config)?;
2312
2313 let input_object_kinds = transaction.input_objects()?;
2314 let receiving_object_refs = transaction.receiving_objects();
2315
2316 iota_transaction_checks::deny::check_transaction_for_signing(
2317 &transaction,
2318 &[],
2319 &input_object_kinds,
2320 &receiving_object_refs,
2321 &self.config.transaction_deny_config,
2322 self.get_backing_package_store().as_ref(),
2323 )?;
2324
2325 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2326 None,
2328 &input_object_kinds,
2329 &receiving_object_refs,
2330 epoch_store.epoch(),
2331 )?;
2332
2333 let (gas_status, checked_input_objects) = if skip_checks {
2334 if transaction.gas().is_empty() {
2339 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2341 SIMULATION_GAS_COIN_VALUE,
2342 transaction.gas_owner(),
2343 );
2344 let gas_object_ref = dummy_gas_object.compute_object_reference();
2345 transaction.gas_data_mut().objects = vec![gas_object_ref];
2346 input_objects.push(ObjectReadResult::new(
2347 InputObjectKind::ImmOrOwnedMoveObject(gas_object_ref),
2348 dummy_gas_object.into(),
2349 ));
2350 }
2351 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2352 protocol_config,
2353 &transaction_kind,
2354 input_objects,
2355 receiving_objects,
2356 )?;
2357 let gas_status = IotaGasStatus::new(
2358 max_tx_gas,
2359 transaction.gas_price(),
2360 reference_gas_price,
2361 protocol_config,
2362 )?;
2363
2364 (gas_status, checked_input_objects)
2365 } else {
2366 if transaction.gas().is_empty() {
2370 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2372 SIMULATION_GAS_COIN_VALUE,
2373 transaction.gas_owner(),
2374 );
2375 let gas_object_ref = dummy_gas_object.compute_object_reference();
2376 transaction.gas_data_mut().objects = vec![gas_object_ref];
2377 iota_transaction_checks::check_transaction_input_with_given_gas(
2378 epoch_store.protocol_config(),
2379 reference_gas_price,
2380 &transaction,
2381 input_objects,
2382 receiving_objects,
2383 dummy_gas_object,
2384 &self.metrics.bytecode_verifier_metrics,
2385 &self.config.verifier_signing_config,
2386 )?
2387 } else {
2388 let authenticator_gas_budget = 0;
2391
2392 iota_transaction_checks::check_transaction_input(
2393 epoch_store.protocol_config(),
2394 reference_gas_price,
2395 &transaction,
2396 input_objects,
2397 &receiving_objects,
2398 &self.metrics.bytecode_verifier_metrics,
2399 &self.config.verifier_signing_config,
2400 authenticator_gas_budget,
2401 )?
2402 }
2403 };
2404
2405 let executor = iota_execution::executor(protocol_config, true, None)
2406 .expect("Creating an executor should not fail here");
2407 let gas_data = transaction.gas_data().clone();
2408 let intent_msg = IntentMessage::new(
2409 Intent {
2410 version: IntentVersion::V0,
2411 scope: IntentScope::TransactionData,
2412 app_id: IntentAppId::Iota,
2413 },
2414 transaction,
2415 );
2416 let transaction_digest = TransactionDigest::new(intent_msg.value.digest().into_inner());
2417 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2418 self.get_backing_store().as_ref(),
2419 protocol_config,
2420 self.metrics.limits_metrics.clone(),
2421 false,
2423 self.config.certificate_deny_config.certificate_deny_set(),
2424 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2425 epoch_store
2426 .epoch_start_config()
2427 .epoch_data()
2428 .epoch_start_timestamp(),
2429 checked_input_objects,
2430 gas_data,
2431 gas_status,
2432 transaction_kind,
2433 sender,
2434 transaction_digest,
2435 skip_checks,
2436 );
2437
2438 let raw_effects = if show_raw_txn_data_and_effects {
2439 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2440 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2441 })?
2442 } else {
2443 vec![]
2444 };
2445
2446 let mut layout_resolver =
2447 epoch_store
2448 .executor()
2449 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2450 &inner_temp_store,
2451 self.get_backing_package_store(),
2452 )));
2453
2454 DevInspectResults::new(
2455 effects,
2456 inner_temp_store.events.clone(),
2457 execution_result,
2458 raw_txn_data,
2459 raw_effects,
2460 layout_resolver.as_mut(),
2461 )
2462 }
2463
2464 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2466 let epoch_store = self.epoch_store_for_testing();
2467 Ok(epoch_store.reference_gas_price())
2468 }
2469
2470 #[instrument(level = "trace", skip_all)]
2471 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2472 self.get_transaction_cache_reader()
2473 .try_is_tx_already_executed(digest)
2474 }
2475
2476 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2478 self.try_is_tx_already_executed(digest)
2479 .expect("storage access failed")
2480 }
2481
2482 #[instrument(level = "debug", skip_all, err)]
2484 fn index_tx(
2485 &self,
2486 indexes: &IndexStore,
2487 digest: &TransactionDigest,
2488 cert: &VerifiedExecutableTransaction,
2490 effects: &TransactionEffects,
2491 events: &TransactionEvents,
2492 timestamp_ms: u64,
2493 tx_coins: Option<TxCoins>,
2494 written: &WrittenObjects,
2495 inner_temporary_store: &InnerTemporaryStore,
2496 ) -> IotaResult<u64> {
2497 let changes = self
2498 .process_object_index(effects, written, inner_temporary_store)
2499 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2500
2501 indexes.index_tx(
2502 cert.data().intent_message().value.sender(),
2503 cert.data()
2504 .intent_message()
2505 .value
2506 .input_objects()?
2507 .iter()
2508 .map(|o| o.object_id()),
2509 effects
2510 .all_changed_objects()
2511 .into_iter()
2512 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2513 cert.data()
2514 .intent_message()
2515 .value
2516 .move_calls()
2517 .into_iter()
2518 .map(|(package, module, function)| {
2519 (*package, module.to_owned(), function.to_owned())
2520 }),
2521 events,
2522 changes,
2523 digest,
2524 timestamp_ms,
2525 tx_coins,
2526 )
2527 }
2528
2529 #[cfg(msim)]
2530 fn create_fail_state(
2531 &self,
2532 certificate: &VerifiedExecutableTransaction,
2533 epoch_store: &Arc<AuthorityPerEpochStore>,
2534 effects: &mut TransactionEffects,
2535 ) {
2536 use std::cell::RefCell;
2537
2538 use iota_types::effects::TransactionEffectsAPIForTesting;
2539 thread_local! {
2540 static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2541 }
2542 if !certificate.data().intent_message().value.is_system_tx() {
2543 let committee = epoch_store.committee();
2544 let cur_stake = (**committee).weight(&self.name);
2545 if cur_stake > 0 {
2546 FAIL_STATE.with_borrow_mut(|fail_state| {
2547 if fail_state.0 < committee.validity_threshold() {
2549 fail_state.0 += cur_stake;
2550 fail_state.1.insert(self.name);
2551 }
2552
2553 if fail_state.1.contains(&self.name) {
2554 info!("cp_exec failing tx");
2555 effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2556 }
2557 });
2558 }
2559 }
2560 }
2561
2562 fn process_object_index(
2563 &self,
2564 effects: &TransactionEffects,
2565 written: &WrittenObjects,
2566 inner_temporary_store: &InnerTemporaryStore,
2567 ) -> IotaResult<ObjectIndexChanges> {
2568 let epoch_store = self.load_epoch_store_one_call_per_task();
2569 let mut layout_resolver =
2570 epoch_store
2571 .executor()
2572 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2573 inner_temporary_store,
2574 self.get_backing_package_store(),
2575 )));
2576
2577 let modified_at_version = effects
2578 .modified_at_versions()
2579 .into_iter()
2580 .collect::<HashMap<_, _>>();
2581
2582 let tx_digest = effects.transaction_digest();
2583 let mut deleted_owners = vec![];
2584 let mut deleted_dynamic_fields = vec![];
2585 for object_ref in effects.deleted().into_iter().chain(effects.wrapped()) {
2586 let old_version = modified_at_version.get(&object_ref.object_id).unwrap();
2587 match self.get_owner_at_version(&object_ref.object_id, *old_version).unwrap_or_else(
2590 |e| panic!("tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {} at version {old_version:?}. Err: {e:?}",object_ref.object_id)
2591 ) {
2592 Owner::Address(addr) => deleted_owners.push((addr, object_ref.object_id)),
2593 Owner::Object(object_id) => {
2594 deleted_dynamic_fields.push((object_id, object_ref.object_id))
2595 }
2596 _ => {}
2597 }
2598 }
2599
2600 let mut new_owners = vec![];
2601 let mut new_dynamic_fields = vec![];
2602
2603 for (oref, owner, kind) in effects.all_changed_objects() {
2604 let id = &oref.object_id;
2605 if let WriteKind::Mutate = kind {
2608 let Some(old_version) = modified_at_version.get(id) else {
2609 panic!(
2610 "tx_digest={tx_digest}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2611 );
2612 };
2613 let Some(old_object) = self
2616 .get_object_store()
2617 .try_get_object_by_key(id, *old_version)?
2618 else {
2619 panic!(
2620 "tx_digest={tx_digest}, error processing object owner index, cannot find owner for object {id} at version {old_version:?}"
2621 );
2622 };
2623 if old_object.owner != owner {
2624 match old_object.owner {
2625 Owner::Address(addr) => {
2626 deleted_owners.push((addr, *id));
2627 }
2628 Owner::Object(object_id) => deleted_dynamic_fields.push((object_id, *id)),
2629 _ => {}
2630 }
2631 }
2632 }
2633
2634 match owner {
2635 Owner::Address(addr) => {
2636 let new_object = written.get(id).unwrap_or_else(
2639 || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2640 );
2641 assert_eq!(
2642 new_object.version(),
2643 oref.version,
2644 "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2645 tx_digest,
2646 id,
2647 new_object.version(),
2648 oref.version
2649 );
2650
2651 let type_ = new_object
2652 .type_()
2653 .map(|type_| ObjectType::Struct(type_.clone()))
2654 .unwrap_or(ObjectType::Package);
2655
2656 new_owners.push((
2657 (addr, *id),
2658 ObjectInfo {
2659 object_id: *id,
2660 version: oref.version,
2661 digest: oref.digest,
2662 type_,
2663 owner,
2664 previous_transaction: *effects.transaction_digest(),
2665 },
2666 ));
2667 }
2668 Owner::Object(owner) => {
2669 let new_object = written.get(id).unwrap_or_else(
2670 || panic!("tx_digest={tx_digest}, error processing object owner index, written does not contain object {id}")
2671 );
2672 assert_eq!(
2673 new_object.version(),
2674 oref.version,
2675 "tx_digest={} error processing object owner index, object {} from written has mismatched version. Actual: {}, expected: {}",
2676 tx_digest,
2677 id,
2678 new_object.version(),
2679 oref.version
2680 );
2681
2682 let Some(df_info) = self
2683 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2684 .unwrap_or_else(|e| {
2685 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2686 None
2687 }
2688 )
2689 else {
2690 continue;
2692 };
2693 new_dynamic_fields.push(((owner, *id), df_info))
2694 }
2695 _ => {}
2696 }
2697 }
2698
2699 Ok(ObjectIndexChanges {
2700 deleted_owners,
2701 deleted_dynamic_fields,
2702 new_owners,
2703 new_dynamic_fields,
2704 })
2705 }
2706
2707 fn try_create_dynamic_field_info(
2708 &self,
2709 o: &Object,
2710 written: &WrittenObjects,
2711 resolver: &mut dyn LayoutResolver,
2712 ) -> IotaResult<Option<DynamicFieldInfo>> {
2713 let Some(move_object) = o.data.as_struct_opt().cloned() else {
2715 return Ok(None);
2716 };
2717
2718 if !move_object.struct_tag().is_dynamic_field() {
2720 return Ok(None);
2721 }
2722
2723 let layout = resolver
2724 .get_annotated_layout(move_object.struct_tag())?
2725 .into_layout();
2726
2727 let field =
2728 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2729 IotaError::ObjectDeserialization {
2730 error: e.to_string(),
2731 }
2732 })?;
2733
2734 let type_ = field.kind;
2735 let name_type: TypeTag = type_tag_core_to_sdk(&field.name_layout.into());
2736 let bcs_name = field.name_bytes.to_owned();
2737
2738 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2739 .map_err(|e| {
2740 warn!("{e}");
2741 IotaError::ObjectDeserialization {
2742 error: e.to_string(),
2743 }
2744 })?;
2745
2746 let name = DynamicFieldName {
2747 type_: name_type,
2748 value: IotaMoveValue::from(name_value).to_json_value(),
2749 };
2750
2751 let value_metadata = field.value_metadata().map_err(|e| {
2752 warn!("{e}");
2753 IotaError::ObjectDeserialization {
2754 error: e.to_string(),
2755 }
2756 })?;
2757
2758 Ok(Some(match value_metadata {
2759 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2760 name,
2761 bcs_name,
2762 type_,
2763 object_type: object_type.to_canonical_string(true),
2764 object_id: o.id(),
2765 version: o.version(),
2766 digest: o.digest(),
2767 },
2768
2769 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2770 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2775 (
2776 object.version(),
2777 object.digest(),
2778 object.data.object_type().unwrap().clone(),
2779 )
2780 } else {
2781 let object = self
2783 .get_object_store()
2784 .try_get_object_by_key(&object_id, o.version())?
2785 .ok_or_else(|| UserInputError::ObjectNotFound {
2786 object_id,
2787 version: Some(o.version()),
2788 })?;
2789 let version = object.version();
2790 let digest = object.digest();
2791 let object_type = object.data.object_type().unwrap().clone();
2792 (version, digest, object_type)
2793 };
2794
2795 DynamicFieldInfo {
2796 name,
2797 bcs_name,
2798 type_,
2799 object_type: object_type.to_string(),
2800 object_id,
2801 version,
2802 digest,
2803 }
2804 }
2805 }))
2806 }
2807
2808 #[instrument(level = "trace", skip_all, err)]
2809 fn post_process_one_tx(
2810 &self,
2811 certificate: &VerifiedExecutableTransaction,
2812 effects: &TransactionEffects,
2813 inner_temporary_store: &InnerTemporaryStore,
2814 epoch_store: &Arc<AuthorityPerEpochStore>,
2815 ) -> IotaResult {
2816 if self.indexes.is_none() {
2817 return Ok(());
2818 }
2819
2820 let _scope = monitored_scope("Execution::post_process_one_tx");
2821
2822 let tx_digest = certificate.digest();
2823 let timestamp_ms = Self::unixtime_now_ms();
2824 let events = &inner_temporary_store.events;
2825 let written = &inner_temporary_store.written;
2826 let tx_coins = self.fullnode_only_get_tx_coins_for_indexing(
2827 effects,
2828 inner_temporary_store,
2829 epoch_store,
2830 );
2831
2832 if let Some(indexes) = &self.indexes {
2834 let _ = self
2835 .index_tx(
2836 indexes.as_ref(),
2837 tx_digest,
2838 certificate,
2839 effects,
2840 events,
2841 timestamp_ms,
2842 tx_coins,
2843 written,
2844 inner_temporary_store,
2845 )
2846 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2847 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2848 .expect("Indexing tx should not fail");
2849
2850 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2851 let events = self.make_transaction_block_events(
2852 events.clone(),
2853 *tx_digest,
2854 timestamp_ms,
2855 epoch_store,
2856 inner_temporary_store,
2857 )?;
2858 self.subscription_handler
2860 .process_tx(certificate.data().transaction_data(), &effects, &events)
2861 .tap_ok(|_| {
2862 self.metrics
2863 .post_processing_total_tx_had_event_processed
2864 .inc()
2865 })
2866 .tap_err(|e| {
2867 warn!(
2868 ?tx_digest,
2869 "Post processing - Couldn't process events for tx: {}", e
2870 )
2871 })?;
2872
2873 self.metrics
2874 .post_processing_total_events_emitted
2875 .inc_by(events.data.len() as u64);
2876 };
2877 Ok(())
2878 }
2879
2880 fn make_transaction_block_events(
2881 &self,
2882 transaction_events: TransactionEvents,
2883 digest: TransactionDigest,
2884 timestamp_ms: u64,
2885 epoch_store: &Arc<AuthorityPerEpochStore>,
2886 inner_temporary_store: &InnerTemporaryStore,
2887 ) -> IotaResult<IotaTransactionBlockEvents> {
2888 let mut layout_resolver =
2889 epoch_store
2890 .executor()
2891 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2892 inner_temporary_store,
2893 self.get_backing_package_store(),
2894 )));
2895 IotaTransactionBlockEvents::try_from(
2896 transaction_events,
2897 digest,
2898 Some(timestamp_ms),
2899 layout_resolver.as_mut(),
2900 )
2901 }
2902
2903 pub fn unixtime_now_ms() -> u64 {
2904 let now = SystemTime::now()
2905 .duration_since(UNIX_EPOCH)
2906 .expect("Time went backwards")
2907 .as_millis();
2908 u64::try_from(now).expect("Travelling in time machine")
2909 }
2910
2911 #[instrument(level = "trace", skip_all)]
2912 pub async fn handle_transaction_info_request(
2913 &self,
2914 request: TransactionInfoRequest,
2915 ) -> IotaResult<TransactionInfoResponse> {
2916 let epoch_store = self.load_epoch_store_one_call_per_task();
2917 let (transaction, status) = self
2918 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2919 .ok_or(IotaError::TransactionNotFound {
2920 digest: request.transaction_digest,
2921 })?;
2922 Ok(TransactionInfoResponse {
2923 transaction,
2924 status,
2925 })
2926 }
2927
2928 #[instrument(level = "trace", skip_all)]
2929 pub async fn handle_object_info_request(
2930 &self,
2931 request: ObjectInfoRequest,
2932 ) -> IotaResult<ObjectInfoResponse> {
2933 let epoch_store = self.load_epoch_store_one_call_per_task();
2934
2935 let requested_object_seq = match request.request_kind {
2936 ObjectInfoRequestKind::LatestObjectInfo => {
2937 self.try_get_object_or_tombstone(request.object_id)
2938 .await?
2939 .ok_or_else(|| {
2940 IotaError::from(UserInputError::ObjectNotFound {
2941 object_id: request.object_id,
2942 version: None,
2943 })
2944 })?
2945 .version
2946 }
2947 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2948 };
2949
2950 let object = self
2951 .get_object_store()
2952 .try_get_object_by_key(&request.object_id, requested_object_seq)?
2953 .ok_or_else(|| {
2954 IotaError::from(UserInputError::ObjectNotFound {
2955 object_id: request.object_id,
2956 version: Some(requested_object_seq),
2957 })
2958 })?;
2959
2960 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2961 (request.generate_layout, object.data.as_struct_opt())
2962 {
2963 Some(into_struct_layout(
2964 epoch_store
2965 .executor()
2966 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2967 .get_annotated_layout(move_obj.struct_tag())?,
2968 )?)
2969 } else {
2970 None
2971 };
2972
2973 let lock = if !object.is_address_owned() {
2974 None
2976 } else {
2977 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2978 .await?
2979 .map(|s| s.into_inner())
2980 };
2981
2982 Ok(ObjectInfoResponse {
2983 object,
2984 layout,
2985 lock_for_debugging: lock,
2986 })
2987 }
2988
2989 #[instrument(level = "trace", skip_all)]
2990 pub fn handle_checkpoint_request(
2991 &self,
2992 request: &CheckpointRequest,
2993 ) -> IotaResult<CheckpointResponse> {
2994 let summary = if request.certified {
2995 let summary = match request.sequence_number {
2996 Some(seq) => self
2997 .checkpoint_store
2998 .get_checkpoint_by_sequence_number(seq)?,
2999 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
3000 }
3001 .map(|v| v.into_inner());
3002 summary.map(CheckpointSummaryResponse::Certified)
3003 } else {
3004 let summary = match request.sequence_number {
3005 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
3006 None => self
3007 .checkpoint_store
3008 .get_latest_locally_computed_checkpoint()?,
3009 };
3010 summary.map(CheckpointSummaryResponse::Pending)
3011 };
3012 let contents = match &summary {
3013 Some(s) => self
3014 .checkpoint_store
3015 .get_checkpoint_contents(&s.content_digest())?,
3016 None => None,
3017 };
3018 Ok(CheckpointResponse {
3019 checkpoint: summary,
3020 contents,
3021 })
3022 }
3023
3024 fn check_protocol_version(
3025 supported_protocol_versions: SupportedProtocolVersions,
3026 current_version: ProtocolVersion,
3027 ) {
3028 info!("current protocol version is now {:?}", current_version);
3029 info!("supported versions are: {:?}", supported_protocol_versions);
3030 if !supported_protocol_versions.is_version_supported(current_version) {
3031 let msg = format!(
3032 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
3033 );
3034
3035 error!("{}", msg);
3036 eprintln!("{msg}");
3037
3038 #[cfg(not(msim))]
3039 std::process::exit(1);
3040
3041 #[cfg(msim)]
3042 iota_simulator::task::shutdown_current_node();
3043 }
3044 }
3045
3046 #[expect(clippy::disallowed_methods)] pub async fn new(
3048 name: AuthorityName,
3049 secret: StableSyncAuthoritySigner,
3050 supported_protocol_versions: SupportedProtocolVersions,
3051 store: Arc<AuthorityStore>,
3052 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
3053 epoch_store: Arc<AuthorityPerEpochStore>,
3054 committee_store: Arc<CommitteeStore>,
3055 indexes: Option<Arc<IndexStore>>,
3056 grpc_indexes_store: Option<Arc<GrpcIndexesStore>>,
3057 checkpoint_store: Arc<CheckpointStore>,
3058 prometheus_registry: &Registry,
3059 genesis_objects: &[Object],
3060 db_checkpoint_config: &DBCheckpointConfig,
3061 config: NodeConfig,
3062 archive_readers: ArchiveReaderBalancer,
3063 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
3064 chain_identifier: ChainIdentifier,
3065 pruner_db: Option<Arc<AuthorityPrunerTables>>,
3066 checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
3067 ) -> Arc<Self> {
3068 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
3069
3070 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
3071 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
3072 let transaction_manager = Arc::new(TransactionManager::new(
3073 execution_cache_trait_pointers.object_cache_reader.clone(),
3074 execution_cache_trait_pointers
3075 .transaction_cache_reader
3076 .clone(),
3077 &epoch_store,
3078 tx_ready_certificates,
3079 metrics.clone(),
3080 ));
3081 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3082
3083 let authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3084 epoch_store.get_parent_path(),
3085 config
3086 .authority_store_pruning_config
3087 .num_latest_epoch_dbs_to_retain,
3088 )
3089 .await;
3090 let _pruner = AuthorityStorePruner::new(
3091 store.perpetual_tables.clone(),
3092 checkpoint_store.clone(),
3093 grpc_indexes_store.clone(),
3094 indexes.clone(),
3095 config.authority_store_pruning_config.clone(),
3096 epoch_store.committee().authority_exists(&name),
3097 epoch_store.epoch_start_state().epoch_duration_ms(),
3098 prometheus_registry,
3099 archive_readers,
3100 pruner_db,
3101 checkpoint_progress_tracker.clone(),
3102 );
3103 let input_loader =
3104 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3105 let epoch = epoch_store.epoch();
3106 let rgp = epoch_store.reference_gas_price();
3107 let state = Arc::new(AuthorityState {
3108 name,
3109 secret,
3110 execution_lock: RwLock::new(epoch),
3111 epoch_store: ArcSwap::new(epoch_store.clone()),
3112 input_loader,
3113 execution_cache_trait_pointers,
3114 indexes,
3115 grpc_indexes_store,
3116 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3117 checkpoint_store,
3118 committee_store,
3119 transaction_manager,
3120 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3121 metrics,
3122 _pruner,
3123 authority_per_epoch_pruner,
3124 checkpoint_progress_tracker,
3125 db_checkpoint_config: db_checkpoint_config.clone(),
3126 config,
3127 overload_info: AuthorityOverloadInfo::default(),
3128 validator_tx_finalizer,
3129 chain_identifier,
3130 congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
3131 });
3132
3133 let authority_state = Arc::downgrade(&state);
3135 spawn_monitored_task!(execution_process(
3136 authority_state,
3137 rx_ready_certificates,
3138 rx_execution_shutdown,
3139 ));
3140 spawn_monitored_task!(authority_store_migrations::migrate_events(store));
3141
3142 state
3144 .create_owner_index_if_empty(genesis_objects, &epoch_store)
3145 .expect("Error indexing genesis objects.");
3146
3147 state
3148 }
3149
3150 pub fn epoch_db_pruner(&self) -> &AuthorityPerEpochStorePruner {
3151 &self.authority_per_epoch_pruner
3152 }
3153
3154 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3156 &self.execution_cache_trait_pointers.object_cache_reader
3157 }
3158
3159 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3160 &self.execution_cache_trait_pointers.transaction_cache_reader
3161 }
3162
3163 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3164 &self.execution_cache_trait_pointers.cache_writer
3165 }
3166
3167 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3168 &self.execution_cache_trait_pointers.backing_store
3169 }
3170
3171 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3172 &self.execution_cache_trait_pointers.backing_package_store
3173 }
3174
3175 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3176 &self.execution_cache_trait_pointers.object_store
3177 }
3178
3179 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3180 &self.execution_cache_trait_pointers.reconfig_api
3181 }
3182
3183 pub fn get_global_state_hash_store(&self) -> &Arc<dyn GlobalStateHashStore> {
3184 &self.execution_cache_trait_pointers.global_state_hash_store
3185 }
3186
3187 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3188 &self.execution_cache_trait_pointers.checkpoint_cache
3189 }
3190
3191 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3192 &self.execution_cache_trait_pointers.state_sync_store
3193 }
3194
3195 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3196 &self.execution_cache_trait_pointers.cache_commit
3197 }
3198
3199 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3200 self.execution_cache_trait_pointers
3201 .testing_api
3202 .database_for_testing()
3203 }
3204
3205 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3206 &self,
3207 config: NodeConfig,
3208 metrics: Arc<AuthorityStorePruningMetrics>,
3209 ) -> anyhow::Result<()> {
3210 let archive_readers =
3211 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3212 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3213 &self.database_for_testing().perpetual_tables,
3214 &self.checkpoint_store,
3215 self.grpc_indexes_store.as_deref(),
3216 None,
3217 config.authority_store_pruning_config,
3218 metrics,
3219 archive_readers,
3220 EPOCH_DURATION_MS_FOR_TESTING,
3221 self.checkpoint_progress_tracker.as_ref(),
3222 )
3223 .await
3224 }
3225
3226 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3227 &self.transaction_manager
3228 }
3229
3230 pub fn enqueue_transactions_for_execution(
3233 &self,
3234 txns: Vec<VerifiedExecutableTransaction>,
3235 epoch_store: &Arc<AuthorityPerEpochStore>,
3236 ) {
3237 self.transaction_manager.enqueue(txns, epoch_store)
3238 }
3239
3240 pub fn enqueue_certificates_for_execution(
3242 &self,
3243 certs: Vec<VerifiedCertificate>,
3244 epoch_store: &Arc<AuthorityPerEpochStore>,
3245 ) {
3246 self.transaction_manager
3247 .enqueue_certificates(certs, epoch_store)
3248 }
3249
3250 pub fn enqueue_with_expected_effects_digest(
3251 &self,
3252 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3253 epoch_store: &AuthorityPerEpochStore,
3254 ) {
3255 self.transaction_manager
3256 .enqueue_with_expected_effects_digest(certs, epoch_store)
3257 }
3258
3259 fn create_owner_index_if_empty(
3260 &self,
3261 genesis_objects: &[Object],
3262 epoch_store: &Arc<AuthorityPerEpochStore>,
3263 ) -> IotaResult {
3264 let Some(index_store) = &self.indexes else {
3265 return Ok(());
3266 };
3267 if !index_store.is_empty() {
3268 return Ok(());
3269 }
3270
3271 let mut new_owners = vec![];
3272 let mut new_dynamic_fields = vec![];
3273 let mut layout_resolver = epoch_store
3274 .executor()
3275 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3276 for o in genesis_objects.iter() {
3277 match o.owner {
3278 Owner::Address(addr) => new_owners.push((
3279 (addr, o.id()),
3280 ObjectInfo::new(&o.compute_object_reference(), o),
3281 )),
3282 Owner::Object(object_id) => {
3283 let id = o.id();
3284 let info = match self.try_create_dynamic_field_info(
3285 o,
3286 &BTreeMap::new(),
3287 layout_resolver.as_mut(),
3288 ) {
3289 Ok(Some(info)) => info,
3290 Ok(None) => continue,
3291 Err(IotaError::UserInput {
3292 error:
3293 UserInputError::ObjectNotFound {
3294 object_id: not_found_id,
3295 version,
3296 },
3297 }) => {
3298 warn!(
3299 ?not_found_id,
3300 ?version,
3301 object_owner=?object_id,
3302 field=?id,
3303 "Skipping dynamic field: referenced genesis object not found"
3304 );
3305 continue;
3306 }
3307 Err(e) => return Err(e),
3308 };
3309 new_dynamic_fields.push(((object_id, id), info));
3310 }
3311 _ => {}
3312 }
3313 }
3314
3315 index_store.insert_genesis_objects(ObjectIndexChanges {
3316 deleted_owners: vec![],
3317 deleted_dynamic_fields: vec![],
3318 new_owners,
3319 new_dynamic_fields,
3320 })
3321 }
3322
3323 pub fn execution_lock_for_executable_transaction(
3327 &self,
3328 transaction: &VerifiedExecutableTransaction,
3329 ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3330 let lock = self
3331 .execution_lock
3332 .try_read()
3333 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3334 if *lock == transaction.auth_sig().epoch() {
3335 Ok(lock)
3336 } else {
3337 Err(IotaError::WrongEpoch {
3338 expected_epoch: *lock,
3339 actual_epoch: transaction.auth_sig().epoch(),
3340 })
3341 }
3342 }
3343
3344 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3350 self.execution_lock
3351 .try_read()
3352 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3353 }
3354
3355 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3356 self.execution_lock.write().await
3357 }
3358
3359 #[instrument(level = "error", skip_all)]
3360 pub async fn reconfigure(
3361 &self,
3362 cur_epoch_store: &AuthorityPerEpochStore,
3363 supported_protocol_versions: SupportedProtocolVersions,
3364 new_committee: Committee,
3365 epoch_start_configuration: EpochStartConfiguration,
3366 state_hasher: Arc<GlobalStateHasher>,
3367 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3368 epoch_supply_change: i64,
3369 epoch_last_checkpoint: CheckpointSequenceNumber,
3370 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3371 Self::check_protocol_version(
3372 supported_protocol_versions,
3373 epoch_start_configuration
3374 .epoch_start_state()
3375 .protocol_version(),
3376 );
3377 self.metrics.reset_on_reconfigure();
3378 self.committee_store.insert_new_committee(&new_committee)?;
3379
3380 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3382
3383 cur_epoch_store.epoch_terminated().await;
3385
3386 let highest_locally_built_checkpoint_seq = self
3387 .checkpoint_store
3388 .get_latest_locally_computed_checkpoint()?
3389 .map(|c| *c.sequence_number())
3390 .unwrap_or(0);
3391
3392 assert!(
3393 epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3394 "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3395 );
3396 if highest_locally_built_checkpoint_seq == epoch_last_checkpoint
3397 || self.is_fullnode(cur_epoch_store)
3398 {
3399 let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3403 if num_shared_version_assignments > 10 {
3410 debug_fatal!(
3412 "all shared_version_assignments should have been removed \
3413 (num_shared_version_assignments: {num_shared_version_assignments})"
3414 );
3415 }
3416 }
3417
3418 self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3424 .await?;
3425 self.get_reconfig_api()
3426 .clear_state_end_of_epoch(&execution_lock);
3427 self.check_system_consistency(
3428 cur_epoch_store,
3429 state_hasher,
3430 expensive_safety_check_config,
3431 epoch_supply_change,
3432 )?;
3433 self.get_reconfig_api()
3434 .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3435 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3436 if self
3437 .db_checkpoint_config
3438 .perform_db_checkpoints_at_epoch_end
3439 {
3440 let checkpoint_indexes = self
3441 .db_checkpoint_config
3442 .perform_index_db_checkpoints_at_epoch_end
3443 .unwrap_or(false);
3444 let current_epoch = cur_epoch_store.epoch();
3445 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3446 self.checkpoint_all_dbs(
3447 &epoch_checkpoint_path,
3448 cur_epoch_store,
3449 checkpoint_indexes,
3450 )?;
3451 }
3452 }
3453
3454 self.get_reconfig_api()
3455 .reconfigure_cache(&epoch_start_configuration)
3456 .await;
3457
3458 let new_epoch = new_committee.epoch;
3459 let new_epoch_store = self
3460 .reopen_epoch_db(
3461 cur_epoch_store,
3462 new_committee,
3463 epoch_start_configuration,
3464 expensive_safety_check_config,
3465 epoch_last_checkpoint,
3466 )
3467 .await?;
3468 assert_eq!(new_epoch_store.epoch(), new_epoch);
3469 self.transaction_manager.reconfigure(new_epoch);
3470 *execution_lock = new_epoch;
3471 Ok(new_epoch_store)
3475 }
3476
3477 pub async fn reconfigure_for_testing(&self) {
3482 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3483 let epoch_store = self.epoch_store_for_testing().clone();
3484 let protocol_config = epoch_store.protocol_config().clone();
3485 let _guard =
3493 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3494 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3495 self.get_backing_package_store().clone(),
3496 &self.config.expensive_safety_check_config,
3497 self.checkpoint_store
3498 .get_epoch_last_checkpoint(epoch_store.epoch())
3499 .unwrap()
3500 .map(|c| *c.sequence_number())
3501 .unwrap_or_default(),
3502 );
3503 let new_epoch = new_epoch_store.epoch();
3504 self.transaction_manager.reconfigure(new_epoch);
3505 self.epoch_store.store(new_epoch_store);
3506 epoch_store.epoch_terminated().await;
3507 *execution_lock = new_epoch;
3508 }
3509
3510 #[instrument(level = "error", skip_all)]
3511 fn check_system_consistency(
3512 &self,
3513 cur_epoch_store: &AuthorityPerEpochStore,
3514 state_hasher: Arc<GlobalStateHasher>,
3515 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3516 epoch_supply_change: i64,
3517 ) -> IotaResult<()> {
3518 info!(
3519 "Performing iota conservation consistency check for epoch {}",
3520 cur_epoch_store.epoch()
3521 );
3522
3523 if cfg!(debug_assertions) {
3524 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3525 }
3526
3527 self.get_reconfig_api()
3528 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3529
3530 if expensive_safety_check_config.enable_state_consistency_check() {
3532 info!(
3533 "Performing state consistency check for epoch {}",
3534 cur_epoch_store.epoch()
3535 );
3536 self.expensive_check_is_consistent_state(
3537 state_hasher,
3538 cur_epoch_store,
3539 cfg!(debug_assertions), );
3541 }
3542
3543 if expensive_safety_check_config.enable_secondary_index_checks() {
3544 if let Some(indexes) = self.indexes.clone() {
3545 verify_indexes(self.get_global_state_hash_store().as_ref(), indexes)
3546 .expect("secondary indexes are inconsistent");
3547 }
3548 }
3549
3550 Ok(())
3551 }
3552
3553 fn expensive_check_is_consistent_state(
3554 &self,
3555 state_hasher: Arc<GlobalStateHasher>,
3556 cur_epoch_store: &AuthorityPerEpochStore,
3557 panic: bool,
3558 ) {
3559 let live_object_set_hash = state_hasher.digest_live_object_set();
3560
3561 let root_state_hash: ECMHLiveObjectSetDigest = self
3562 .get_global_state_hash_store()
3563 .get_root_state_hash_for_epoch(cur_epoch_store.epoch())
3564 .expect("Retrieving root state hash cannot fail")
3565 .expect("Root state hash for epoch must exist")
3566 .1
3567 .digest()
3568 .into();
3569
3570 let is_inconsistent = root_state_hash != live_object_set_hash;
3571 if is_inconsistent {
3572 if panic {
3573 panic!(
3574 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3575 );
3576 } else {
3577 error!(
3578 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3579 root_state_hash, live_object_set_hash
3580 );
3581 }
3582 } else {
3583 info!("State consistency check passed");
3584 }
3585
3586 if !panic {
3587 state_hasher.set_inconsistent_state(is_inconsistent);
3588 }
3589 }
3590
3591 pub fn current_epoch_for_testing(&self) -> EpochId {
3592 self.epoch_store_for_testing().epoch()
3593 }
3594
3595 #[instrument(level = "error", skip_all)]
3596 pub fn checkpoint_all_dbs(
3597 &self,
3598 checkpoint_path: &Path,
3599 cur_epoch_store: &AuthorityPerEpochStore,
3600 checkpoint_indexes: bool,
3601 ) -> IotaResult {
3602 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3603 let current_epoch = cur_epoch_store.epoch();
3604
3605 if checkpoint_path.exists() {
3606 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3607 return Ok(());
3608 }
3609
3610 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3611 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3612
3613 if checkpoint_path_tmp.exists() {
3614 fs::remove_dir_all(&checkpoint_path_tmp)
3615 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3616 }
3617
3618 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3619 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3620
3621 self.checkpoint_store
3624 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3625
3626 self.get_reconfig_api()
3627 .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3628
3629 self.committee_store
3630 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3631
3632 if checkpoint_indexes {
3633 if let Some(indexes) = self.indexes.as_ref() {
3634 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3635 }
3636 if let Some(grpc_indexes_store) = self.grpc_indexes_store.as_ref() {
3637 grpc_indexes_store.checkpoint_db(&checkpoint_path_tmp.join(GRPC_INDEXES_DIR))?;
3638 }
3639 }
3640
3641 fs::rename(checkpoint_path_tmp, checkpoint_path)
3642 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3643 Ok(())
3644 }
3645
3646 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3653 self.epoch_store.load()
3654 }
3655
3656 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3658 self.load_epoch_store_one_call_per_task()
3659 }
3660
3661 pub fn clone_committee_for_testing(&self) -> Committee {
3662 Committee::clone(self.epoch_store_for_testing().committee())
3663 }
3664
3665 #[instrument(level = "trace", skip_all)]
3666 pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3667 self.get_object_store()
3668 .try_get_object(object_id)
3669 .map_err(Into::into)
3670 }
3671
3672 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3674 self.try_get_object(object_id)
3675 .await
3676 .expect("storage access failed")
3677 }
3678
3679 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3680 Ok(self
3681 .try_get_object(&ObjectID::SYSTEM)
3682 .await?
3683 .expect("system package should always exist")
3684 .compute_object_reference())
3685 }
3686
3687 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3689 self.get_object_cache_reader()
3690 .try_get_iota_system_state_object_unsafe()
3691 }
3692
3693 #[instrument(level = "trace", skip_all)]
3694 pub fn get_checkpoint_by_sequence_number(
3695 &self,
3696 sequence_number: CheckpointSequenceNumber,
3697 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3698 Ok(self
3699 .checkpoint_store
3700 .get_checkpoint_by_sequence_number(sequence_number)?)
3701 }
3702
3703 pub async fn wait_for_checkpoint_inclusion(
3710 &self,
3711 digests: &[TransactionDigest],
3712 timeout: Duration,
3713 ) -> IotaResult<BTreeMap<TransactionDigest, (CheckpointSequenceNumber, u64)>> {
3714 let epoch_store = self.load_epoch_store_one_call_per_task();
3715
3716 let mut checkpoint_timestamp_cache = HashMap::<CheckpointSequenceNumber, u64>::new();
3719
3720 let results = epoch_store
3721 .wait_for_transactions_in_checkpoint_with_timeout(digests, timeout, |seq| {
3722 *checkpoint_timestamp_cache.entry(seq).or_insert_with(|| {
3723 self.get_checkpoint_by_sequence_number(seq)
3724 .ok()
3725 .flatten()
3726 .map(|c| c.timestamp_ms)
3727 .unwrap_or(0)
3728 })
3729 })
3730 .await?;
3731
3732 Ok(digests
3733 .iter()
3734 .copied()
3735 .zip(results)
3736 .filter_map(|(digest, opt)| opt.map(|seq_and_ts| (digest, seq_and_ts)))
3737 .collect())
3738 }
3739
3740 #[instrument(level = "trace", skip_all)]
3741 pub fn get_transaction_checkpoint_for_tests(
3742 &self,
3743 digest: &TransactionDigest,
3744 epoch_store: &AuthorityPerEpochStore,
3745 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3746 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3747 let Some(checkpoint) = checkpoint else {
3748 return Ok(None);
3749 };
3750 let checkpoint = self
3751 .checkpoint_store
3752 .get_checkpoint_by_sequence_number(checkpoint)?;
3753 Ok(checkpoint)
3754 }
3755
3756 #[instrument(level = "trace", skip_all)]
3757 pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3758 Ok(
3759 match self
3760 .get_object_cache_reader()
3761 .try_get_latest_object_or_tombstone(*object_id)?
3762 {
3763 Some((_, ObjectOrTombstone::Object(object))) => {
3764 let layout = self.get_object_layout(&object)?;
3765 ObjectRead::Exists(object.compute_object_reference(), object, layout)
3766 }
3767 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3768 None => ObjectRead::NotExists(*object_id),
3769 },
3770 )
3771 }
3772
3773 pub fn get_chain_identifier(&self) -> ChainIdentifier {
3775 self.chain_identifier
3776 }
3777
3778 #[instrument(level = "trace", skip_all)]
3779 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3780 where
3781 T: DeserializeOwned,
3782 {
3783 let o = self.get_object_read(object_id)?.into_object()?;
3784 if let Some(move_object) = o.data.as_struct_opt() {
3785 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3786 IotaError::ObjectDeserialization {
3787 error: format!("{e}"),
3788 }
3789 })?)
3790 } else {
3791 Err(IotaError::ObjectDeserialization {
3792 error: format!("Provided object : [{object_id}] is not a Move object."),
3793 })
3794 }
3795 }
3796
3797 #[instrument(level = "trace", skip_all)]
3803 pub fn get_past_object_read(
3804 &self,
3805 object_id: &ObjectID,
3806 version: SequenceNumber,
3807 ) -> IotaResult<PastObjectRead> {
3808 let Some(obj_ref) = self
3810 .get_object_cache_reader()
3811 .try_get_latest_object_ref_or_tombstone(*object_id)?
3812 else {
3813 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3814 };
3815
3816 if version > obj_ref.version {
3817 return Ok(PastObjectRead::VersionTooHigh {
3818 object_id: *object_id,
3819 asked_version: version,
3820 latest_version: obj_ref.version,
3821 });
3822 }
3823
3824 if version < obj_ref.version {
3825 return Ok(match self.read_object_at_version(object_id, version)? {
3827 Some((object, layout)) => {
3828 let obj_ref = object.compute_object_reference();
3829 PastObjectRead::VersionFound(obj_ref, object, layout)
3830 }
3831
3832 None => PastObjectRead::VersionNotFound(*object_id, version),
3833 });
3834 }
3835
3836 if !obj_ref.digest.is_object_alive() {
3837 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3838 }
3839
3840 match self.read_object_at_version(object_id, obj_ref.version)? {
3841 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3842 None => {
3843 error!(
3844 "Object with in parent_entry is missing from object store, datastore is \
3845 inconsistent",
3846 );
3847 Err(UserInputError::ObjectNotFound {
3848 object_id: *object_id,
3849 version: Some(obj_ref.version),
3850 }
3851 .into())
3852 }
3853 }
3854 }
3855
3856 #[instrument(level = "trace", skip_all)]
3857 fn read_object_at_version(
3858 &self,
3859 object_id: &ObjectID,
3860 version: SequenceNumber,
3861 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3862 let Some(object) = self
3863 .get_object_cache_reader()
3864 .try_get_object_by_key(object_id, version)?
3865 else {
3866 return Ok(None);
3867 };
3868
3869 let layout = self.get_object_layout(&object)?;
3870 Ok(Some((object, layout)))
3871 }
3872
3873 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3874 let layout = object
3875 .data
3876 .as_struct_opt()
3877 .map(|object| {
3878 into_struct_layout(
3879 self.load_epoch_store_one_call_per_task()
3880 .executor()
3881 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3883 .get_annotated_layout(object.struct_tag())?,
3884 )
3885 })
3886 .transpose()?;
3887 Ok(layout)
3888 }
3889
3890 fn get_owner_at_version(
3891 &self,
3892 object_id: &ObjectID,
3893 version: SequenceNumber,
3894 ) -> IotaResult<Owner> {
3895 self.get_object_store()
3896 .try_get_object_by_key(object_id, version)?
3897 .ok_or_else(|| {
3898 IotaError::from(UserInputError::ObjectNotFound {
3899 object_id: *object_id,
3900 version: Some(version),
3901 })
3902 })
3903 .map(|o| o.owner)
3904 }
3905
3906 #[instrument(level = "trace", skip_all)]
3907 pub fn get_owner_objects(
3908 &self,
3909 owner: IotaAddress,
3910 cursor: Option<ObjectID>,
3912 limit: usize,
3913 filter: Option<IotaObjectDataFilter>,
3914 ) -> IotaResult<Vec<ObjectInfo>> {
3915 if let Some(indexes) = &self.indexes {
3916 indexes.get_owner_objects(owner, cursor, limit, filter)
3917 } else {
3918 Err(IotaError::IndexStoreNotAvailable)
3919 }
3920 }
3921
3922 #[instrument(level = "trace", skip_all)]
3923 pub fn get_owned_coins_iterator_with_cursor(
3924 &self,
3925 owner: IotaAddress,
3926 cursor: (String, ObjectID),
3928 limit: usize,
3929 one_coin_type_only: bool,
3930 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3931 if let Some(indexes) = &self.indexes {
3932 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3933 } else {
3934 Err(IotaError::IndexStoreNotAvailable)
3935 }
3936 }
3937
3938 #[instrument(level = "trace", skip_all)]
3939 pub fn get_owner_objects_iterator(
3940 &self,
3941 owner: IotaAddress,
3942 cursor: Option<ObjectID>,
3944 filter: Option<IotaObjectDataFilter>,
3945 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3946 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3947 if let Some(indexes) = &self.indexes {
3948 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3949 } else {
3950 Err(IotaError::IndexStoreNotAvailable)
3951 }
3952 }
3953
3954 #[instrument(level = "trace", skip_all)]
3955 pub async fn get_move_objects<T>(
3956 &self,
3957 owner: IotaAddress,
3958 tag: StructTag,
3959 ) -> IotaResult<Vec<T>>
3960 where
3961 T: DeserializeOwned,
3962 {
3963 let object_ids = self
3964 .get_owner_objects_iterator(owner, None, None)?
3965 .filter(|o| match &o.type_ {
3966 ObjectType::Struct(s) => *s == tag,
3967 ObjectType::Package => false,
3968 })
3969 .map(|info| ObjectKey(info.object_id, info.version))
3970 .collect::<Vec<_>>();
3971 let mut move_objects = vec![];
3972
3973 let objects = self
3974 .get_object_store()
3975 .try_multi_get_objects_by_key(&object_ids)?;
3976
3977 for (o, id) in objects.into_iter().zip(object_ids) {
3978 let object = o.ok_or_else(|| {
3979 IotaError::from(UserInputError::ObjectNotFound {
3980 object_id: id.0,
3981 version: Some(id.1),
3982 })
3983 })?;
3984 let move_object = object.data.as_struct_opt().ok_or_else(|| {
3985 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3986 })?;
3987 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3988 IotaError::ObjectDeserialization {
3989 error: format!("{e}"),
3990 }
3991 })?);
3992 }
3993 Ok(move_objects)
3994 }
3995
3996 #[instrument(level = "trace", skip_all)]
3997 pub fn get_dynamic_fields(
3998 &self,
3999 owner: ObjectID,
4000 cursor: Option<ObjectID>,
4002 limit: usize,
4003 ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
4004 Ok(self
4005 .get_dynamic_fields_iterator(owner, cursor)?
4006 .take(limit)
4007 .collect::<Result<Vec<_>, _>>()?)
4008 }
4009
4010 fn get_dynamic_fields_iterator(
4011 &self,
4012 owner: ObjectID,
4013 cursor: Option<ObjectID>,
4015 ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
4016 {
4017 if let Some(indexes) = &self.indexes {
4018 indexes.get_dynamic_fields_iterator(owner, cursor)
4019 } else {
4020 Err(IotaError::IndexStoreNotAvailable)
4021 }
4022 }
4023
4024 #[instrument(level = "trace", skip_all)]
4025 pub fn get_dynamic_field_object_id(
4026 &self,
4027 owner: ObjectID,
4028 name_type: TypeTag,
4029 name_bcs_bytes: &[u8],
4030 ) -> IotaResult<Option<ObjectID>> {
4031 if let Some(indexes) = &self.indexes {
4032 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
4033 } else {
4034 Err(IotaError::IndexStoreNotAvailable)
4035 }
4036 }
4037
4038 #[instrument(level = "trace", skip_all)]
4039 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
4040 Ok(self.get_indexes()?.next_sequence_number())
4041 }
4042
4043 #[instrument(level = "trace", skip_all)]
4044 pub async fn get_executed_transaction_and_effects(
4045 &self,
4046 digest: TransactionDigest,
4047 kv_store: Arc<TransactionKeyValueStore>,
4048 ) -> IotaResult<(Transaction, TransactionEffects)> {
4049 let transaction = kv_store.get_tx(digest).await?;
4050 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
4051 Ok((transaction, effects))
4052 }
4053
4054 #[instrument(level = "trace", skip_all)]
4055 pub fn multi_get_checkpoint_by_sequence_number(
4056 &self,
4057 sequence_numbers: &[CheckpointSequenceNumber],
4058 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
4059 Ok(self
4060 .checkpoint_store
4061 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
4062 }
4063
4064 #[instrument(level = "trace", skip_all)]
4065 pub fn get_transaction_events(
4066 &self,
4067 digest: &TransactionDigest,
4068 ) -> IotaResult<TransactionEvents> {
4069 self.get_transaction_cache_reader()
4070 .try_get_events(digest)?
4071 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
4072 }
4073
4074 pub fn get_transaction_input_objects(
4075 &self,
4076 effects: &TransactionEffects,
4077 ) -> anyhow::Result<Vec<Object>> {
4078 iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
4079 .map_err(Into::into)
4080 }
4081
4082 pub fn get_transaction_output_objects(
4083 &self,
4084 effects: &TransactionEffects,
4085 ) -> anyhow::Result<Vec<Object>> {
4086 iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
4087 .map_err(Into::into)
4088 }
4089
4090 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
4091 match &self.indexes {
4092 Some(i) => Ok(i.clone()),
4093 None => Err(IotaError::UnsupportedFeature {
4094 error: "extended object indexing is not enabled on this server".into(),
4095 }),
4096 }
4097 }
4098
4099 pub async fn get_transactions_for_tests(
4100 self: &Arc<Self>,
4101 filter: Option<TransactionFilter>,
4102 cursor: Option<TransactionDigest>,
4103 limit: Option<usize>,
4104 reverse: bool,
4105 ) -> IotaResult<Vec<TransactionDigest>> {
4106 let metrics = KeyValueStoreMetrics::new_for_tests();
4107 let kv_store = Arc::new(TransactionKeyValueStore::new(
4108 "rocksdb",
4109 metrics,
4110 self.clone(),
4111 ));
4112 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
4113 .await
4114 }
4115
4116 #[instrument(level = "trace", skip_all)]
4117 pub async fn get_transactions(
4118 &self,
4119 kv_store: &Arc<TransactionKeyValueStore>,
4120 filter: Option<TransactionFilter>,
4121 cursor: Option<TransactionDigest>,
4123 limit: Option<usize>,
4124 reverse: bool,
4125 ) -> IotaResult<Vec<TransactionDigest>> {
4126 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
4127 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
4128 let iter = checkpoint_contents.iter().map(|c| c.transaction);
4129 if reverse {
4130 let iter = iter
4131 .rev()
4132 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4133 .skip(usize::from(cursor.is_some()));
4134 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4135 } else {
4136 let iter = iter
4137 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4138 .skip(usize::from(cursor.is_some()));
4139 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4140 }
4141 }
4142 self.get_indexes()?
4143 .get_transactions(filter, cursor, limit, reverse)
4144 }
4145
4146 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4147 &self.checkpoint_store
4148 }
4149
4150 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4151 self.get_checkpoint_store()
4152 .get_highest_executed_checkpoint_seq_number()?
4153 .ok_or(IotaError::UserInput {
4154 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4155 })
4156 }
4157
4158 #[cfg(msim)]
4159 pub fn get_highest_pruned_checkpoint_for_testing(
4160 &self,
4161 ) -> IotaResult<CheckpointSequenceNumber> {
4162 self.database_for_testing()
4163 .perpetual_tables
4164 .get_highest_pruned_checkpoint()
4165 .map(|c| c.unwrap_or(0))
4166 .map_err(Into::into)
4167 }
4168
4169 #[instrument(level = "trace", skip_all)]
4170 pub fn get_checkpoint_summary_by_sequence_number(
4171 &self,
4172 sequence_number: CheckpointSequenceNumber,
4173 ) -> IotaResult<CheckpointSummary> {
4174 let verified_checkpoint = self
4175 .get_checkpoint_store()
4176 .get_checkpoint_by_sequence_number(sequence_number)?;
4177 match verified_checkpoint {
4178 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4179 None => Err(IotaError::UserInput {
4180 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4181 }),
4182 }
4183 }
4184
4185 #[instrument(level = "trace", skip_all)]
4186 pub fn get_checkpoint_summary_by_digest(
4187 &self,
4188 digest: CheckpointDigest,
4189 ) -> IotaResult<CheckpointSummary> {
4190 let verified_checkpoint = self
4191 .get_checkpoint_store()
4192 .get_checkpoint_by_digest(&digest)?;
4193 match verified_checkpoint {
4194 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4195 None => Err(IotaError::UserInput {
4196 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4197 }),
4198 }
4199 }
4200
4201 #[instrument(level = "trace", skip_all)]
4202 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
4203 if package_id.is_system_package() {
4204 return self.find_genesis_txn_digest();
4205 }
4206 Ok(self
4207 .get_object_read(&package_id)?
4208 .into_object()?
4209 .previous_transaction)
4210 }
4211
4212 #[instrument(level = "trace", skip_all)]
4213 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4214 let summary = self
4215 .get_verified_checkpoint_by_sequence_number(0)?
4216 .into_message();
4217 let content = self.get_checkpoint_contents(summary.content_digest)?;
4218 let genesis_transaction = content.enumerate_transactions(&summary).next();
4219 Ok(genesis_transaction
4220 .ok_or(IotaError::UserInput {
4221 error: UserInputError::GenesisTransactionNotFound,
4222 })?
4223 .1
4224 .transaction)
4225 }
4226
4227 #[instrument(level = "trace", skip_all)]
4228 pub fn get_verified_checkpoint_by_sequence_number(
4229 &self,
4230 sequence_number: CheckpointSequenceNumber,
4231 ) -> IotaResult<VerifiedCheckpoint> {
4232 let verified_checkpoint = self
4233 .get_checkpoint_store()
4234 .get_checkpoint_by_sequence_number(sequence_number)?;
4235 match verified_checkpoint {
4236 Some(verified_checkpoint) => Ok(verified_checkpoint),
4237 None => Err(IotaError::UserInput {
4238 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4239 }),
4240 }
4241 }
4242
4243 #[instrument(level = "trace", skip_all)]
4244 pub fn get_verified_checkpoint_summary_by_digest(
4245 &self,
4246 digest: CheckpointDigest,
4247 ) -> IotaResult<VerifiedCheckpoint> {
4248 let verified_checkpoint = self
4249 .get_checkpoint_store()
4250 .get_checkpoint_by_digest(&digest)?;
4251 match verified_checkpoint {
4252 Some(verified_checkpoint) => Ok(verified_checkpoint),
4253 None => Err(IotaError::UserInput {
4254 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4255 }),
4256 }
4257 }
4258
4259 #[instrument(level = "trace", skip_all)]
4260 pub fn get_checkpoint_contents(
4261 &self,
4262 digest: CheckpointContentsDigest,
4263 ) -> IotaResult<CheckpointContents> {
4264 self.get_checkpoint_store()
4265 .get_checkpoint_contents(&digest)?
4266 .ok_or(IotaError::UserInput {
4267 error: UserInputError::CheckpointContentsNotFound(digest),
4268 })
4269 }
4270
4271 #[instrument(level = "trace", skip_all)]
4272 pub fn get_checkpoint_contents_by_sequence_number(
4273 &self,
4274 sequence_number: CheckpointSequenceNumber,
4275 ) -> IotaResult<CheckpointContents> {
4276 let verified_checkpoint = self
4277 .get_checkpoint_store()
4278 .get_checkpoint_by_sequence_number(sequence_number)?;
4279 match verified_checkpoint {
4280 Some(verified_checkpoint) => {
4281 let content_digest = verified_checkpoint.into_inner().content_digest;
4282 self.get_checkpoint_contents(content_digest)
4283 }
4284 None => Err(IotaError::UserInput {
4285 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4286 }),
4287 }
4288 }
4289
4290 #[instrument(level = "trace", skip_all)]
4291 pub async fn query_events(
4292 &self,
4293 kv_store: &Arc<TransactionKeyValueStore>,
4294 query: EventFilter,
4295 cursor: Option<EventID>,
4297 limit: usize,
4298 descending: bool,
4299 ) -> IotaResult<Vec<IotaEvent>> {
4300 let index_store = self.get_indexes()?;
4301
4302 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4304 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4305 IotaError::TransactionNotFound {
4306 digest: cursor.tx_digest,
4307 },
4308 )?;
4309 (tx_seq, cursor.event_seq as usize)
4310 } else if descending {
4311 (u64::MAX, usize::MAX)
4312 } else {
4313 (0, 0)
4314 };
4315
4316 let limit = limit + 1;
4317 let mut event_keys = match query {
4318 EventFilter::All(filters) => {
4319 if filters.is_empty() {
4320 index_store.all_events(tx_num, event_num, limit, descending)?
4321 } else {
4322 return Err(IotaError::UserInput {
4323 error: UserInputError::Unsupported(
4324 "This query type does not currently support filter combinations"
4325 .to_string(),
4326 ),
4327 });
4328 }
4329 }
4330 EventFilter::Transaction(digest) => {
4331 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4332 }
4333 EventFilter::MoveModule { package, module } => {
4334 let module_id = ModuleId::new(
4335 AccountAddress::new(package.into_bytes()),
4336 move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4337 );
4338 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4339 }
4340 EventFilter::MoveEventType(struct_name) => index_store
4341 .events_by_move_event_struct_name(
4342 &struct_name,
4343 tx_num,
4344 event_num,
4345 limit,
4346 descending,
4347 )?,
4348 EventFilter::Sender(sender) => {
4349 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4350 }
4351 EventFilter::TimeRange {
4352 start_time,
4353 end_time,
4354 } => index_store
4355 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4356 EventFilter::MoveEventModule { package, module } => index_store
4357 .events_by_move_event_module(
4358 &ModuleId::new(
4359 AccountAddress::new(package.into_bytes()),
4360 move_core_types::identifier::Identifier::new(module.as_str()).unwrap(),
4361 ),
4362 tx_num,
4363 event_num,
4364 limit,
4365 descending,
4366 )?,
4367 EventFilter::Package(_)
4369 | EventFilter::MoveEventField { .. }
4370 | EventFilter::Any(_)
4371 | EventFilter::And(_, _)
4372 | EventFilter::Or(_, _) => {
4373 return Err(IotaError::UserInput {
4374 error: UserInputError::Unsupported(
4375 "This query type is not supported by the full node.".to_string(),
4376 ),
4377 });
4378 }
4379 };
4380
4381 if cursor.is_some() {
4384 if !event_keys.is_empty() {
4385 event_keys.remove(0);
4386 }
4387 } else {
4388 event_keys.truncate(limit - 1);
4389 }
4390
4391 let transaction_digests = event_keys
4393 .iter()
4394 .map(|(_, digest, _, _)| *digest)
4395 .collect::<HashSet<_>>()
4396 .into_iter()
4397 .collect::<Vec<_>>();
4398
4399 let events = kv_store
4400 .multi_get_events_by_tx_digests(&transaction_digests)
4401 .await?;
4402
4403 let events_map: HashMap<_, _> =
4404 transaction_digests.iter().zip(events.into_iter()).collect();
4405
4406 let stored_events = event_keys
4407 .into_iter()
4408 .map(|k| {
4409 (
4410 k,
4411 events_map
4412 .get(&k.1)
4413 .expect("fetched digest is missing")
4414 .clone()
4415 .and_then(|e| e.data.get(k.2).cloned()),
4416 )
4417 })
4418 .map(
4419 |((_event_digest, tx_digest, event_seq, timestamp), event)| {
4420 event
4421 .map(|e| (e, tx_digest, event_seq, timestamp))
4422 .ok_or(IotaError::TransactionEventsNotFound { digest: tx_digest })
4423 },
4424 )
4425 .collect::<Result<Vec<_>, _>>()?;
4426
4427 let epoch_store = self.load_epoch_store_one_call_per_task();
4428 let backing_store = self.get_backing_package_store().as_ref();
4429 let mut layout_resolver = epoch_store
4430 .executor()
4431 .type_layout_resolver(Box::new(backing_store));
4432 let mut events = vec![];
4433 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4434 events.push(IotaEvent::try_from(
4435 e.clone(),
4436 tx_digest,
4437 event_seq as u64,
4438 Some(timestamp),
4439 layout_resolver.get_annotated_layout(&e.type_)?,
4440 )?)
4441 }
4442 Ok(events)
4443 }
4444
4445 pub async fn insert_genesis_object(&self, object: Object) {
4446 self.get_reconfig_api()
4447 .try_insert_genesis_object(object)
4448 .expect("Cannot insert genesis object")
4449 }
4450
4451 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4452 futures::future::join_all(
4453 objects
4454 .iter()
4455 .map(|o| self.insert_genesis_object(o.clone())),
4456 )
4457 .await;
4458 }
4459
4460 #[instrument(level = "trace", skip_all)]
4462 pub fn get_transaction_status(
4463 &self,
4464 transaction_digest: &TransactionDigest,
4465 epoch_store: &Arc<AuthorityPerEpochStore>,
4466 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4467 if let Some(effects) =
4469 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4470 {
4471 if let Some(transaction) = self
4472 .get_transaction_cache_reader()
4473 .try_get_transaction_block(transaction_digest)?
4474 {
4475 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4476 let events = if effects.events_digest().is_some() {
4477 self.get_transaction_events(effects.transaction_digest())?
4478 } else {
4479 TransactionEvents::default()
4480 };
4481 return Ok(Some((
4482 (*transaction).clone().into_message(),
4483 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4484 )));
4485 } else {
4486 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4491 }
4492 }
4493 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4494 self.metrics.tx_already_processed.inc();
4495 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4496 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4497 } else {
4498 Ok(None)
4499 }
4500 }
4501
4502 #[instrument(level = "trace", skip_all)]
4506 pub fn get_signed_effects_and_maybe_resign(
4507 &self,
4508 transaction_digest: &TransactionDigest,
4509 epoch_store: &Arc<AuthorityPerEpochStore>,
4510 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4511 let effects = self
4512 .get_transaction_cache_reader()
4513 .try_get_executed_effects(transaction_digest)?;
4514 match effects {
4515 Some(effects) => {
4516 if effects.epoch() != epoch_store.epoch() {
4539 debug!(
4540 tx_digest=?transaction_digest,
4541 effects_epoch=?effects.epoch(),
4542 epoch=?epoch_store.epoch(),
4543 "Re-signing the effects with the current epoch"
4544 );
4545 }
4546 Ok(Some(self.sign_effects(effects, epoch_store)?))
4547 }
4548 None => Ok(None),
4549 }
4550 }
4551
4552 #[instrument(level = "trace", skip_all)]
4553 pub(crate) fn sign_effects(
4554 &self,
4555 effects: TransactionEffects,
4556 epoch_store: &Arc<AuthorityPerEpochStore>,
4557 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4558 let tx_digest = *effects.transaction_digest();
4559 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4560 Some(sig) => {
4561 debug_assert!(sig.epoch == epoch_store.epoch());
4562 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4563 }
4564 _ => {
4565 let sig = AuthoritySignInfo::new(
4566 epoch_store.epoch(),
4567 &effects,
4568 Intent::iota_app(IntentScope::TransactionEffects),
4569 self.name,
4570 &*self.secret,
4571 );
4572
4573 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4574
4575 epoch_store.insert_effects_digest_and_signature(
4576 &tx_digest,
4577 effects.digest(),
4578 &sig,
4579 )?;
4580
4581 effects
4582 }
4583 };
4584
4585 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4586 signed_effects,
4587 ))
4588 }
4589
4590 #[instrument(level = "trace", skip_all)]
4592 fn fullnode_only_get_tx_coins_for_indexing(
4593 &self,
4594 effects: &TransactionEffects,
4595 inner_temporary_store: &InnerTemporaryStore,
4596 epoch_store: &Arc<AuthorityPerEpochStore>,
4597 ) -> Option<TxCoins> {
4598 if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4599 return None;
4600 }
4601 let written_coin_objects = inner_temporary_store
4602 .written
4603 .iter()
4604 .filter_map(|(k, v)| {
4605 if v.is_coin() {
4606 Some((*k, v.clone()))
4607 } else {
4608 None
4609 }
4610 })
4611 .collect();
4612 let mut input_coin_objects = inner_temporary_store
4613 .input_objects
4614 .iter()
4615 .filter_map(|(k, v)| {
4616 if v.is_coin() {
4617 Some((*k, v.clone()))
4618 } else {
4619 None
4620 }
4621 })
4622 .collect::<ObjectMap>();
4623
4624 for (object_id, version) in effects.modified_at_versions() {
4629 if inner_temporary_store
4630 .loaded_runtime_objects
4631 .contains_key(&object_id)
4632 {
4633 if let Some(object) = self
4634 .get_object_store()
4635 .get_object_by_key(&object_id, version)
4636 {
4637 if object.is_coin() {
4638 input_coin_objects.insert(object_id, object);
4639 }
4640 }
4641 }
4642 }
4643
4644 Some((input_coin_objects, written_coin_objects))
4645 }
4646
4647 #[instrument(level = "trace", skip_all)]
4659 pub async fn get_transaction_lock(
4660 &self,
4661 object_ref: &ObjectRef,
4662 epoch_store: &AuthorityPerEpochStore,
4663 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4664 let lock_info = self
4665 .get_object_cache_reader()
4666 .try_get_lock(*object_ref, epoch_store)?;
4667 let lock_info = match lock_info {
4668 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4669 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4670 provided_obj_ref: *object_ref,
4671 current_version: locked_ref.version,
4672 }
4673 .into());
4674 }
4675 ObjectLockStatus::Initialized => {
4676 return Ok(None);
4677 }
4678 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4679 };
4680
4681 epoch_store.get_signed_transaction(&lock_info)
4682 }
4683
4684 pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4685 self.get_object_cache_reader().try_get_objects(objects)
4686 }
4687
4688 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4690 self.try_get_objects(objects)
4691 .await
4692 .expect("storage access failed")
4693 }
4694
4695 pub async fn try_get_object_or_tombstone(
4696 &self,
4697 object_id: ObjectID,
4698 ) -> IotaResult<Option<ObjectRef>> {
4699 self.get_object_cache_reader()
4700 .try_get_latest_object_ref_or_tombstone(object_id)
4701 }
4702
4703 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4705 self.try_get_object_or_tombstone(object_id)
4706 .await
4707 .expect("storage access failed")
4708 }
4709
4710 pub fn set_override_protocol_upgrade_buffer_stake(
4720 &self,
4721 expected_epoch: EpochId,
4722 buffer_stake_bps: u64,
4723 ) -> IotaResult {
4724 let epoch_store = self.load_epoch_store_one_call_per_task();
4725 let actual_epoch = epoch_store.epoch();
4726 if actual_epoch != expected_epoch {
4727 return Err(IotaError::WrongEpoch {
4728 expected_epoch,
4729 actual_epoch,
4730 });
4731 }
4732
4733 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4734 }
4735
4736 pub fn clear_override_protocol_upgrade_buffer_stake(
4737 &self,
4738 expected_epoch: EpochId,
4739 ) -> IotaResult {
4740 let epoch_store = self.load_epoch_store_one_call_per_task();
4741 let actual_epoch = epoch_store.epoch();
4742 if actual_epoch != expected_epoch {
4743 return Err(IotaError::WrongEpoch {
4744 expected_epoch,
4745 actual_epoch,
4746 });
4747 }
4748
4749 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4750 }
4751
4752 pub async fn get_available_system_packages(
4756 &self,
4757 binary_config: &BinaryConfig,
4758 ) -> Vec<ObjectRef> {
4759 let mut results = vec![];
4760
4761 let system_packages = BuiltInFramework::iter_system_packages();
4762
4763 #[cfg(msim)]
4765 let extra_packages = framework_injection::get_extra_packages(self.name);
4766 #[cfg(msim)]
4767 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4768
4769 for system_package in system_packages {
4770 let modules = system_package.modules().to_vec();
4771 #[cfg(msim)]
4773 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4774 .unwrap_or(modules);
4775
4776 let Some(obj_ref) = iota_framework::compare_system_package(
4777 &self.get_object_store(),
4778 &system_package.id,
4779 &modules,
4780 system_package.dependencies.to_vec(),
4781 binary_config,
4782 )
4783 .await
4784 else {
4785 return vec![];
4786 };
4787 results.push(obj_ref);
4788 }
4789
4790 results
4791 }
4792
4793 async fn get_system_package_bytes(
4810 &self,
4811 system_packages: Vec<ObjectRef>,
4812 binary_config: &BinaryConfig,
4813 ) -> Option<Vec<SystemPackage>> {
4814 let ids: Vec<_> = system_packages
4815 .iter()
4816 .map(|object_ref| object_ref.object_id)
4817 .collect();
4818 let objects = self.get_objects(&ids).await;
4819
4820 let mut res = Vec::with_capacity(system_packages.len());
4821 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4822 let prev_transaction = match object {
4823 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4824 info!(
4826 "Framework {} does not need updating",
4827 system_package_ref.object_id
4828 );
4829 continue;
4830 }
4831
4832 Some(cur_object) => cur_object.previous_transaction,
4833 None => TransactionDigest::GENESIS_MARKER,
4834 };
4835
4836 #[cfg(msim)]
4837 let FrameworkSystemPackage {
4838 id: _,
4839 bytes,
4840 dependencies,
4841 } = framework_injection::get_override_system_package(
4842 &system_package_ref.object_id,
4843 self.name,
4844 )
4845 .unwrap_or_else(|| {
4846 BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone()
4847 });
4848
4849 #[cfg(not(msim))]
4850 let FrameworkSystemPackage {
4851 id: _,
4852 bytes,
4853 dependencies,
4854 } = BuiltInFramework::get_package_by_id(&system_package_ref.object_id).clone();
4855
4856 let modules: Vec<_> = bytes
4857 .iter()
4858 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4859 .collect();
4860
4861 let new_object = Object::new_system_package(
4862 &modules,
4863 system_package_ref.version,
4864 dependencies.clone(),
4865 prev_transaction,
4866 );
4867
4868 let new_ref = new_object.compute_object_reference();
4869 if new_ref != system_package_ref {
4870 error!(
4871 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
4872 );
4873 return None;
4874 }
4875
4876 res.push(SystemPackage {
4877 version: system_package_ref.version,
4878 modules: bytes,
4879 dependencies,
4880 });
4881 }
4882
4883 Some(res)
4884 }
4885
4886 fn is_protocol_version_supported_v1(
4890 proposed_protocol_version: ProtocolVersion,
4891 committee: &Committee,
4892 capabilities: Vec<AuthorityCapabilitiesV1>,
4893 mut buffer_stake_bps: u64,
4894 ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4895 if buffer_stake_bps > 10000 {
4896 warn!("clamping buffer_stake_bps to 10000");
4897 buffer_stake_bps = 10000;
4898 }
4899
4900 let mut desired_upgrades: Vec<_> = capabilities
4903 .into_iter()
4904 .filter_map(|mut cap| {
4905 if cap.available_system_packages.is_empty() {
4907 return None;
4908 }
4909
4910 cap.available_system_packages.sort();
4911
4912 info!(
4913 "validator {:?} supports {:?} with system packages: {:?}",
4914 cap.authority.concise(),
4915 cap.supported_protocol_versions,
4916 cap.available_system_packages,
4917 );
4918
4919 cap.supported_protocol_versions
4923 .get_version_digest(proposed_protocol_version)
4924 .map(|digest| (digest, cap.available_system_packages, cap.authority))
4925 })
4926 .collect();
4927
4928 desired_upgrades.sort();
4931 desired_upgrades
4932 .into_iter()
4933 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4934 .into_iter()
4935 .find_map(|((digest, packages), group)| {
4936 assert!(!packages.is_empty());
4938
4939 let mut stake_aggregator: StakeAggregator<(), true> =
4940 StakeAggregator::new(Arc::new(committee.clone()));
4941
4942 for (_, _, authority) in group {
4943 stake_aggregator.insert_generic(authority, ());
4944 }
4945
4946 let total_votes = stake_aggregator.total_votes();
4947 let quorum_threshold = committee.quorum_threshold();
4948 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4949
4950 info!(
4951 protocol_config_digest = ?digest,
4952 ?total_votes,
4953 ?quorum_threshold,
4954 ?buffer_stake_bps,
4955 ?effective_threshold,
4956 ?proposed_protocol_version,
4957 ?packages,
4958 "support for upgrade"
4959 );
4960
4961 let has_support = total_votes >= effective_threshold;
4962 has_support.then_some((proposed_protocol_version, digest, packages))
4963 })
4964 }
4965
4966 fn choose_protocol_version_and_system_packages_v1(
4970 current_protocol_version: ProtocolVersion,
4971 current_protocol_digest: Digest,
4972 committee: &Committee,
4973 capabilities: Vec<AuthorityCapabilitiesV1>,
4974 buffer_stake_bps: u64,
4975 ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4976 let mut next_protocol_version = current_protocol_version;
4977 let mut system_packages = vec![];
4978 let mut protocol_version_digest = current_protocol_digest;
4979
4980 while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4984 next_protocol_version + 1,
4985 committee,
4986 capabilities.clone(),
4987 buffer_stake_bps,
4988 ) {
4989 next_protocol_version = version;
4990 protocol_version_digest = digest;
4991 system_packages = packages;
4992 }
4993
4994 (
4995 next_protocol_version,
4996 protocol_version_digest,
4997 system_packages,
4998 )
4999 }
5000
5001 fn get_validators_supporting_protocol_version(
5006 target_protocol_version: ProtocolVersion,
5007 target_digest: Digest,
5008 active_validators: &[AuthorityPublicKey],
5009 capabilities: &[AuthorityCapabilitiesV1],
5010 ) -> Vec<u64> {
5011 let mut eligible_validators = Vec::new();
5012
5013 for capability in capabilities {
5014 if let Some(digest) = capability
5016 .supported_protocol_versions
5017 .get_version_digest(target_protocol_version)
5018 {
5019 if digest == target_digest {
5020 if let Some(index) = active_validators
5022 .iter()
5023 .position(|name| AuthorityName::from(name) == capability.authority)
5024 {
5025 eligible_validators.push(index as u64);
5026 }
5027 }
5028 }
5029 }
5030
5031 eligible_validators.sort();
5033 eligible_validators
5034 }
5035
5036 fn calculate_eligible_validators_weight(
5041 eligible_validator_indices: &[u64],
5042 active_validators: &[AuthorityPublicKey],
5043 committee: &Committee,
5044 ) -> u64 {
5045 let mut total_weight = 0u64;
5046
5047 for &index in eligible_validator_indices {
5048 let authority_pubkey = &active_validators[index as usize];
5049 if let Some((_, weight)) = committee
5051 .members()
5052 .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
5053 {
5054 total_weight += weight;
5055 }
5056 }
5057
5058 total_weight
5059 }
5060
5061 #[instrument(level = "error", skip_all)]
5074 pub async fn create_and_execute_advance_epoch_tx(
5075 &self,
5076 epoch_store: &Arc<AuthorityPerEpochStore>,
5077 gas_cost_summary: &GasCostSummary,
5078 checkpoint: CheckpointSequenceNumber,
5079 epoch_start_timestamp_ms: CheckpointTimestamp,
5080 scores: Vec<u64>,
5081 ) -> anyhow::Result<(
5082 IotaSystemState,
5083 Option<SystemEpochInfoEvent>,
5084 TransactionEffects,
5085 )> {
5086 let mut txns = Vec::new();
5087
5088 let next_epoch = epoch_store.epoch() + 1;
5089
5090 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
5091 let authority_capabilities = epoch_store
5092 .get_capabilities_v1()
5093 .expect("read capabilities from db cannot fail");
5094 let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
5095 Self::choose_protocol_version_and_system_packages_v1(
5096 epoch_store.protocol_version(),
5097 SupportedProtocolVersionsWithHashes::protocol_config_digest(
5098 epoch_store.protocol_config(),
5099 ),
5100 epoch_store.committee(),
5101 authority_capabilities.clone(),
5102 buffer_stake_bps,
5103 );
5104
5105 let config = epoch_store.protocol_config();
5109 let binary_config = to_binary_config(config);
5110 let Some(next_epoch_system_package_bytes) = self
5111 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
5112 .await
5113 else {
5114 error!(
5115 "upgraded system packages {:?} are not locally available, cannot create \
5116 ChangeEpochTx. validator binary must be upgraded to the correct version!",
5117 next_epoch_system_packages
5118 );
5119 bail!("missing system packages: cannot form ChangeEpochTx");
5129 };
5130
5131 if config.select_committee_from_eligible_validators() {
5134 let active_validators = epoch_store.epoch_start_state().get_active_validators();
5136
5137 let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
5138
5139 if config.select_committee_supporting_next_epoch_version() {
5143 eligible_active_validators = Self::get_validators_supporting_protocol_version(
5144 next_epoch_protocol_version,
5145 next_epoch_protocol_digest,
5146 &active_validators,
5147 &authority_capabilities,
5148 );
5149
5150 let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5152 &eligible_active_validators,
5153 &active_validators,
5154 epoch_store.committee(),
5155 );
5156
5157 let committee = epoch_store.committee();
5161 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5162
5163 if eligible_validators_weight < effective_threshold {
5164 error!(
5165 "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5166 This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5167 );
5168 eligible_active_validators = (0..active_validators.len() as u64).collect();
5171 }
5172 }
5173
5174 if config.pass_validator_scores_to_advance_epoch() {
5177 txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5178 next_epoch,
5179 next_epoch_protocol_version.as_u64(),
5180 gas_cost_summary.storage_cost,
5181 gas_cost_summary.computation_cost,
5182 gas_cost_summary.computation_cost_burned,
5183 gas_cost_summary.storage_rebate,
5184 gas_cost_summary.non_refundable_storage_fee,
5185 epoch_start_timestamp_ms,
5186 next_epoch_system_package_bytes,
5187 eligible_active_validators,
5188 scores,
5189 config.adjust_rewards_by_score(),
5190 ));
5191 } else {
5192 txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5193 next_epoch,
5194 next_epoch_protocol_version.as_u64(),
5195 gas_cost_summary.storage_cost,
5196 gas_cost_summary.computation_cost,
5197 gas_cost_summary.computation_cost_burned,
5198 gas_cost_summary.storage_rebate,
5199 gas_cost_summary.non_refundable_storage_fee,
5200 epoch_start_timestamp_ms,
5201 next_epoch_system_package_bytes,
5202 eligible_active_validators,
5203 ));
5204 }
5205 } else if config.protocol_defined_base_fee()
5206 && config.max_committee_members_count_as_option().is_some()
5207 {
5208 txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5209 next_epoch,
5210 next_epoch_protocol_version.as_u64(),
5211 gas_cost_summary.storage_cost,
5212 gas_cost_summary.computation_cost,
5213 gas_cost_summary.computation_cost_burned,
5214 gas_cost_summary.storage_rebate,
5215 gas_cost_summary.non_refundable_storage_fee,
5216 epoch_start_timestamp_ms,
5217 next_epoch_system_package_bytes,
5218 ));
5219 } else {
5220 txns.push(EndOfEpochTransactionKind::new_change_epoch(
5221 next_epoch,
5222 next_epoch_protocol_version.as_u64(),
5223 gas_cost_summary.storage_cost,
5224 gas_cost_summary.computation_cost,
5225 gas_cost_summary.storage_rebate,
5226 gas_cost_summary.non_refundable_storage_fee,
5227 epoch_start_timestamp_ms,
5228 next_epoch_system_package_bytes,
5229 ));
5230 }
5231
5232 let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5233
5234 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5235 tx.clone(),
5236 epoch_store.epoch(),
5237 checkpoint,
5238 );
5239
5240 let tx_digest = executable_tx.digest();
5241
5242 info!(
5243 ?next_epoch,
5244 ?next_epoch_protocol_version,
5245 ?next_epoch_system_packages,
5246 computation_cost=?gas_cost_summary.computation_cost,
5247 computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5248 storage_cost=?gas_cost_summary.storage_cost,
5249 storage_rebate=?gas_cost_summary.storage_rebate,
5250 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5251 ?tx_digest,
5252 "Creating advance epoch transaction"
5253 );
5254
5255 fail_point_async!("change_epoch_tx_delay");
5256 let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5257
5258 if self
5262 .get_transaction_cache_reader()
5263 .try_is_tx_already_executed(tx_digest)?
5264 {
5265 warn!("change epoch tx has already been executed via state sync");
5266 bail!("change epoch tx has already been executed via state sync",);
5267 }
5268
5269 let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5270
5271 epoch_store.assign_shared_object_versions_idempotent(
5275 self.get_object_cache_reader().as_ref(),
5276 std::slice::from_ref(&executable_tx),
5277 )?;
5278
5279 let (input_objects, _) =
5280 self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5281
5282 let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
5283 &execution_guard,
5284 &executable_tx,
5285 input_objects,
5286 vec![],
5287 epoch_store,
5288 )?;
5289 let system_obj = get_iota_system_state(&temporary_store.written)
5290 .expect("change epoch tx must write to system object");
5291 let system_epoch_info_event = temporary_store
5293 .events
5294 .data
5295 .into_iter()
5296 .find(|event| event.is_system_epoch_info_event())
5297 .map(SystemEpochInfoEvent::from);
5298 assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5301
5302 self.get_state_sync_store()
5306 .try_insert_transaction_and_effects(&tx, &effects)
5307 .map_err(|err| {
5308 let err: anyhow::Error = err.into();
5309 err
5310 })?;
5311
5312 info!(
5313 "Effects summary of the change epoch transaction: {:?}",
5314 effects.summary_for_debug()
5315 );
5316 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5317 assert!(effects.status().is_success());
5319 Ok((system_obj, system_epoch_info_event, effects))
5320 }
5321
5322 #[instrument(level = "error", skip_all)]
5326 async fn revert_uncommitted_epoch_transactions(
5327 &self,
5328 epoch_store: &AuthorityPerEpochStore,
5329 ) -> IotaResult {
5330 {
5331 let state = epoch_store.get_reconfig_state_write_lock_guard();
5332 if state.should_accept_user_certs() {
5333 epoch_store.close_user_certs(state);
5342 }
5343 }
5345 let pending_certificates = epoch_store.pending_consensus_certificates();
5346 info!(
5347 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5348 pending_certificates.len(),
5349 pending_certificates,
5350 );
5351 for digest in pending_certificates {
5352 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5353 info!(
5354 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5355 digest
5356 );
5357 continue;
5358 }
5359 info!("Reverting {:?} at the end of epoch", digest);
5360 epoch_store.revert_executed_transaction(&digest)?;
5361 self.get_reconfig_api().try_revert_state_update(&digest)?;
5362 }
5363 info!("All uncommitted local transactions reverted");
5364 Ok(())
5365 }
5366
5367 #[instrument(level = "error", skip_all)]
5368 async fn reopen_epoch_db(
5369 &self,
5370 cur_epoch_store: &AuthorityPerEpochStore,
5371 new_committee: Committee,
5372 epoch_start_configuration: EpochStartConfiguration,
5373 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5374 epoch_last_checkpoint: CheckpointSequenceNumber,
5375 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5376 let new_epoch = new_committee.epoch;
5377 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5378 assert_eq!(
5379 epoch_start_configuration.epoch_start_state().epoch(),
5380 new_committee.epoch
5381 );
5382 fail_point!("before-open-new-epoch-store");
5383 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5384 self.name,
5385 new_committee,
5386 epoch_start_configuration,
5387 self.get_backing_package_store().clone(),
5388 expensive_safety_check_config,
5389 epoch_last_checkpoint,
5390 )?;
5391 self.epoch_store.store(new_epoch_store.clone());
5392 Ok(new_epoch_store)
5393 }
5394
5395 fn check_move_account(
5398 &self,
5399 auth_account_object_id: ObjectID,
5400 auth_account_object_seq_number: Option<SequenceNumber>,
5401 auth_account_object_digest: Option<ObjectDigest>,
5402 account_object: ObjectReadResult,
5403 signer: &IotaAddress,
5404 ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5405 let account_object = match account_object.object {
5406 ObjectReadResultKind::Object(object) => Ok(object),
5407 ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5408 Err(UserInputError::AccountObjectDeleted {
5409 account_id: account_object.id(),
5410 account_version: version,
5411 transaction_digest: digest,
5412 })
5413 }
5414 ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5417 Err(UserInputError::AccountObjectInCanceledTransaction {
5418 account_id: account_object.id(),
5419 account_version: version,
5420 })
5421 }
5422 }?;
5423
5424 let account_object_addr = IotaAddress::from(auth_account_object_id);
5425
5426 fp_ensure!(
5427 signer == &account_object_addr,
5428 UserInputError::IncorrectUserSignature {
5429 error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5430 }
5431 .into()
5432 );
5433
5434 fp_ensure!(
5435 account_object.is_shared() || account_object.is_immutable(),
5436 UserInputError::AccountObjectNotSupported {
5437 object_id: auth_account_object_id
5438 }
5439 .into()
5440 );
5441
5442 let auth_account_object_seq_number =
5443 if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5444 let account_object_version = account_object.version();
5445
5446 fp_ensure!(
5447 account_object_version == auth_account_object_seq_number,
5448 UserInputError::AccountObjectVersionMismatch {
5449 object_id: auth_account_object_id,
5450 expected_version: auth_account_object_seq_number,
5451 actual_version: account_object_version,
5452 }
5453 .into()
5454 );
5455
5456 auth_account_object_seq_number
5457 } else {
5458 account_object.version()
5459 };
5460
5461 if let Some(auth_account_object_digest) = auth_account_object_digest {
5462 let expected_digest = account_object.digest();
5463 fp_ensure!(
5464 expected_digest == auth_account_object_digest,
5465 UserInputError::InvalidAccountObjectDigest {
5466 object_id: auth_account_object_id,
5467 expected_digest,
5468 actual_digest: auth_account_object_digest,
5469 }
5470 .into()
5471 );
5472 }
5473
5474 let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5475 auth_account_object_id,
5476 &AuthenticatorFunctionRefV1Key::tag().into(),
5477 &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5478 )
5479 .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5480 account_object_id: auth_account_object_id,
5481 })?;
5482
5483 let authenticator_function_ref_field = self
5484 .get_object_cache_reader()
5485 .try_find_object_lt_or_eq_version(
5486 authenticator_function_ref_field_id,
5487 auth_account_object_seq_number,
5488 )?;
5489
5490 if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5491 let field_move_object = authenticator_function_ref_field_obj
5492 .data
5493 .as_struct_opt()
5494 .expect("dynamic field should never be a package object");
5495
5496 let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5497 field_move_object.to_rust().ok_or(
5498 UserInputError::InvalidAuthenticatorFunctionRefField {
5499 account_object_id: auth_account_object_id,
5500 },
5501 )?;
5502
5503 Ok(AuthenticatorFunctionRefForExecution::new_v1(
5504 field.value,
5505 authenticator_function_ref_field_obj.compute_object_reference(),
5506 authenticator_function_ref_field_obj.owner,
5507 authenticator_function_ref_field_obj.storage_rebate,
5508 authenticator_function_ref_field_obj.previous_transaction,
5509 ))
5510 } else {
5511 Err(UserInputError::MoveAuthenticatorNotFound {
5512 authenticator_function_ref_id: authenticator_function_ref_field_id,
5513 account_object_id: auth_account_object_id,
5514 account_object_version: auth_account_object_seq_number,
5515 }
5516 .into())
5517 }
5518 }
5519
5520 #[allow(clippy::type_complexity)]
5521 fn read_objects_for_signing(
5522 &self,
5523 transaction: &VerifiedTransaction,
5524 epoch: u64,
5525 ) -> IotaResult<(
5526 InputObjects,
5527 ReceivingObjects,
5528 Vec<(InputObjects, ObjectReadResult)>,
5529 )> {
5530 let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5531 Some(transaction.digest()),
5532 &transaction.collect_all_input_object_kind_for_reading()?,
5533 &transaction.data().transaction_data().receiving_objects(),
5534 epoch,
5535 )?;
5536
5537 transaction
5538 .split_input_objects_into_groups_for_reading(input_objects)
5539 .map(|(tx_input_objects, per_authenticator_inputs)| {
5540 (
5541 tx_input_objects,
5542 tx_receiving_objects,
5543 per_authenticator_inputs,
5544 )
5545 })
5546 }
5547
5548 #[allow(clippy::type_complexity)]
5549 fn check_transaction_inputs_for_signing(
5550 &self,
5551 protocol_config: &ProtocolConfig,
5552 reference_gas_price: u64,
5553 tx_data: &TransactionData,
5554 tx_input_objects: InputObjects,
5555 tx_receiving_objects: &ReceivingObjects,
5556 move_authenticators: &Vec<&MoveAuthenticator>,
5557 per_authenticator_inputs: Vec<(InputObjects, ObjectReadResult)>,
5558 ) -> IotaResult<(
5559 IotaGasStatus,
5560 CheckedInputObjects,
5561 Vec<(CheckedInputObjects, AuthenticatorFunctionRef)>,
5562 )> {
5563 let authenticator_gas_budget = if move_authenticators.is_empty() {
5564 0
5565 } else {
5566 protocol_config.max_auth_gas()
5569 };
5570
5571 debug_assert_eq!(
5572 move_authenticators.len(),
5573 per_authenticator_inputs.len(),
5574 "Move authenticators amount must match the number of authenticator inputs"
5575 );
5576
5577 let per_authenticator_checked_inputs = move_authenticators
5578 .iter()
5579 .zip(per_authenticator_inputs)
5580 .map(
5581 |(move_authenticator, (authenticator_input_objects, account_object))| {
5582 let (
5584 auth_account_object_id,
5585 auth_account_object_seq_number,
5586 auth_account_object_digest,
5587 ) = move_authenticator.object_to_authenticate_components()?;
5588
5589 let signer = move_authenticator.address()?;
5590
5591 let AuthenticatorFunctionRefForExecution {
5593 authenticator_function_ref,
5594 ..
5595 } = self.check_move_account(
5596 auth_account_object_id,
5597 auth_account_object_seq_number,
5598 auth_account_object_digest,
5599 account_object,
5600 &signer,
5601 )?;
5602
5603 let authenticator_checked_input_objects =
5605 iota_transaction_checks::check_move_authenticator_input_for_signing(
5606 authenticator_input_objects,
5607 )?;
5608
5609 Ok((
5610 authenticator_checked_input_objects,
5611 authenticator_function_ref,
5612 ))
5613 },
5614 )
5615 .collect::<IotaResult<Vec<_>>>()?;
5616
5617 let (gas_status, tx_checked_input_objects) =
5619 iota_transaction_checks::check_transaction_input(
5620 protocol_config,
5621 reference_gas_price,
5622 tx_data,
5623 tx_input_objects,
5624 tx_receiving_objects,
5625 &self.metrics.bytecode_verifier_metrics,
5626 &self.config.verifier_signing_config,
5627 authenticator_gas_budget,
5628 )?;
5629
5630 Ok((
5631 gas_status,
5632 tx_checked_input_objects,
5633 per_authenticator_checked_inputs,
5634 ))
5635 }
5636
5637 #[cfg(test)]
5638 pub(crate) fn iter_live_object_set_for_testing(
5639 &self,
5640 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5641 self.get_global_state_hash_store()
5642 .iter_cached_live_object_set_for_testing()
5643 }
5644
5645 #[cfg(test)]
5646 pub(crate) fn shutdown_execution_for_test(&self) {
5647 self.tx_execution_shutdown
5648 .lock()
5649 .take()
5650 .unwrap()
5651 .send(())
5652 .unwrap();
5653 }
5654
5655 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5658 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5659 self.get_object_cache_reader()
5660 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5661 self.get_reconfig_api()
5662 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5663 }
5664}
5665
5666pub struct RandomnessRoundReceiver {
5667 authority_state: Arc<AuthorityState>,
5668 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5669}
5670
5671impl RandomnessRoundReceiver {
5672 pub fn spawn(
5673 authority_state: Arc<AuthorityState>,
5674 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5675 ) -> JoinHandle<()> {
5676 let rrr = RandomnessRoundReceiver {
5677 authority_state,
5678 randomness_rx,
5679 };
5680 spawn_monitored_task!(rrr.run())
5681 }
5682
5683 async fn run(mut self) {
5684 info!("RandomnessRoundReceiver event loop started");
5685
5686 loop {
5687 tokio::select! {
5688 maybe_recv = self.randomness_rx.recv() => {
5689 if let Some((epoch, round, bytes)) = maybe_recv {
5690 self.handle_new_randomness(epoch, round, bytes);
5691 } else {
5692 break;
5693 }
5694 },
5695 }
5696 }
5697
5698 info!("RandomnessRoundReceiver event loop ended");
5699 }
5700
5701 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5702 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5703 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5704 if epoch_store.epoch() != epoch {
5705 warn!(
5706 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5707 epoch_store.epoch()
5708 );
5709 return;
5710 }
5711 let transaction = VerifiedTransaction::new_randomness_state_update(
5712 epoch,
5713 round,
5714 bytes,
5715 epoch_store
5716 .epoch_start_config()
5717 .randomness_obj_initial_shared_version(),
5718 );
5719 debug!(
5720 "created randomness state update transaction with digest: {:?}",
5721 transaction.digest()
5722 );
5723 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5724 let digest = *transaction.digest();
5725
5726 self.authority_state
5731 .get_cache_commit()
5732 .persist_transaction(&transaction);
5733
5734 self.authority_state
5736 .transaction_manager()
5737 .enqueue(vec![transaction], &epoch_store);
5738
5739 let authority_state = self.authority_state.clone();
5740 spawn_monitored_task!(async move {
5741 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5750 let result = tokio::time::timeout(
5751 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5752 authority_state
5753 .get_transaction_cache_reader()
5754 .try_notify_read_executed_effects(&[digest]),
5755 )
5756 .await;
5757 let result = match result {
5758 Ok(result) => result,
5759 Err(_) => {
5760 if cfg!(debug_assertions) {
5761 panic!(
5763 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5764 );
5765 }
5766 warn!(
5767 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5768 );
5769 authority_state
5771 .get_transaction_cache_reader()
5772 .try_notify_read_executed_effects(&[digest])
5773 .await
5774 }
5775 };
5776
5777 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5778 let effects = effects.pop().expect("should return effects");
5779 if *effects.status() != ExecutionStatus::Success {
5780 fatal!(
5781 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5782 );
5783 }
5784 debug!(
5785 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5786 );
5787 });
5788 }
5789}
5790
5791#[async_trait]
5792impl TransactionKeyValueStoreTrait for AuthorityState {
5793 async fn multi_get(
5794 &self,
5795 transaction_keys: &[TransactionDigest],
5796 effects_keys: &[TransactionDigest],
5797 ) -> IotaResult<KVStoreTransactionData> {
5798 let txns = if !transaction_keys.is_empty() {
5799 self.get_transaction_cache_reader()
5800 .try_multi_get_transaction_blocks(transaction_keys)?
5801 .into_iter()
5802 .map(|t| t.map(|t| (*t).clone().into_inner()))
5803 .collect()
5804 } else {
5805 vec![]
5806 };
5807
5808 let fx = if !effects_keys.is_empty() {
5809 self.get_transaction_cache_reader()
5810 .try_multi_get_executed_effects(effects_keys)?
5811 } else {
5812 vec![]
5813 };
5814
5815 Ok((txns, fx))
5816 }
5817
5818 async fn multi_get_checkpoints(
5819 &self,
5820 checkpoint_summaries: &[CheckpointSequenceNumber],
5821 checkpoint_contents: &[CheckpointSequenceNumber],
5822 checkpoint_summaries_by_digest: &[CheckpointDigest],
5823 ) -> IotaResult<(
5824 Vec<Option<CertifiedCheckpointSummary>>,
5825 Vec<Option<CheckpointContents>>,
5826 Vec<Option<CertifiedCheckpointSummary>>,
5827 )> {
5828 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5830 let store = self.get_checkpoint_store();
5831 for seq in checkpoint_summaries {
5832 let checkpoint = store
5833 .get_checkpoint_by_sequence_number(*seq)?
5834 .map(|c| c.into_inner());
5835
5836 summaries.push(checkpoint);
5837 }
5838
5839 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5840 for seq in checkpoint_contents {
5841 let checkpoint = store
5842 .get_checkpoint_by_sequence_number(*seq)?
5843 .and_then(|summary| {
5844 store
5845 .get_checkpoint_contents(&summary.content_digest)
5846 .expect("db read cannot fail")
5847 });
5848 contents.push(checkpoint);
5849 }
5850
5851 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5852 for digest in checkpoint_summaries_by_digest {
5853 let checkpoint = store
5854 .get_checkpoint_by_digest(digest)?
5855 .map(|c| c.into_inner());
5856 summaries_by_digest.push(checkpoint);
5857 }
5858
5859 Ok((summaries, contents, summaries_by_digest))
5860 }
5861
5862 async fn get_transaction_perpetual_checkpoint(
5863 &self,
5864 digest: TransactionDigest,
5865 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5866 self.get_checkpoint_cache()
5867 .try_get_transaction_perpetual_checkpoint(&digest)
5868 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5869 }
5870
5871 async fn get_object(
5872 &self,
5873 object_id: ObjectID,
5874 version: VersionNumber,
5875 ) -> IotaResult<Option<Object>> {
5876 self.get_object_cache_reader()
5877 .try_get_object_by_key(&object_id, version)
5878 }
5879
5880 #[instrument(skip_all)]
5881 async fn multi_get_objects(
5882 &self,
5883 object_keys: &[ObjectKey],
5884 ) -> IotaResult<Vec<Option<Object>>> {
5885 Ok(self
5886 .get_object_cache_reader()
5887 .multi_get_objects_by_key(object_keys))
5888 }
5889
5890 async fn multi_get_transactions_perpetual_checkpoints(
5891 &self,
5892 digests: &[TransactionDigest],
5893 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5894 let res = self
5895 .get_checkpoint_cache()
5896 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5897
5898 Ok(res
5899 .into_iter()
5900 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5901 .collect())
5902 }
5903
5904 #[instrument(skip(self, digests), fields(digests = digests.iter().map(|d| d.to_string()).collect::<Vec<String>>().join(", ")))]
5905 async fn multi_get_events_by_tx_digests(
5906 &self,
5907 digests: &[TransactionDigest],
5908 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5909 if digests.is_empty() {
5910 return Ok(vec![]);
5911 }
5912
5913 Ok(self
5914 .get_transaction_cache_reader()
5915 .multi_get_events(digests))
5916 }
5917}
5918
5919#[cfg(msim)]
5920pub mod framework_injection {
5921 use std::{
5922 cell::RefCell,
5923 collections::{BTreeMap, BTreeSet},
5924 };
5925
5926 use iota_framework::{BuiltInFramework, SystemPackage};
5927 use iota_types::base_types::{AuthorityName, ObjectID};
5928 use move_binary_format::CompiledModule;
5929
5930 type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
5931
5932 thread_local! {
5934 static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
5935 }
5936
5937 type Framework = Vec<CompiledModule>;
5938
5939 pub type PackageUpgradeCallback =
5940 Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
5941
5942 enum PackageOverrideConfig {
5943 Global(Framework),
5944 PerValidator(PackageUpgradeCallback),
5945 }
5946
5947 fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
5948 modules
5949 .iter()
5950 .map(|m| {
5951 let mut buf = Vec::new();
5952 m.serialize_with_version(m.version, &mut buf).unwrap();
5953 buf
5954 })
5955 .collect()
5956 }
5957
5958 pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
5959 OVERRIDE.with(|bs| {
5960 bs.borrow_mut()
5961 .insert(package_id, PackageOverrideConfig::Global(modules))
5962 });
5963 }
5964
5965 pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
5966 OVERRIDE.with(|bs| {
5967 bs.borrow_mut()
5968 .insert(package_id, PackageOverrideConfig::PerValidator(func))
5969 });
5970 }
5971
5972 pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
5973 OVERRIDE.with(|cfg| {
5974 cfg.borrow().get(package_id).and_then(|entry| match entry {
5975 PackageOverrideConfig::Global(framework) => {
5976 Some(compiled_modules_to_bytes(framework))
5977 }
5978 PackageOverrideConfig::PerValidator(func) => {
5979 func(name).map(|fw| compiled_modules_to_bytes(&fw))
5980 }
5981 })
5982 })
5983 }
5984
5985 pub fn get_override_modules(
5986 package_id: &ObjectID,
5987 name: AuthorityName,
5988 ) -> Option<Vec<CompiledModule>> {
5989 OVERRIDE.with(|cfg| {
5990 cfg.borrow().get(package_id).and_then(|entry| match entry {
5991 PackageOverrideConfig::Global(framework) => Some(framework.clone()),
5992 PackageOverrideConfig::PerValidator(func) => func(name),
5993 })
5994 })
5995 }
5996
5997 pub fn get_override_system_package(
5998 package_id: &ObjectID,
5999 name: AuthorityName,
6000 ) -> Option<SystemPackage> {
6001 let bytes = get_override_bytes(package_id, name)?;
6002 let dependencies = if package_id.is_system_package() {
6003 BuiltInFramework::get_package_by_id(package_id)
6004 .dependencies
6005 .to_vec()
6006 } else {
6007 BuiltInFramework::all_package_ids()
6010 };
6011 Some(SystemPackage {
6012 id: *package_id,
6013 bytes,
6014 dependencies,
6015 })
6016 }
6017
6018 pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
6019 let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
6020 let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
6021 cfg.borrow()
6022 .keys()
6023 .filter_map(|package| (!built_in.contains(package)).then_some(*package))
6024 .collect()
6025 });
6026
6027 extra
6028 .into_iter()
6029 .map(|package| SystemPackage {
6030 id: package,
6031 bytes: get_override_bytes(&package, name).unwrap(),
6032 dependencies: BuiltInFramework::all_package_ids(),
6033 })
6034 .collect()
6035 }
6036}
6037
6038#[derive(Debug, Serialize, Deserialize, Clone)]
6039pub struct ObjDumpFormat {
6040 pub id: ObjectID,
6041 pub version: VersionNumber,
6042 pub digest: ObjectDigest,
6043 pub object: Object,
6044}
6045
6046impl ObjDumpFormat {
6047 fn new(object: Object) -> Self {
6048 let oref = object.compute_object_reference();
6049 Self {
6050 id: oref.object_id,
6051 version: oref.version,
6052 digest: oref.digest,
6053 object,
6054 }
6055 }
6056}
6057
6058#[derive(Debug, Serialize, Deserialize, Clone)]
6059pub struct NodeStateDump {
6060 pub tx_digest: TransactionDigest,
6061 pub sender_signed_data: SenderSignedData,
6062 pub executed_epoch: u64,
6063 pub reference_gas_price: u64,
6064 pub protocol_version: u64,
6065 pub epoch_start_timestamp_ms: u64,
6066 pub computed_effects: TransactionEffects,
6067 pub expected_effects_digest: TransactionEffectsDigest,
6068 pub relevant_system_packages: Vec<ObjDumpFormat>,
6069 pub shared_objects: Vec<ObjDumpFormat>,
6070 pub loaded_child_objects: Vec<ObjDumpFormat>,
6071 pub modified_at_versions: Vec<ObjDumpFormat>,
6072 pub runtime_reads: Vec<ObjDumpFormat>,
6073 pub input_objects: Vec<ObjDumpFormat>,
6074}
6075
6076impl NodeStateDump {
6077 pub fn new(
6078 tx_digest: &TransactionDigest,
6079 effects: &TransactionEffects,
6080 expected_effects_digest: TransactionEffectsDigest,
6081 object_store: &dyn ObjectStore,
6082 epoch_store: &Arc<AuthorityPerEpochStore>,
6083 inner_temporary_store: &InnerTemporaryStore,
6084 certificate: &VerifiedExecutableTransaction,
6085 ) -> IotaResult<Self> {
6086 let executed_epoch = epoch_store.epoch();
6088 let reference_gas_price = epoch_store.reference_gas_price();
6089 let epoch_start_config = epoch_store.epoch_start_config();
6090 let protocol_version = epoch_store.protocol_version().as_u64();
6091 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
6092
6093 let mut relevant_system_packages = Vec::new();
6095 for sys_package_id in BuiltInFramework::all_package_ids() {
6096 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
6097 relevant_system_packages.push(ObjDumpFormat::new(w))
6098 }
6099 }
6100
6101 let mut shared_objects = Vec::new();
6103 for kind in effects.input_shared_objects() {
6104 match kind {
6105 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
6106 if let Some(w) =
6107 object_store.try_get_object_by_key(&obj_ref.object_id, obj_ref.version)?
6108 {
6109 shared_objects.push(ObjDumpFormat::new(w))
6110 }
6111 }
6112 InputSharedObject::ReadDeleted(..)
6113 | InputSharedObject::MutateDeleted(..)
6114 | InputSharedObject::Cancelled(..) => (), }
6117 }
6118
6119 let mut loaded_child_objects = Vec::new();
6122 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
6123 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
6124 loaded_child_objects.push(ObjDumpFormat::new(w))
6125 }
6126 }
6127
6128 let mut modified_at_versions = Vec::new();
6130 for (id, ver) in effects.modified_at_versions() {
6131 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
6132 modified_at_versions.push(ObjDumpFormat::new(w))
6133 }
6134 }
6135
6136 let mut runtime_reads = Vec::new();
6140 for obj in inner_temporary_store
6141 .runtime_packages_loaded_from_db
6142 .values()
6143 {
6144 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6145 }
6146
6147 Ok(Self {
6150 tx_digest: *tx_digest,
6151 executed_epoch,
6152 reference_gas_price,
6153 epoch_start_timestamp_ms,
6154 protocol_version,
6155 relevant_system_packages,
6156 shared_objects,
6157 loaded_child_objects,
6158 modified_at_versions,
6159 runtime_reads,
6160 sender_signed_data: certificate.clone().into_message(),
6161 input_objects: inner_temporary_store
6162 .input_objects
6163 .values()
6164 .map(|o| ObjDumpFormat::new(o.clone()))
6165 .collect(),
6166 computed_effects: effects.clone(),
6167 expected_effects_digest,
6168 })
6169 }
6170
6171 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6172 let mut objects = Vec::new();
6173 objects.extend(self.relevant_system_packages.clone());
6174 objects.extend(self.shared_objects.clone());
6175 objects.extend(self.loaded_child_objects.clone());
6176 objects.extend(self.modified_at_versions.clone());
6177 objects.extend(self.runtime_reads.clone());
6178 objects.extend(self.input_objects.clone());
6179 objects
6180 }
6181
6182 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6183 let file_name = format!(
6184 "{}_{}_NODE_DUMP.json",
6185 self.tx_digest,
6186 AuthorityState::unixtime_now_ms()
6187 );
6188 let mut path = path.to_path_buf();
6189 path.push(&file_name);
6190 let mut file = File::create(path.clone())?;
6191 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6192 Ok(path)
6193 }
6194
6195 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6196 let file = File::open(path)?;
6197 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6198 }
6199}
6200
6201fn pre_consensus_move_authenticators<'a>(
6213 tx: &'a VerifiedTransaction,
6214 protocol_config: &ProtocolConfig,
6215) -> Vec<&'a MoveAuthenticator> {
6216 if protocol_config.pre_consensus_sponsor_only_move_authentication() {
6217 if tx.transaction_data().is_sponsored_tx() {
6218 if let Some(sponsor_move_authenticator) = tx.sponsor_move_authenticator() {
6219 vec![sponsor_move_authenticator]
6220 } else {
6221 vec![]
6222 }
6223 } else {
6224 tx.move_authenticators()
6225 }
6226 } else {
6227 tx.move_authenticators()
6228 }
6229}