1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_storage::{
48 key_value_store::{
49 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
50 },
51 key_value_store_metrics::KeyValueStoreMetrics,
52};
53#[cfg(msim)]
54use iota_types::committee::CommitteeTrait;
55use iota_types::{
56 IOTA_SYSTEM_ADDRESS, TypeTag,
57 authenticator_state::get_authenticator_state,
58 base_types::*,
59 committee::{Committee, EpochId, ProtocolVersion},
60 crypto::{AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer, default_hash},
61 deny_list_v1::check_coin_deny_list_v1_during_signing,
62 digests::{ChainIdentifier, TransactionEventsDigest},
63 dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
64 effects::{
65 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
66 TransactionEvents, VerifiedCertifiedTransactionEffects, VerifiedSignedTransactionEffects,
67 },
68 error::{ExecutionError, IotaError, IotaResult, UserInputError},
69 event::{Event, EventID, SystemEpochInfoEvent},
70 executable_transaction::VerifiedExecutableTransaction,
71 execution_config_utils::to_binary_config,
72 execution_status::ExecutionStatus,
73 fp_ensure,
74 gas::{GasCostSummary, IotaGasStatus},
75 gas_coin::NANOS_PER_IOTA,
76 inner_temporary_store::{
77 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
78 WrittenObjects,
79 },
80 iota_system_state::{
81 IotaSystemState, IotaSystemStateTrait,
82 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
83 },
84 is_system_package,
85 layout_resolver::{LayoutResolver, into_struct_layout},
86 message_envelope::Message,
87 messages_checkpoint::{
88 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
89 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
90 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
91 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
92 },
93 messages_consensus::AuthorityCapabilitiesV1,
94 messages_grpc::{
95 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
96 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
97 TransactionStatus,
98 },
99 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
100 object::{
101 MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
102 bounded_visitor::BoundedVisitor,
103 },
104 storage::{
105 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
106 },
107 supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions},
108 transaction::*,
109 transaction_executor::SimulateTransactionResult,
110};
111use itertools::Itertools;
112use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
113use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
114use parking_lot::Mutex;
115use prometheus::{
116 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
117 register_histogram_vec_with_registry, register_histogram_with_registry,
118 register_int_counter_vec_with_registry, register_int_counter_with_registry,
119 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
120};
121use serde::{Deserialize, Serialize, de::DeserializeOwned};
122use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
123use tap::TapFallible;
124use tokio::{
125 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
126 task::JoinHandle,
127};
128use tracing::{Instrument, debug, error, info, instrument, trace, warn};
129use typed_store::TypedStoreError;
130
131use self::{
132 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
133};
134#[cfg(msim)]
135pub use crate::checkpoints::checkpoint_executor::utils::{
136 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
137};
138use crate::{
139 authority::{
140 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
141 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
142 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
143 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
144 authority_store_tables::AuthorityPrunerTables,
145 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
146 },
147 authority_client::NetworkAuthorityClient,
148 checkpoints::CheckpointStore,
149 congestion_tracker::CongestionTracker,
150 consensus_adapter::ConsensusAdapter,
151 epoch::committee_store::CommitteeStore,
152 execution_cache::{
153 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
154 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
155 TransactionCacheRead,
156 },
157 execution_driver::execution_process,
158 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
159 metrics::{LatencyObserver, RateTracker},
160 module_cache_metrics::ResolverMetrics,
161 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
162 rest_index::RestIndexStore,
163 stake_aggregator::StakeAggregator,
164 state_accumulator::{AccumulatorStore, StateAccumulator},
165 subscription_handler::SubscriptionHandler,
166 transaction_input_loader::TransactionInputLoader,
167 transaction_manager::TransactionManager,
168 transaction_outputs::TransactionOutputs,
169 validator_tx_finalizer::ValidatorTxFinalizer,
170 verify_indexes::verify_indexes,
171};
172
// Unit-test modules. Each one is compiled only under `cfg(test)` and is loaded
// from the `unit_tests/` directory through an explicit `#[path]` attribute.
#[cfg(test)]
#[path = "unit_tests/authority_tests.rs"]
pub mod authority_tests;

#[cfg(test)]
#[path = "unit_tests/transaction_tests.rs"]
pub mod transaction_tests;

#[cfg(test)]
#[path = "unit_tests/batch_transaction_tests.rs"]
mod batch_transaction_tests;

#[cfg(test)]
#[path = "unit_tests/move_integration_tests.rs"]
pub mod move_integration_tests;

#[cfg(test)]
#[path = "unit_tests/gas_tests.rs"]
mod gas_tests;

#[cfg(test)]
#[path = "unit_tests/batch_verification_tests.rs"]
mod batch_verification_tests;

#[cfg(test)]
#[path = "unit_tests/coin_deny_list_tests.rs"]
mod coin_deny_list_tests;

#[cfg(test)]
#[path = "unit_tests/auth_unit_test_utils.rs"]
pub mod auth_unit_test_utils;

// Test helpers that are compiled unconditionally (no `cfg(test)` gate).
pub mod authority_test_utils;

// Per-epoch storage and its pruner.
pub mod authority_per_epoch_store;
pub mod authority_per_epoch_store_pruner;

// Remaining public submodules of the authority.
pub mod authority_store_pruner;
pub mod authority_store_tables;
pub mod authority_store_types;
pub mod epoch_start_configuration;
pub mod shared_object_congestion_tracker;
pub mod shared_object_version_manager;
pub mod suggested_gas_price_calculator;
pub mod test_authority_builder;
pub mod transaction_deferral;

// `authority_store` is crate-private; selected items are re-exported by the
// `pub use authority_store::{...}` at the top of this file.
pub(crate) mod authority_store;
pub mod backpressure;
222
/// Prometheus metrics owned by the authority.
///
/// Every field is registered against a caller-supplied registry in
/// [`AuthorityMetrics::new`]; the per-field descriptions below mirror the help
/// strings used at registration time.
pub struct AuthorityMetrics {
    /// Total number of transaction orders.
    tx_orders: IntCounter,
    /// Total number of transaction certificates handled.
    total_certs: IntCounter,
    /// Number of calls to `handle_certificate`.
    total_cert_attempts: IntCounter,
    /// Total number of transaction effects produced.
    total_effects: IntCounter,
    /// Number of transactions involving shared objects.
    pub shared_obj_tx: IntCounter,
    /// Number of sponsored transactions.
    sponsored_tx: IntCounter,
    /// Number of transaction orders that had already been processed.
    tx_already_processed: IntCounter,
    /// Distribution of the number of input objects per transaction.
    num_input_objs: Histogram,
    /// Distribution of the number of shared input objects per transaction.
    num_shared_objects: Histogram,
    /// Distribution of transaction batch sizes.
    batch_size: Histogram,

    /// Latency of handling transactions.
    authority_state_handle_transaction_latency: Histogram,

    // The next two histograms are the `tx_type`-labelled children of a single
    // `authority_state_execute_certificate_latency` histogram vector.
    /// Certificate execution latency (including waiting for inputs) for
    /// single-writer transactions.
    execute_certificate_latency_single_writer: Histogram,
    /// Certificate execution latency (including waiting for inputs) for
    /// shared-object transactions.
    execute_certificate_latency_shared_object: Histogram,

    /// Latency of executing certificates with effects, including waiting for
    /// inputs.
    execute_certificate_with_effects_latency: Histogram,
    /// Latency of the actual certificate execution.
    internal_execution_latency: Histogram,
    /// Latency of loading input objects for execution.
    execution_load_input_objects_latency: Histogram,
    /// Latency of executing certificates, before committing the results.
    prepare_certificate_latency: Histogram,
    /// Latency of committing certificate execution results.
    commit_certificate_latency: Histogram,
    /// Latency of checkpointing the databases.
    db_checkpoint_latency: Histogram,

    // TransactionManager metrics.
    /// Certificates enqueued to the TransactionManager, labelled by `result`.
    pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
    /// Current number of missing objects in the TransactionManager.
    pub(crate) transaction_manager_num_missing_objects: IntGauge,
    /// Certificates pending with at least one missing input object.
    pub(crate) transaction_manager_num_pending_certificates: IntGauge,
    /// Executing certificates, both queued and actually running.
    pub(crate) transaction_manager_num_executing_certificates: IntGauge,
    /// Number of ready transactions in the TransactionManager.
    pub(crate) transaction_manager_num_ready: IntGauge,
    /// Current size of the object-availability cache.
    pub(crate) transaction_manager_object_cache_size: IntGauge,
    /// Object-availability cache hits.
    pub(crate) transaction_manager_object_cache_hits: IntCounter,
    /// Object-availability cache misses.
    pub(crate) transaction_manager_object_cache_misses: IntCounter,
    /// Object-availability cache evictions.
    pub(crate) transaction_manager_object_cache_evictions: IntCounter,
    /// Current size of the package-availability cache.
    pub(crate) transaction_manager_package_cache_size: IntGauge,
    /// Package-availability cache hits.
    pub(crate) transaction_manager_package_cache_hits: IntCounter,
    /// Package-availability cache misses.
    pub(crate) transaction_manager_package_cache_misses: IntCounter,
    /// Package-availability cache evictions.
    pub(crate) transaction_manager_package_cache_evictions: IntCounter,
    /// Time transactions spend waiting in the queue.
    pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

    // Execution driver metrics.
    /// Cumulative number of transactions executed by the execution driver.
    pub(crate) execution_driver_executed_transactions: IntCounter,
    /// Number of transactions pending in the execution driver dispatch queue.
    pub(crate) execution_driver_dispatch_queue: IntGauge,
    /// Delay between a transaction becoming ready and starting to execute.
    pub(crate) execution_queueing_delay_s: Histogram,
    /// Computation gas divided by VM execution latency.
    pub(crate) prepare_cert_gas_latency_ratio: Histogram,
    /// Computation gas divided by certificate execution latency, including
    /// committing the certificate.
    pub(crate) execution_gas_latency_ratio: Histogram,

    /// Total number of consensus transactions skipped.
    pub(crate) skipped_consensus_txns: IntCounter,
    /// Consensus transactions skipped because of a local cache hit.
    pub(crate) skipped_consensus_txns_cache_hit: IntCounter,

    // Overload / load-shedding metrics.
    /// Whether the authority is currently overloaded and in load-shedding mode.
    pub(crate) authority_overload_status: IntGauge,
    /// Percentage of transactions shed while in load-shedding mode.
    pub(crate) authority_load_shedding_percentage: IntGauge,

    /// Number of times each `source` reported transaction overload.
    pub(crate) transaction_overload_sources: IntCounterVec,

    // Post-processing metrics.
    /// Total number of events emitted in post processing.
    post_processing_total_events_emitted: IntCounter,
    /// Total number of transactions indexed in post processing.
    post_processing_total_tx_indexed: IntCounter,
    /// Total number of transactions that finished event processing.
    post_processing_total_tx_had_event_processed: IntCounter,
    /// Total number of failures in post processing.
    post_processing_total_failures: IntCounter,

    // Consensus handler / consensus commit metrics.
    /// Transactions processed by the consensus handler, labelled by `class`.
    pub consensus_handler_processed: IntCounterVec,
    /// Sizes of transactions processed by the consensus handler, by `class`.
    pub consensus_handler_transaction_sizes: HistogramVec,
    /// Number of low-scoring authorities based on consensus reputation scores.
    pub consensus_handler_num_low_scoring_authorities: IntGauge,
    /// Consensus score per `authority`.
    pub consensus_handler_scores: IntGaugeVec,
    /// Transactions deferred by the consensus handler.
    pub consensus_handler_deferred_transactions: IntCounter,
    /// Transactions deferred by the consensus handler due to congestion.
    pub consensus_handler_congested_transactions: IntCounter,
    /// Transactions cancelled by the consensus handler.
    pub consensus_handler_cancelled_transactions: IntCounter,
    /// Max congestion-control object costs in the current consensus commit,
    /// labelled by `commit_type`.
    pub consensus_handler_max_object_costs: IntGaugeVec,
    /// Committed subdags per authoring `authority`.
    pub consensus_committed_subdags: IntCounterVec,
    /// Committed consensus messages per authoring `authority`.
    pub consensus_committed_messages: IntGaugeVec,
    /// Committed user transactions per submitting `authority`.
    pub consensus_committed_user_transactions: IntGaugeVec,
    /// Throughput calculated from consensus output (unique transactions).
    pub consensus_calculated_throughput: IntGauge,
    /// Currently active calculated-throughput profile.
    pub consensus_calculated_throughput_profile: IntGauge,

    /// Limits metrics, built from the same registry.
    pub limits_metrics: Arc<LimitsMetrics>,

    /// Bytecode verifier metrics, built from the same registry.
    pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,

    /// Count of zkLogin signatures.
    pub zklogin_sig_count: IntCounter,
    /// Counter registered under the name `multisig_sig_count`.
    pub multisig_sig_count: IntCounter,

    /// In-process observer for execution queueing latency (not registered
    /// with the Prometheus registry in `new`).
    pub execution_queueing_latency: LatencyObserver,

    /// Rate tracker for transactions becoming ready; constructed with a
    /// 10-second duration (presumably the tracking window — confirm against
    /// `RateTracker::new`).
    pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,

    /// Rate tracker for executed transactions; constructed with a 10-second
    /// duration (presumably the tracking window — confirm against
    /// `RateTracker::new`).
    pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
}
324
/// Histogram buckets for positive integer samples, spanning 1 to 10,000,000
/// in a repeating 1-2-5-7 progression per decade.
const POSITIVE_INT_BUCKETS: &[f64] = &[
    1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
    20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
    7000000., 10000000.,
];
331
/// Histogram buckets for latencies measured in seconds, from 0.5 ms up to
/// 90 s.
const LATENCY_SEC_BUCKETS: &[f64] = &[
    0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
    10., 20., 30., 60., 90.,
];
336
/// Histogram buckets for low-latency samples measured in seconds. Starts at
/// 10 microseconds and extends to 100 s.
const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
    0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
];
342
/// Histogram buckets for gas-to-latency ratios, from 10 up to 1,000,000.
const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
    10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
    3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
];
347
348pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
352 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
353 let execute_certificate_latency = register_histogram_vec_with_registry!(
354 "authority_state_execute_certificate_latency",
355 "Latency of executing certificates, including waiting for inputs",
356 &["tx_type"],
357 LATENCY_SEC_BUCKETS.to_vec(),
358 registry,
359 )
360 .unwrap();
361
362 let execute_certificate_latency_single_writer =
363 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
364 let execute_certificate_latency_shared_object =
365 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
366
367 Self {
368 tx_orders: register_int_counter_with_registry!(
369 "total_transaction_orders",
370 "Total number of transaction orders",
371 registry,
372 )
373 .unwrap(),
374 total_certs: register_int_counter_with_registry!(
375 "total_transaction_certificates",
376 "Total number of transaction certificates handled",
377 registry,
378 )
379 .unwrap(),
380 total_cert_attempts: register_int_counter_with_registry!(
381 "total_handle_certificate_attempts",
382 "Number of calls to handle_certificate",
383 registry,
384 )
385 .unwrap(),
386 total_effects: register_int_counter_with_registry!(
388 "total_transaction_effects",
389 "Total number of transaction effects produced",
390 registry,
391 )
392 .unwrap(),
393
394 shared_obj_tx: register_int_counter_with_registry!(
395 "num_shared_obj_tx",
396 "Number of transactions involving shared objects",
397 registry,
398 )
399 .unwrap(),
400
401 sponsored_tx: register_int_counter_with_registry!(
402 "num_sponsored_tx",
403 "Number of sponsored transactions",
404 registry,
405 )
406 .unwrap(),
407
408 tx_already_processed: register_int_counter_with_registry!(
409 "num_tx_already_processed",
410 "Number of transaction orders already processed previously",
411 registry,
412 )
413 .unwrap(),
414 num_input_objs: register_histogram_with_registry!(
415 "num_input_objects",
416 "Distribution of number of input TX objects per TX",
417 POSITIVE_INT_BUCKETS.to_vec(),
418 registry,
419 )
420 .unwrap(),
421 num_shared_objects: register_histogram_with_registry!(
422 "num_shared_objects",
423 "Number of shared input objects per TX",
424 POSITIVE_INT_BUCKETS.to_vec(),
425 registry,
426 )
427 .unwrap(),
428 batch_size: register_histogram_with_registry!(
429 "batch_size",
430 "Distribution of size of transaction batch",
431 POSITIVE_INT_BUCKETS.to_vec(),
432 registry,
433 )
434 .unwrap(),
435 authority_state_handle_transaction_latency: register_histogram_with_registry!(
436 "authority_state_handle_transaction_latency",
437 "Latency of handling transactions",
438 LATENCY_SEC_BUCKETS.to_vec(),
439 registry,
440 )
441 .unwrap(),
442 execute_certificate_latency_single_writer,
443 execute_certificate_latency_shared_object,
444 execute_certificate_with_effects_latency: register_histogram_with_registry!(
445 "authority_state_execute_certificate_with_effects_latency",
446 "Latency of executing certificates with effects, including waiting for inputs",
447 LATENCY_SEC_BUCKETS.to_vec(),
448 registry,
449 )
450 .unwrap(),
451 internal_execution_latency: register_histogram_with_registry!(
452 "authority_state_internal_execution_latency",
453 "Latency of actual certificate executions",
454 LATENCY_SEC_BUCKETS.to_vec(),
455 registry,
456 )
457 .unwrap(),
458 execution_load_input_objects_latency: register_histogram_with_registry!(
459 "authority_state_execution_load_input_objects_latency",
460 "Latency of loading input objects for execution",
461 LOW_LATENCY_SEC_BUCKETS.to_vec(),
462 registry,
463 )
464 .unwrap(),
465 prepare_certificate_latency: register_histogram_with_registry!(
466 "authority_state_prepare_certificate_latency",
467 "Latency of executing certificates, before committing the results",
468 LATENCY_SEC_BUCKETS.to_vec(),
469 registry,
470 )
471 .unwrap(),
472 commit_certificate_latency: register_histogram_with_registry!(
473 "authority_state_commit_certificate_latency",
474 "Latency of committing certificate execution results",
475 LATENCY_SEC_BUCKETS.to_vec(),
476 registry,
477 )
478 .unwrap(),
479 db_checkpoint_latency: register_histogram_with_registry!(
480 "db_checkpoint_latency",
481 "Latency of checkpointing dbs",
482 LATENCY_SEC_BUCKETS.to_vec(),
483 registry,
484 ).unwrap(),
485 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
486 "transaction_manager_num_enqueued_certificates",
487 "Current number of certificates enqueued to TransactionManager",
488 &["result"],
489 registry,
490 )
491 .unwrap(),
492 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
493 "transaction_manager_num_missing_objects",
494 "Current number of missing objects in TransactionManager",
495 registry,
496 )
497 .unwrap(),
498 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
499 "transaction_manager_num_pending_certificates",
500 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
501 registry,
502 )
503 .unwrap(),
504 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
505 "transaction_manager_num_executing_certificates",
506 "Number of executing certificates, including queued and actually running certificates",
507 registry,
508 )
509 .unwrap(),
510 transaction_manager_num_ready: register_int_gauge_with_registry!(
511 "transaction_manager_num_ready",
512 "Number of ready transactions in TransactionManager",
513 registry,
514 )
515 .unwrap(),
516 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
517 "transaction_manager_object_cache_size",
518 "Current size of object-availability cache in TransactionManager",
519 registry,
520 )
521 .unwrap(),
522 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
523 "transaction_manager_object_cache_hits",
524 "Number of object-availability cache hits in TransactionManager",
525 registry,
526 )
527 .unwrap(),
528 authority_overload_status: register_int_gauge_with_registry!(
529 "authority_overload_status",
530 "Whether authority is current experiencing overload and enters load shedding mode.",
531 registry)
532 .unwrap(),
533 authority_load_shedding_percentage: register_int_gauge_with_registry!(
534 "authority_load_shedding_percentage",
535 "The percentage of transactions is shed when the authority is in load shedding mode.",
536 registry)
537 .unwrap(),
538 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
539 "transaction_manager_object_cache_misses",
540 "Number of object-availability cache misses in TransactionManager",
541 registry,
542 )
543 .unwrap(),
544 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
545 "transaction_manager_object_cache_evictions",
546 "Number of object-availability cache evictions in TransactionManager",
547 registry,
548 )
549 .unwrap(),
550 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
551 "transaction_manager_package_cache_size",
552 "Current size of package-availability cache in TransactionManager",
553 registry,
554 )
555 .unwrap(),
556 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
557 "transaction_manager_package_cache_hits",
558 "Number of package-availability cache hits in TransactionManager",
559 registry,
560 )
561 .unwrap(),
562 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
563 "transaction_manager_package_cache_misses",
564 "Number of package-availability cache misses in TransactionManager",
565 registry,
566 )
567 .unwrap(),
568 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
569 "transaction_manager_package_cache_evictions",
570 "Number of package-availability cache evictions in TransactionManager",
571 registry,
572 )
573 .unwrap(),
574 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
575 "transaction_manager_transaction_queue_age_s",
576 "Time spent in waiting for transaction in the queue",
577 LATENCY_SEC_BUCKETS.to_vec(),
578 registry,
579 )
580 .unwrap(),
581 transaction_overload_sources: register_int_counter_vec_with_registry!(
582 "transaction_overload_sources",
583 "Number of times each source indicates transaction overload.",
584 &["source"],
585 registry)
586 .unwrap(),
587 execution_driver_executed_transactions: register_int_counter_with_registry!(
588 "execution_driver_executed_transactions",
589 "Cumulative number of transaction executed by execution driver",
590 registry,
591 )
592 .unwrap(),
593 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
594 "execution_driver_dispatch_queue",
595 "Number of transaction pending in execution driver dispatch queue",
596 registry,
597 )
598 .unwrap(),
599 execution_queueing_delay_s: register_histogram_with_registry!(
600 "execution_queueing_delay_s",
601 "Queueing delay between a transaction is ready for execution until it starts executing.",
602 LATENCY_SEC_BUCKETS.to_vec(),
603 registry
604 )
605 .unwrap(),
606 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
607 "prepare_cert_gas_latency_ratio",
608 "The ratio of computation gas divided by VM execution latency.",
609 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
610 registry
611 )
612 .unwrap(),
613 execution_gas_latency_ratio: register_histogram_with_registry!(
614 "execution_gas_latency_ratio",
615 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
616 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
617 registry
618 )
619 .unwrap(),
620 skipped_consensus_txns: register_int_counter_with_registry!(
621 "skipped_consensus_txns",
622 "Total number of consensus transactions skipped",
623 registry,
624 )
625 .unwrap(),
626 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
627 "skipped_consensus_txns_cache_hit",
628 "Total number of consensus transactions skipped because of local cache hit",
629 registry,
630 )
631 .unwrap(),
632 post_processing_total_events_emitted: register_int_counter_with_registry!(
633 "post_processing_total_events_emitted",
634 "Total number of events emitted in post processing",
635 registry,
636 )
637 .unwrap(),
638 post_processing_total_tx_indexed: register_int_counter_with_registry!(
639 "post_processing_total_tx_indexed",
640 "Total number of txes indexed in post processing",
641 registry,
642 )
643 .unwrap(),
644 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
645 "post_processing_total_tx_had_event_processed",
646 "Total number of txes finished event processing in post processing",
647 registry,
648 )
649 .unwrap(),
650 post_processing_total_failures: register_int_counter_with_registry!(
651 "post_processing_total_failures",
652 "Total number of failure in post processing",
653 registry,
654 )
655 .unwrap(),
656 consensus_handler_processed: register_int_counter_vec_with_registry!(
657 "consensus_handler_processed",
658 "Number of transactions processed by consensus handler",
659 &["class"],
660 registry
661 ).unwrap(),
662 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
663 "consensus_handler_transaction_sizes",
664 "Sizes of each type of transactions processed by consensus handler",
665 &["class"],
666 POSITIVE_INT_BUCKETS.to_vec(),
667 registry
668 ).unwrap(),
669 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
670 "consensus_handler_num_low_scoring_authorities",
671 "Number of low scoring authorities based on reputation scores from consensus",
672 registry
673 ).unwrap(),
674 consensus_handler_scores: register_int_gauge_vec_with_registry!(
675 "consensus_handler_scores",
676 "scores from consensus for each authority",
677 &["authority"],
678 registry,
679 ).unwrap(),
680 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
681 "consensus_handler_deferred_transactions",
682 "Number of transactions deferred by consensus handler",
683 registry,
684 ).unwrap(),
685 consensus_handler_congested_transactions: register_int_counter_with_registry!(
686 "consensus_handler_congested_transactions",
687 "Number of transactions deferred by consensus handler due to congestion",
688 registry,
689 ).unwrap(),
690 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
691 "consensus_handler_cancelled_transactions",
692 "Number of transactions cancelled by consensus handler",
693 registry,
694 ).unwrap(),
695 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
696 "consensus_handler_max_congestion_control_object_costs",
697 "Max object costs for congestion control in the current consensus commit",
698 &["commit_type"],
699 registry,
700 ).unwrap(),
701 consensus_committed_subdags: register_int_counter_vec_with_registry!(
702 "consensus_committed_subdags",
703 "Number of committed subdags, sliced by author",
704 &["authority"],
705 registry,
706 ).unwrap(),
707 consensus_committed_messages: register_int_gauge_vec_with_registry!(
708 "consensus_committed_messages",
709 "Total number of committed consensus messages, sliced by author",
710 &["authority"],
711 registry,
712 ).unwrap(),
713 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
714 "consensus_committed_user_transactions",
715 "Number of committed user transactions, sliced by submitter",
716 &["authority"],
717 registry,
718 ).unwrap(),
719 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
720 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
721 zklogin_sig_count: register_int_counter_with_registry!(
722 "zklogin_sig_count",
723 "Count of zkLogin signatures",
724 registry,
725 )
726 .unwrap(),
727 multisig_sig_count: register_int_counter_with_registry!(
728 "multisig_sig_count",
729 "Count of zkLogin signatures",
730 registry,
731 )
732 .unwrap(),
733 consensus_calculated_throughput: register_int_gauge_with_registry!(
734 "consensus_calculated_throughput",
735 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
736 registry,
737 ).unwrap(),
738 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
739 "consensus_calculated_throughput_profile",
740 "The current active calculated throughput profile",
741 registry
742 ).unwrap(),
743 execution_queueing_latency: LatencyObserver::new(),
744 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
745 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
746 }
747 }
748
749 pub fn reset_on_reconfigure(&self) {
753 self.consensus_committed_messages.reset();
754 self.consensus_handler_scores.reset();
755 self.consensus_committed_user_transactions.reset();
756 }
757}
758
/// A pinned, reference-counted, thread-safe (`Send + Sync`) trait object able
/// to produce [`AuthoritySignature`]s.
///
/// This is the type of `AuthorityState::secret`.
pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
766
/// State held by a single authority node: its identity and signer, storage
/// and cache handles, the current per-epoch store, metrics, and configuration.
pub struct AuthorityState {
    /// Name of this authority. `is_validator` checks it against the epoch
    /// committee, and it is attached to transactions this authority signs.
    pub name: AuthorityName,
    /// Signer used to sign transactions on behalf of this authority.
    pub secret: StableSyncAuthoritySigner,

    /// Loads the input and receiving objects of a transaction (used on the
    /// signing path via `read_objects_for_signing`).
    input_loader: TransactionInputLoader,
    /// Handles to the execution cache.
    /// NOTE(review): presumably backs the `get_*` cache accessor methods
    /// defined elsewhere in this impl — confirm.
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,

    /// The per-epoch store, held in an `ArcSwap` so the pointer can be
    /// replaced atomically.
    epoch_store: ArcSwap<AuthorityPerEpochStore>,

    /// Read/write lock wrapping an `EpochId`.
    /// NOTE(review): presumably serializes reconfiguration against
    /// transaction signing/execution — confirm against the lock accessors.
    execution_lock: RwLock<EpochId>,

    /// Optional JSON-RPC index store.
    pub indexes: Option<Arc<IndexStore>>,
    /// Optional REST index store.
    pub rest_index: Option<Arc<RestIndexStore>>,

    /// Shared subscription handler.
    pub subscription_handler: Arc<SubscriptionHandler>,
    /// Shared checkpoint store; also answers epoch state-commitment queries.
    pub checkpoint_store: Arc<CheckpointStore>,

    /// Shared committee store; exposed through `committee_store()` and
    /// `clone_committee_store()`.
    committee_store: Arc<CommitteeStore>,

    /// Shared transaction manager; consulted for execution overload checks.
    transaction_manager: Arc<TransactionManager>,

    /// One-shot sender guarded by a mutex. Outside of test builds the field
    /// is expected to be unused, hence the `expect(unused)` lint attribute.
    #[cfg_attr(not(test), expect(unused))]
    tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

    /// Authority metrics.
    pub metrics: Arc<AuthorityMetrics>,
    /// Store pruner. The underscore prefix marks the field as held but not
    /// read (presumably kept only to keep the pruner alive — confirm).
    _pruner: AuthorityStorePruner,
    /// Per-epoch store pruner, held the same way as `_pruner`.
    _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,

    /// Configuration for database checkpoints.
    db_checkpoint_config: DBCheckpointConfig,

    /// Full node configuration; the deny, verifier and overload settings used
    /// in this impl are read from it.
    pub config: NodeConfig,

    /// Current overload information for this authority.
    pub overload_info: AuthorityOverloadInfo,

    /// Optional finalizer that tracks transactions signed by this validator.
    pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

    /// Identifier of the chain this authority belongs to.
    chain_identifier: ChainIdentifier,

    /// Shared congestion tracker.
    pub(crate) congestion_tracker: Arc<CongestionTracker>,
}
822
823impl AuthorityState {
832 pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
833 epoch_store.committee().authority_exists(&self.name)
834 }
835
836 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
837 !self.is_validator(epoch_store)
838 }
839
840 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
841 &self.committee_store
842 }
843
844 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
845 self.committee_store.clone()
846 }
847
848 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
849 &self.config.authority_overload_config
850 }
851
852 pub fn get_epoch_state_commitments(
853 &self,
854 epoch: EpochId,
855 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
856 self.checkpoint_store.get_epoch_state_commitments(epoch)
857 }
858
    /// Validates `transaction`, signs it with this authority's key and
    /// acquires locks on its owned input objects.
    ///
    /// Steps, in order:
    /// 1. take the execution lock for signing;
    /// 2. run the transaction deny checks from the node configuration;
    /// 3. load the input and receiving objects for the current epoch;
    /// 4. run the transaction input checks;
    /// 5. run the coin deny-list (v1) check;
    /// 6. sign the transaction;
    /// 7. try to acquire transaction locks on the owned input objects.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any of the steps above.
    #[instrument(level = "trace", skip_all)]
    async fn handle_transaction_impl(
        &self,
        transaction: VerifiedTransaction,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) -> IotaResult<VerifiedSignedTransaction> {
        // Bound to a named local so that whatever is returned lives until the
        // end of this function.
        // NOTE(review): the result is neither awaited nor `?`-propagated. If
        // `execution_lock_for_signing` is `async` or returns a `Result`, the
        // lock is not actually held / the error is silently ignored — confirm
        // against its definition.
        let _execution_lock = self.execution_lock_for_signing();

        let tx_digest = transaction.digest();
        let tx_data = transaction.data().transaction_data();

        let input_object_kinds = tx_data.input_objects()?;
        let receiving_objects_refs = tx_data.receiving_objects();

        // Reject transactions denied by the node's transaction deny
        // configuration before loading any objects.
        iota_transaction_checks::deny::check_transaction_for_signing(
            tx_data,
            transaction.tx_signatures(),
            &input_object_kinds,
            &receiving_objects_refs,
            &self.config.transaction_deny_config,
            self.get_backing_package_store().as_ref(),
        )?;

        let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
            Some(tx_digest),
            &input_object_kinds,
            &receiving_objects_refs,
            epoch_store.epoch(),
        )?;

        // The gas status is not used further; only the checked input objects
        // are needed below.
        let (_gas_status, checked_input_objects) =
            iota_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                epoch_store.reference_gas_price(),
                tx_data,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?;

        check_coin_deny_list_v1_during_signing(
            tx_data.sender(),
            &checked_input_objects,
            &receiving_objects,
            &self.get_object_store(),
        )?;

        let owned_objects = checked_input_objects.inner().filter_owned_objects();

        let signed_transaction = VerifiedSignedTransaction::new(
            epoch_store.epoch(),
            transaction,
            self.name,
            &*self.secret,
        );

        // Lock the owned inputs to this signed transaction; a failure here
        // is returned to the caller and the signed transaction is discarded.
        self.get_cache_writer().try_acquire_transaction_locks(
            epoch_store,
            &owned_objects,
            signed_transaction.clone(),
        )?;

        Ok(signed_transaction)
    }
935
    /// Handles a request to sign `transaction` in the epoch of `epoch_store`.
    ///
    /// If a status is already known for the transaction digest, it is
    /// returned immediately. Otherwise the transaction is validated and
    /// signed via `handle_transaction_impl` and the signature is returned as
    /// `TransactionStatus::Signed`.
    ///
    /// # Errors
    ///
    /// Returns an error if a status lookup fails, or if signing fails and no
    /// status is known for the transaction afterwards (in which case the
    /// signing error is returned).
    #[instrument(level = "trace", skip_all)]
    pub async fn handle_transaction(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        transaction: VerifiedTransaction,
    ) -> IotaResult<HandleTransactionResponse> {
        let tx_digest = *transaction.digest();
        debug!("handle_transaction");

        // Short-circuit when a status already exists for this digest. This
        // happens before the latency timer and the order counter below, so
        // such requests are not reflected in either metric.
        if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
            return Ok(HandleTransactionResponse { status });
        }

        let _metrics_guard = self
            .metrics
            .authority_state_handle_transaction_latency
            .start_timer();
        self.metrics.tx_orders.inc();

        let signed = self.handle_transaction_impl(transaction, epoch_store).await;
        match signed {
            Ok(s) => {
                // Only validators with a configured finalizer track the
                // transactions they sign; tracking runs in a spawned task
                // wrapped in `within_alive_epoch`.
                if self.is_validator(epoch_store) {
                    if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
                        let tx = s.clone();
                        let validator_tx_finalizer = validator_tx_finalizer.clone();
                        let cache_reader = self.get_transaction_cache_reader().clone();
                        let epoch_store = epoch_store.clone();
                        spawn_monitored_task!(epoch_store.within_alive_epoch(
                            validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
                        ));
                    }
                }
                Ok(HandleTransactionResponse {
                    status: TransactionStatus::Signed(s.into_inner().into_sig()),
                })
            }
            // Signing failed: look the status up once more (presumably the
            // transaction may have been processed in the meantime). If a
            // status exists it is returned; otherwise the original signing
            // error is. An error from the lookup itself takes precedence.
            Err(err) => Ok(HandleTransactionResponse {
                status: self
                    .get_transaction_status(&tx_digest, epoch_store)?
                    .ok_or(err)?
                    .1,
            }),
        }
    }
986
987 pub fn check_system_overload_at_signing(&self) -> bool {
988 self.config
989 .authority_overload_config
990 .check_system_overload_at_signing
991 }
992
993 pub fn check_system_overload_at_execution(&self) -> bool {
994 self.config
995 .authority_overload_config
996 .check_system_overload_at_execution
997 }
998
999 pub(crate) fn check_system_overload(
1000 &self,
1001 consensus_adapter: &Arc<ConsensusAdapter>,
1002 tx_data: &SenderSignedData,
1003 do_authority_overload_check: bool,
1004 ) -> IotaResult {
1005 if do_authority_overload_check {
1006 self.check_authority_overload(tx_data).tap_err(|_| {
1007 self.update_overload_metrics("execution_queue");
1008 })?;
1009 }
1010 self.transaction_manager
1011 .check_execution_overload(self.overload_config(), tx_data)
1012 .tap_err(|_| {
1013 self.update_overload_metrics("execution_pending");
1014 })?;
1015 consensus_adapter.check_consensus_overload().tap_err(|_| {
1016 self.update_overload_metrics("consensus");
1017 })?;
1018
1019 let pending_tx_count = self
1020 .get_cache_commit()
1021 .approximate_pending_transaction_count();
1022 if pending_tx_count
1023 > self
1024 .config
1025 .execution_cache_config
1026 .writeback_cache
1027 .backpressure_threshold_for_rpc()
1028 {
1029 return Err(IotaError::ValidatorOverloadedRetryAfter {
1030 retry_after_secs: 10,
1031 });
1032 }
1033
1034 Ok(())
1035 }
1036
1037 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1038 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1039 return Ok(());
1040 }
1041
1042 let load_shedding_percentage = self
1043 .overload_info
1044 .load_shedding_percentage
1045 .load(Ordering::Relaxed);
1046 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1047 }
1048
1049 fn update_overload_metrics(&self, source: &str) {
1050 self.metrics
1051 .transaction_overload_sources
1052 .with_label_values(&[source])
1053 .inc();
1054 }
1055
1056 #[instrument(level = "trace", skip_all)]
1062 pub async fn fullnode_execute_certificate_with_effects(
1063 &self,
1064 transaction: &VerifiedExecutableTransaction,
1065 effects: &VerifiedCertifiedTransactionEffects,
1072 epoch_store: &Arc<AuthorityPerEpochStore>,
1073 ) -> IotaResult {
1074 assert!(self.is_fullnode(epoch_store));
1075 if self.epoch_store.load().epoch() != epoch_store.epoch() {
1080 return Err(IotaError::EpochEnded(epoch_store.epoch()));
1081 }
1082 let _metrics_guard = self
1083 .metrics
1084 .execute_certificate_with_effects_latency
1085 .start_timer();
1086 let digest = *transaction.digest();
1087 debug!("execute_certificate_with_effects");
1088 fp_ensure!(
1089 *effects.data().transaction_digest() == digest,
1090 IotaError::ErrorWhileProcessingCertificate {
1091 err: "effects/tx digest mismatch".to_string()
1092 }
1093 );
1094
1095 if transaction.contains_shared_object() {
1096 epoch_store.acquire_shared_version_assignments_from_effects(
1097 transaction,
1098 effects.data(),
1099 self.get_object_cache_reader().as_ref(),
1100 )?;
1101 }
1102
1103 let expected_effects_digest = effects.digest();
1104
1105 self.transaction_manager
1106 .enqueue(vec![transaction.clone()], epoch_store);
1107
1108 let observed_effects = self
1109 .get_transaction_cache_reader()
1110 .try_notify_read_executed_effects(&[digest])
1111 .instrument(tracing::debug_span!(
1112 "notify_read_effects_in_execute_certificate_with_effects"
1113 ))
1114 .await?
1115 .pop()
1116 .expect("notify_read_effects should return exactly 1 element");
1117
1118 let observed_effects_digest = observed_effects.digest();
1119 if &observed_effects_digest != expected_effects_digest {
1120 panic!(
1121 "Locally executed effects do not match canonical effects! expected_effects_digest={:?} observed_effects_digest={:?} expected_effects={:?} observed_effects={:?} input_objects={:?}",
1122 expected_effects_digest,
1123 observed_effects_digest,
1124 effects.data(),
1125 observed_effects,
1126 transaction.data().transaction_data().input_objects()
1127 );
1128 }
1129 Ok(())
1130 }
1131
1132 #[instrument(level = "trace", skip_all)]
1134 pub async fn execute_certificate(
1135 &self,
1136 certificate: &VerifiedCertificate,
1137 epoch_store: &Arc<AuthorityPerEpochStore>,
1138 ) -> IotaResult<TransactionEffects> {
1139 let _metrics_guard = if certificate.contains_shared_object() {
1140 self.metrics
1141 .execute_certificate_latency_shared_object
1142 .start_timer()
1143 } else {
1144 self.metrics
1145 .execute_certificate_latency_single_writer
1146 .start_timer()
1147 };
1148 trace!("execute_certificate");
1149
1150 self.metrics.total_cert_attempts.inc();
1151
1152 if !certificate.contains_shared_object() {
1153 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1158 }
1159
1160 epoch_store
1163 .within_alive_epoch(self.notify_read_effects(certificate))
1164 .await
1165 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1166 .and_then(|r| r)
1167 }
1168
1169 #[instrument(level = "trace", skip_all)]
1184 pub fn try_execute_immediately(
1185 &self,
1186 certificate: &VerifiedExecutableTransaction,
1187 expected_effects_digest: Option<TransactionEffectsDigest>,
1188 epoch_store: &Arc<AuthorityPerEpochStore>,
1189 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1190 let _scope = monitored_scope("Execution::try_execute_immediately");
1191 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1192
1193 let tx_digest = certificate.digest();
1194
1195 let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1197
1198 if let Some(effects) = self
1201 .get_transaction_cache_reader()
1202 .try_get_executed_effects(tx_digest)?
1203 {
1204 if let Some(expected_effects_digest_inner) = expected_effects_digest {
1205 assert_eq!(
1206 effects.digest(),
1207 expected_effects_digest_inner,
1208 "Unexpected effects digest for transaction {tx_digest:?}"
1209 );
1210 }
1211 tx_guard.release();
1212 return Ok((effects, None));
1213 }
1214 let input_objects =
1215 self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1216
1217 let expected_effects_digest =
1223 expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1224
1225 self.process_certificate(
1226 tx_guard,
1227 certificate,
1228 input_objects,
1229 expected_effects_digest,
1230 epoch_store,
1231 )
1232 .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1233 .tap_ok(
1234 |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1235 )
1236 }
1237
1238 pub fn read_objects_for_execution(
1239 &self,
1240 tx_lock: &CertLockGuard,
1241 certificate: &VerifiedExecutableTransaction,
1242 epoch_store: &Arc<AuthorityPerEpochStore>,
1243 ) -> IotaResult<InputObjects> {
1244 let _scope = monitored_scope("Execution::load_input_objects");
1245 let _metrics_guard = self
1246 .metrics
1247 .execution_load_input_objects_latency
1248 .start_timer();
1249 let input_objects = &certificate.data().transaction_data().input_objects()?;
1250 self.input_loader.read_objects_for_execution(
1251 epoch_store,
1252 &certificate.key(),
1253 tx_lock,
1254 input_objects,
1255 epoch_store.epoch(),
1256 )
1257 }
1258
1259 pub fn try_execute_for_test(
1263 &self,
1264 certificate: &VerifiedCertificate,
1265 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1266 let epoch_store = self.epoch_store_for_testing();
1267 let (effects, execution_error_opt) = self.try_execute_immediately(
1268 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1269 None,
1270 &epoch_store,
1271 )?;
1272 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1273 Ok((signed_effects, execution_error_opt))
1274 }
1275
1276 pub fn execute_for_test(
1278 &self,
1279 certificate: &VerifiedCertificate,
1280 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1281 self.try_execute_for_test(certificate)
1282 .expect("try_execute_for_test should not fail")
1283 }
1284
1285 pub async fn notify_read_effects(
1286 &self,
1287 certificate: &VerifiedCertificate,
1288 ) -> IotaResult<TransactionEffects> {
1289 self.get_transaction_cache_reader()
1290 .try_notify_read_executed_effects(&[*certificate.digest()])
1291 .await
1292 .map(|mut r| r.pop().expect("must return correct number of effects"))
1293 }
1294
1295 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1296 self.get_object_cache_reader()
1297 .try_check_owned_objects_are_live(owned_object_refs)
1298 }
1299
1300 pub(crate) fn debug_dump_transaction_state(
1305 &self,
1306 tx_digest: &TransactionDigest,
1307 effects: &TransactionEffects,
1308 expected_effects_digest: TransactionEffectsDigest,
1309 inner_temporary_store: &InnerTemporaryStore,
1310 certificate: &VerifiedExecutableTransaction,
1311 debug_dump_config: &StateDebugDumpConfig,
1312 ) -> IotaResult<PathBuf> {
1313 let dump_dir = debug_dump_config
1314 .dump_file_directory
1315 .as_ref()
1316 .cloned()
1317 .unwrap_or(std::env::temp_dir());
1318 let epoch_store = self.load_epoch_store_one_call_per_task();
1319
1320 NodeStateDump::new(
1321 tx_digest,
1322 effects,
1323 expected_effects_digest,
1324 self.get_object_store().as_ref(),
1325 &epoch_store,
1326 inner_temporary_store,
1327 certificate,
1328 )?
1329 .write_to_file(&dump_dir)
1330 .map_err(|e| IotaError::FileIO(e.to_string()))
1331 }
1332
1333 #[instrument(level = "trace", skip_all)]
1334 pub(crate) fn process_certificate(
1335 &self,
1336 tx_guard: CertTxGuard,
1337 certificate: &VerifiedExecutableTransaction,
1338 input_objects: InputObjects,
1339 expected_effects_digest: Option<TransactionEffectsDigest>,
1340 epoch_store: &Arc<AuthorityPerEpochStore>,
1341 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1342 let process_certificate_start_time = tokio::time::Instant::now();
1343 let digest = *certificate.digest();
1344
1345 fail_point_if!("correlated-crash-process-certificate", || {
1346 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1347 iota_simulator::task::kill_current_node(None);
1348 }
1349 });
1350
1351 let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1352 let execution_guard = match execution_guard {
1358 Ok(execution_guard) => execution_guard,
1359 Err(err) => {
1360 tx_guard.release();
1361 return Err(err);
1362 }
1363 };
1364 if *execution_guard != epoch_store.epoch() {
1368 tx_guard.release();
1369 info!("The epoch of the execution_guard doesn't match the epoch store");
1370 return Err(IotaError::WrongEpoch {
1371 expected_epoch: epoch_store.epoch(),
1372 actual_epoch: *execution_guard,
1373 });
1374 }
1375
1376 let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1382 &execution_guard,
1383 certificate,
1384 input_objects,
1385 epoch_store,
1386 ) {
1387 Err(e) => {
1388 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1389 tx_guard.release();
1390 return Err(e);
1391 }
1392 Ok(res) => res,
1393 };
1394
1395 if let Some(expected_effects_digest) = expected_effects_digest {
1396 if effects.digest() != expected_effects_digest {
1397 match self.debug_dump_transaction_state(
1399 &digest,
1400 &effects,
1401 expected_effects_digest,
1402 &inner_temporary_store,
1403 certificate,
1404 &self.config.state_debug_dump_config,
1405 ) {
1406 Ok(out_path) => {
1407 info!(
1408 "Dumped node state for transaction {} to {}",
1409 digest,
1410 out_path.as_path().display().to_string()
1411 );
1412 }
1413 Err(e) => {
1414 error!("Error dumping state for transaction {}: {e}", digest);
1415 }
1416 }
1417 error!(
1418 tx_digest = ?digest,
1419 ?expected_effects_digest,
1420 actual_effects = ?effects,
1421 "fork detected!"
1422 );
1423 panic!(
1424 "Transaction {} is expected to have effects digest {}, but got {}!",
1425 digest,
1426 expected_effects_digest,
1427 effects.digest(),
1428 );
1429 }
1430 }
1431
1432 fail_point!("crash");
1433
1434 self.commit_certificate(
1435 certificate,
1436 inner_temporary_store,
1437 &effects,
1438 tx_guard,
1439 execution_guard,
1440 epoch_store,
1441 )?;
1442
1443 if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1444 certificate.data().transaction_data().kind()
1445 {
1446 if let Some(err) = &execution_error_opt {
1447 debug_fatal!("Authenticator state update failed: {:?}", err);
1448 }
1449 epoch_store.update_authenticator_state(auth_state);
1450
1451 if cfg!(debug_assertions) {
1454 let authenticator_state = get_authenticator_state(self.get_object_store())
1455 .expect("Read cannot fail")
1456 .expect("Authenticator state must exist");
1457
1458 let mut sys_jwks: Vec<_> = authenticator_state
1459 .active_jwks
1460 .into_iter()
1461 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1462 .collect();
1463 let mut active_jwks: Vec<_> = epoch_store
1464 .signature_verifier
1465 .get_jwks()
1466 .into_iter()
1467 .collect();
1468 sys_jwks.sort();
1469 active_jwks.sort();
1470
1471 assert_eq!(sys_jwks, active_jwks);
1472 }
1473 }
1474
1475 let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1476 if elapsed > 0.0 {
1477 self.metrics
1478 .execution_gas_latency_ratio
1479 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1480 };
1481 Ok((effects, execution_error_opt))
1482 }
1483
1484 #[instrument(level = "trace", skip_all)]
1485 fn commit_certificate(
1486 &self,
1487 certificate: &VerifiedExecutableTransaction,
1488 inner_temporary_store: InnerTemporaryStore,
1489 effects: &TransactionEffects,
1490 tx_guard: CertTxGuard,
1491 _execution_guard: ExecutionLockReadGuard<'_>,
1492 epoch_store: &Arc<AuthorityPerEpochStore>,
1493 ) -> IotaResult {
1494 let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1495 monitored_scope("Execution::commit_certificate");
1496 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1497
1498 let tx_key = certificate.key();
1499 let tx_digest = certificate.digest();
1500 let input_object_count = inner_temporary_store.input_objects.len();
1501 let shared_object_count = effects.input_shared_objects().len();
1502
1503 let output_keys = inner_temporary_store.get_output_keys(effects);
1504
1505 let _ = self
1507 .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1508 .tap_err(|e| {
1509 self.metrics.post_processing_total_failures.inc();
1510 error!(?tx_digest, "tx post processing failed: {e}");
1511 });
1512
1513 epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1517
1518 fail_point!("crash");
1520
1521 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1522 certificate.clone().into_unsigned(),
1523 effects.clone(),
1524 inner_temporary_store,
1525 );
1526 self.get_cache_writer()
1527 .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1528
1529 if certificate.transaction_data().is_end_of_epoch_tx() {
1530 self.get_object_cache_reader()
1533 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1534 }
1535
1536 tx_guard.commit_tx();
1538
1539 self.transaction_manager
1543 .notify_commit(tx_digest, output_keys, epoch_store);
1544
1545 self.update_metrics(certificate, input_object_count, shared_object_count);
1546
1547 Ok(())
1548 }
1549
1550 fn update_metrics(
1551 &self,
1552 certificate: &VerifiedExecutableTransaction,
1553 input_object_count: usize,
1554 shared_object_count: usize,
1555 ) {
1556 if certificate.has_zklogin_sig() {
1558 self.metrics.zklogin_sig_count.inc();
1559 } else if certificate.has_upgraded_multisig() {
1560 self.metrics.multisig_sig_count.inc();
1561 }
1562
1563 self.metrics.total_effects.inc();
1564 self.metrics.total_certs.inc();
1565
1566 if shared_object_count > 0 {
1567 self.metrics.shared_obj_tx.inc();
1568 }
1569
1570 if certificate.is_sponsored_tx() {
1571 self.metrics.sponsored_tx.inc();
1572 }
1573
1574 self.metrics
1575 .num_input_objs
1576 .observe(input_object_count as f64);
1577 self.metrics
1578 .num_shared_objects
1579 .observe(shared_object_count as f64);
1580 self.metrics.batch_size.observe(
1581 certificate
1582 .data()
1583 .intent_message()
1584 .value
1585 .kind()
1586 .num_commands() as f64,
1587 );
1588 }
1589
1590 #[instrument(level = "trace", skip_all)]
1602 fn prepare_certificate(
1603 &self,
1604 _execution_guard: &ExecutionLockReadGuard<'_>,
1605 certificate: &VerifiedExecutableTransaction,
1606 input_objects: InputObjects,
1607 epoch_store: &Arc<AuthorityPerEpochStore>,
1608 ) -> IotaResult<(
1609 InnerTemporaryStore,
1610 TransactionEffects,
1611 Option<ExecutionError>,
1612 )> {
1613 let _scope = monitored_scope("Execution::prepare_certificate");
1614 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1615 let prepare_certificate_start_time = tokio::time::Instant::now();
1616
1617 let tx_data = certificate.data().transaction_data();
1620 tx_data.validity_check(epoch_store.protocol_config())?;
1621
1622 let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
1625 certificate,
1626 input_objects,
1627 epoch_store.protocol_config(),
1628 epoch_store.reference_gas_price(),
1629 )?;
1630
1631 let owned_object_refs = input_objects.inner().filter_owned_objects();
1632 self.check_owned_locks(&owned_object_refs)?;
1633 let tx_digest = *certificate.digest();
1634 let protocol_config = epoch_store.protocol_config();
1635 let transaction_data = &certificate.data().intent_message().value;
1636 let (kind, signer, gas) = transaction_data.execution_parts();
1637
1638 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1639 let (inner_temp_store, _, mut effects, execution_error_opt) =
1640 epoch_store.executor().execute_transaction_to_effects(
1641 self.get_backing_store().as_ref(),
1642 protocol_config,
1643 self.metrics.limits_metrics.clone(),
1644 self.config
1647 .expensive_safety_check_config
1648 .enable_deep_per_tx_iota_conservation_check(),
1649 self.config.certificate_deny_config.certificate_deny_set(),
1650 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1651 epoch_store
1652 .epoch_start_config()
1653 .epoch_data()
1654 .epoch_start_timestamp(),
1655 input_objects,
1656 gas,
1657 gas_status,
1658 kind,
1659 signer,
1660 tx_digest,
1661 &mut None,
1662 );
1663
1664 fail_point_if!("cp_execution_nondeterminism", || {
1665 #[cfg(msim)]
1666 self.create_fail_state(certificate, epoch_store, &mut effects);
1667 });
1668
1669 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1670 if elapsed > 0.0 {
1671 self.metrics
1672 .prepare_cert_gas_latency_ratio
1673 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1674 }
1675
1676 Ok((inner_temp_store, effects, execution_error_opt.err()))
1677 }
1678
1679 pub fn prepare_certificate_for_benchmark(
1680 &self,
1681 certificate: &VerifiedExecutableTransaction,
1682 input_objects: InputObjects,
1683 epoch_store: &Arc<AuthorityPerEpochStore>,
1684 ) -> IotaResult<(
1685 InnerTemporaryStore,
1686 TransactionEffects,
1687 Option<ExecutionError>,
1688 )> {
1689 let lock = RwLock::new(epoch_store.epoch());
1690 let execution_guard = lock.try_read().unwrap();
1691
1692 self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
1693 }
1694
1695 #[allow(clippy::type_complexity)]
1696 pub fn dry_exec_transaction(
1697 &self,
1698 transaction: TransactionData,
1699 transaction_digest: TransactionDigest,
1700 ) -> IotaResult<(
1701 DryRunTransactionBlockResponse,
1702 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1703 TransactionEffects,
1704 Option<ObjectID>,
1705 )> {
1706 let epoch_store = self.load_epoch_store_one_call_per_task();
1707 if !self.is_fullnode(&epoch_store) {
1708 return Err(IotaError::UnsupportedFeature {
1709 error: "dry-exec is only supported on fullnodes".to_string(),
1710 });
1711 }
1712
1713 if transaction.kind().is_system_tx() {
1714 return Err(IotaError::UnsupportedFeature {
1715 error: "dry-exec does not support system transactions".to_string(),
1716 });
1717 }
1718
1719 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1720 }
1721
1722 #[allow(clippy::type_complexity)]
1723 pub fn dry_exec_transaction_for_benchmark(
1724 &self,
1725 transaction: TransactionData,
1726 transaction_digest: TransactionDigest,
1727 ) -> IotaResult<(
1728 DryRunTransactionBlockResponse,
1729 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1730 TransactionEffects,
1731 Option<ObjectID>,
1732 )> {
1733 let epoch_store = self.load_epoch_store_one_call_per_task();
1734 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1735 }
1736
1737 #[allow(clippy::type_complexity)]
1738 fn dry_exec_transaction_impl(
1739 &self,
1740 epoch_store: &AuthorityPerEpochStore,
1741 transaction: TransactionData,
1742 transaction_digest: TransactionDigest,
1743 ) -> IotaResult<(
1744 DryRunTransactionBlockResponse,
1745 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1746 TransactionEffects,
1747 Option<ObjectID>,
1748 )> {
1749 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1751
1752 let input_object_kinds = transaction.input_objects()?;
1753 let receiving_object_refs = transaction.receiving_objects();
1754
1755 iota_transaction_checks::deny::check_transaction_for_signing(
1756 &transaction,
1757 &[],
1758 &input_object_kinds,
1759 &receiving_object_refs,
1760 &self.config.transaction_deny_config,
1761 self.get_backing_package_store().as_ref(),
1762 )?;
1763
1764 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1765 None,
1767 &input_object_kinds,
1768 &receiving_object_refs,
1769 epoch_store.epoch(),
1770 )?;
1771
1772 let mut transaction = transaction;
1774 let mut gas_object_refs = transaction.gas().to_vec();
1775 let reference_gas_price = epoch_store.reference_gas_price();
1776 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1777 let sender = transaction.gas_owner();
1778 let gas_object_id = ObjectID::random();
1779 let gas_object = Object::new_move(
1780 MoveObject::new_gas_coin(
1781 OBJECT_START_VERSION,
1782 gas_object_id,
1783 SIMULATION_GAS_COIN_VALUE,
1784 ),
1785 Owner::AddressOwner(sender),
1786 TransactionDigest::genesis_marker(),
1787 );
1788 let gas_object_ref = gas_object.compute_object_reference();
1789 gas_object_refs = vec![gas_object_ref];
1790 transaction.gas_data_mut().payment = gas_object_refs.clone();
1792 (
1793 iota_transaction_checks::check_transaction_input_with_given_gas(
1794 epoch_store.protocol_config(),
1795 reference_gas_price,
1796 &transaction,
1797 input_objects,
1798 receiving_objects,
1799 gas_object,
1800 &self.metrics.bytecode_verifier_metrics,
1801 &self.config.verifier_signing_config,
1802 )?,
1803 Some(gas_object_id),
1804 )
1805 } else {
1806 (
1807 iota_transaction_checks::check_transaction_input(
1808 epoch_store.protocol_config(),
1809 reference_gas_price,
1810 &transaction,
1811 input_objects,
1812 &receiving_objects,
1813 &self.metrics.bytecode_verifier_metrics,
1814 &self.config.verifier_signing_config,
1815 )?,
1816 None,
1817 )
1818 };
1819
1820 let protocol_config = epoch_store.protocol_config();
1821 let (kind, signer, _) = transaction.execution_parts();
1822
1823 let silent = true;
1824 let executor = iota_execution::executor(protocol_config, silent, None)
1825 .expect("Creating an executor should not fail here");
1826
1827 let expensive_checks = false;
1828 let (inner_temp_store, _, effects, _execution_error) = executor
1829 .execute_transaction_to_effects(
1830 self.get_backing_store().as_ref(),
1831 protocol_config,
1832 self.metrics.limits_metrics.clone(),
1833 expensive_checks,
1834 self.config.certificate_deny_config.certificate_deny_set(),
1835 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1836 epoch_store
1837 .epoch_start_config()
1838 .epoch_data()
1839 .epoch_start_timestamp(),
1840 checked_input_objects,
1841 gas_object_refs,
1842 gas_status,
1843 kind,
1844 signer,
1845 transaction_digest,
1846 &mut None,
1847 );
1848 let tx_digest = *effects.transaction_digest();
1849
1850 let module_cache =
1851 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1852
1853 let mut layout_resolver =
1854 epoch_store
1855 .executor()
1856 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1857 &inner_temp_store,
1858 self.get_backing_package_store(),
1859 )));
1860 let object_changes = Vec::new();
1862
1863 let balance_changes = Vec::new();
1865
1866 let written_with_kind = effects
1867 .created()
1868 .into_iter()
1869 .map(|(oref, _)| (oref, WriteKind::Create))
1870 .chain(
1871 effects
1872 .unwrapped()
1873 .into_iter()
1874 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1875 )
1876 .chain(
1877 effects
1878 .mutated()
1879 .into_iter()
1880 .map(|(oref, _)| (oref, WriteKind::Mutate)),
1881 )
1882 .map(|(oref, kind)| {
1883 let obj = inner_temp_store.written.get(&oref.0).unwrap();
1884 (oref.0, (oref, obj.clone(), kind))
1886 })
1887 .collect();
1888
1889 Ok((
1890 DryRunTransactionBlockResponse {
1891 suggested_gas_price: self
1893 .congestion_tracker
1894 .get_prediction_suggested_gas_price(&transaction),
1895 input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
1896 .map_err(|e| IotaError::TransactionSerialization {
1897 error: format!(
1898 "Failed to convert transaction to IotaTransactionBlockData: {e}",
1899 ),
1900 })?, effects: effects.clone().try_into()?,
1902 events: IotaTransactionBlockEvents::try_from(
1903 inner_temp_store.events.clone(),
1904 tx_digest,
1905 None,
1906 layout_resolver.as_mut(),
1907 )?,
1908 object_changes,
1909 balance_changes,
1910 },
1911 written_with_kind,
1912 effects,
1913 mock_gas,
1914 ))
1915 }
1916
1917 pub fn simulate_transaction(
1918 &self,
1919 transaction: TransactionData,
1920 ) -> IotaResult<SimulateTransactionResult> {
1921 if transaction.kind().is_system_tx() {
1922 return Err(IotaError::UnsupportedFeature {
1923 error: "simulate does not support system transactions".to_string(),
1924 });
1925 }
1926
1927 let epoch_store = self.load_epoch_store_one_call_per_task();
1928 if !self.is_fullnode(&epoch_store) {
1929 return Err(IotaError::UnsupportedFeature {
1930 error: "simulate is only supported on fullnodes".to_string(),
1931 });
1932 }
1933
1934 self.simulate_transaction_impl(&epoch_store, transaction)
1935 }
1936
1937 fn simulate_transaction_impl(
1938 &self,
1939 epoch_store: &AuthorityPerEpochStore,
1940 transaction: TransactionData,
1941 ) -> IotaResult<SimulateTransactionResult> {
1942 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1944
1945 let input_object_kinds = transaction.input_objects()?;
1946 let receiving_object_refs = transaction.receiving_objects();
1947
1948 iota_transaction_checks::deny::check_transaction_for_signing(
1949 &transaction,
1950 &[],
1951 &input_object_kinds,
1952 &receiving_object_refs,
1953 &self.config.transaction_deny_config,
1954 self.get_backing_package_store().as_ref(),
1955 )?;
1956
1957 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1958 None,
1960 &input_object_kinds,
1961 &receiving_object_refs,
1962 epoch_store.epoch(),
1963 )?;
1964 let mut transaction = transaction;
1966 let mut gas_object_refs = transaction.gas().to_vec();
1967 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1968 let sender = transaction.gas_owner();
1969 let gas_object_id = ObjectID::MAX;
1970 let gas_object = Object::new_move(
1971 MoveObject::new_gas_coin(
1972 OBJECT_START_VERSION,
1973 gas_object_id,
1974 SIMULATION_GAS_COIN_VALUE,
1975 ),
1976 Owner::AddressOwner(sender),
1977 TransactionDigest::genesis_marker(),
1978 );
1979 let gas_object_ref = gas_object.compute_object_reference();
1980 gas_object_refs = vec![gas_object_ref];
1981
1982 transaction.gas_data_mut().payment = gas_object_refs.clone();
1984 (
1985 iota_transaction_checks::check_transaction_input_with_given_gas(
1986 epoch_store.protocol_config(),
1987 epoch_store.reference_gas_price(),
1988 &transaction,
1989 input_objects,
1990 receiving_objects,
1991 gas_object,
1992 &self.metrics.bytecode_verifier_metrics,
1993 &self.config.verifier_signing_config,
1994 )?,
1995 Some(gas_object_id),
1996 )
1997 } else {
1998 (
1999 iota_transaction_checks::check_transaction_input(
2000 epoch_store.protocol_config(),
2001 epoch_store.reference_gas_price(),
2002 &transaction,
2003 input_objects,
2004 &receiving_objects,
2005 &self.metrics.bytecode_verifier_metrics,
2006 &self.config.verifier_signing_config,
2007 )?,
2008 None,
2009 )
2010 };
2011
2012 let protocol_config = epoch_store.protocol_config();
2013 let (kind, signer, _) = transaction.execution_parts();
2014
2015 let silent = true;
2016 let executor = iota_execution::executor(protocol_config, silent, None)
2017 .expect("Creating an executor should not fail here");
2018
2019 let expensive_checks = false;
2020 let (inner_temp_store, _, effects, _execution_error) = executor
2021 .execute_transaction_to_effects(
2022 self.get_backing_store().as_ref(),
2023 protocol_config,
2024 self.metrics.limits_metrics.clone(),
2025 expensive_checks,
2026 self.config.certificate_deny_config.certificate_deny_set(),
2027 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2028 epoch_store
2029 .epoch_start_config()
2030 .epoch_data()
2031 .epoch_start_timestamp(),
2032 checked_input_objects,
2033 gas_object_refs,
2034 gas_status,
2035 kind,
2036 signer,
2037 transaction.digest(),
2038 &mut None,
2039 );
2040
2041 Ok(SimulateTransactionResult {
2042 input_objects: inner_temp_store.input_objects,
2043 output_objects: inner_temp_store.written,
2044 events: effects.events_digest().map(|_| inner_temp_store.events),
2045 effects,
2046 mock_gas_id: mock_gas,
2047 })
2048 }
2049
/// Runs `transaction_kind` in dev-inspect mode on behalf of `sender` and
/// returns the resulting [`DevInspectResults`].
///
/// Only fullnodes serve this call and system transactions are rejected; both
/// cases return `IotaError::UnsupportedFeature`.
///
/// Optional arguments fall back to the following values:
/// - `gas_price`: the current epoch's reference gas price.
/// - `gas_budget`: the protocol's `max_tx_gas`.
/// - `gas_sponsor`: `sender`.
/// - `gas_objects`: empty; a dummy gas object is then used for payment.
/// - `show_raw_txn_data_and_effects`: `false`; when `true`, the BCS bytes of
///   the transaction data and of the effects are included in the result.
/// - `skip_checks`: `true`; when `false`, the regular transaction input
///   checks run instead of `check_dev_inspect_input`.
///
/// # Errors
///
/// Propagates serialization, validity-check, deny-list, object-loading,
/// input-check and gas-status errors from the calls below.
pub async fn dev_inspect_transaction_block(
    &self,
    sender: IotaAddress,
    transaction_kind: TransactionKind,
    gas_price: Option<u64>,
    gas_budget: Option<u64>,
    gas_sponsor: Option<IotaAddress>,
    gas_objects: Option<Vec<ObjectRef>>,
    show_raw_txn_data_and_effects: Option<bool>,
    skip_checks: Option<bool>,
) -> IotaResult<DevInspectResults> {
    let epoch_store = self.load_epoch_store_one_call_per_task();

    if !self.is_fullnode(&epoch_store) {
        return Err(IotaError::UnsupportedFeature {
            error: "dev-inspect is only supported on fullnodes".to_string(),
        });
    }

    if transaction_kind.is_system_tx() {
        return Err(IotaError::UnsupportedFeature {
            error: "system transactions are not supported".to_string(),
        });
    }

    // Resolve the optional request parameters to their defaults.
    let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
    let skip_checks = skip_checks.unwrap_or(true);
    let reference_gas_price = epoch_store.reference_gas_price();
    let protocol_config = epoch_store.protocol_config();
    let max_tx_gas = protocol_config.max_tx_gas();

    let price = gas_price.unwrap_or(reference_gas_price);
    let budget = gas_budget.unwrap_or(max_tx_gas);
    let owner = gas_sponsor.unwrap_or(sender);
    // `payment` may be empty here; that case is handled further down, once
    // the input objects have been loaded.
    let payment = gas_objects.unwrap_or_default();
    // `transaction_kind` is cloned because it is passed to the executor
    // again at the end of this function.
    let transaction = TransactionData::V1(TransactionDataV1 {
        kind: transaction_kind.clone(),
        sender,
        gas_data: GasData {
            payment,
            owner,
            price,
            budget,
        },
        expiration: TransactionExpiration::None,
    });

    // Serialize the transaction only when the caller asked for raw bytes.
    let raw_txn_data = if show_raw_txn_data_and_effects {
        bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
            error: "Failed to serialize transaction during dev inspect".to_string(),
        })?
    } else {
        vec![]
    };

    transaction.validity_check_no_gas_check(protocol_config)?;

    let input_object_kinds = transaction.input_objects()?;
    let receiving_object_refs = transaction.receiving_objects();

    // Deny-list check; the empty slice is passed where signatures would go,
    // since a dev-inspect request carries none.
    iota_transaction_checks::deny::check_transaction_for_signing(
        &transaction,
        &[],
        &input_object_kinds,
        &receiving_object_refs,
        &self.config.transaction_deny_config,
        self.get_backing_package_store().as_ref(),
    )?;

    let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
        None,
        &input_object_kinds,
        &receiving_object_refs,
        epoch_store.epoch(),
    )?;

    // Stand-in gas object, created unconditionally but only consumed when
    // the caller supplied no gas payment.
    let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
        SIMULATION_GAS_COIN_VALUE,
        transaction.gas_owner(),
    );

    let gas_objects = if transaction.gas().is_empty() {
        let gas_object_ref = dummy_gas_object.compute_object_reference();
        vec![gas_object_ref]
    } else {
        transaction.gas().to_vec()
    };

    let (gas_status, checked_input_objects) = if skip_checks {
        if transaction.gas().is_empty() {
            // The dummy gas object is not among the loaded inputs, so add it
            // explicitly under the reference computed above.
            input_objects.push(ObjectReadResult::new(
                InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
                dummy_gas_object.into(),
            ));
        }
        let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
            protocol_config,
            &transaction_kind,
            input_objects,
            receiving_objects,
        )?;
        // The gas status is built from `max_tx_gas`, not from `budget`.
        let gas_status = IotaGasStatus::new(
            max_tx_gas,
            transaction.gas_price(),
            reference_gas_price,
            protocol_config,
        )?;

        (gas_status, checked_input_objects)
    } else {
        // Checks are not skipped: run the regular input checks, using the
        // dummy-gas variant when no gas payment was provided.
        if transaction.gas().is_empty() {
            iota_transaction_checks::check_transaction_input_with_given_gas(
                epoch_store.protocol_config(),
                reference_gas_price,
                &transaction,
                input_objects,
                receiving_objects,
                dummy_gas_object,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?
        } else {
            iota_transaction_checks::check_transaction_input(
                epoch_store.protocol_config(),
                reference_gas_price,
                &transaction,
                input_objects,
                &receiving_objects,
                &self.metrics.bytecode_verifier_metrics,
                &self.config.verifier_signing_config,
            )?
        }
    };

    // A dedicated executor is created for this call instead of using the
    // epoch store's one.
    // NOTE(review): the `true` flag is presumably "silent" mode — confirm
    // against `iota_execution::executor`.
    let executor = iota_execution::executor(protocol_config, true, None)
        .expect("Creating an executor should not fail here");
    // The digest is derived from the transaction data wrapped in a
    // `TransactionData` intent; `transaction` is moved here.
    let intent_msg = IntentMessage::new(
        Intent {
            version: IntentVersion::V0,
            scope: IntentScope::TransactionData,
            app_id: AppId::Iota,
        },
        transaction,
    );
    let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
    let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
        self.get_backing_store().as_ref(),
        protocol_config,
        self.metrics.limits_metrics.clone(),
        // NOTE(review): presumably the expensive-checks flag — confirm
        // against the executor's signature.
        false,
        self.config.certificate_deny_config.certificate_deny_set(),
        &epoch_store.epoch_start_config().epoch_data().epoch_id(),
        epoch_store
            .epoch_start_config()
            .epoch_data()
            .epoch_start_timestamp(),
        checked_input_objects,
        gas_objects,
        gas_status,
        transaction_kind,
        sender,
        transaction_digest,
        skip_checks,
    );

    // Serialize the effects only when the caller asked for raw bytes.
    let raw_effects = if show_raw_txn_data_and_effects {
        bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
            error: "Failed to serialize transaction effects during dev inspect".to_string(),
        })?
    } else {
        vec![]
    };

    // Resolve layouts against the temporary store first, falling back to
    // the backing package store.
    let mut layout_resolver =
        epoch_store
            .executor()
            .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                &inner_temp_store,
                self.get_backing_package_store(),
            )));

    DevInspectResults::new(
        effects,
        inner_temp_store.events.clone(),
        execution_result,
        raw_txn_data,
        raw_effects,
        layout_resolver.as_mut(),
    )
}
2253
2254 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2256 let epoch_store = self.epoch_store_for_testing();
2257 Ok(epoch_store.reference_gas_price())
2258 }
2259
2260 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2261 self.get_transaction_cache_reader()
2262 .try_is_tx_already_executed(digest)
2263 }
2264
2265 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2267 self.try_is_tx_already_executed(digest)
2268 .expect("storage access failed")
2269 }
2270
2271 #[instrument(level = "debug", skip_all, err)]
2273 fn index_tx(
2274 &self,
2275 indexes: &IndexStore,
2276 digest: &TransactionDigest,
2277 cert: &VerifiedExecutableTransaction,
2279 effects: &TransactionEffects,
2280 events: &TransactionEvents,
2281 timestamp_ms: u64,
2282 tx_coins: Option<TxCoins>,
2283 written: &WrittenObjects,
2284 inner_temporary_store: &InnerTemporaryStore,
2285 ) -> IotaResult<u64> {
2286 let changes = self
2287 .process_object_index(effects, written, inner_temporary_store)
2288 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2289
2290 indexes.index_tx(
2291 cert.data().intent_message().value.sender(),
2292 cert.data()
2293 .intent_message()
2294 .value
2295 .input_objects()?
2296 .iter()
2297 .map(|o| o.object_id()),
2298 effects
2299 .all_changed_objects()
2300 .into_iter()
2301 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2302 cert.data()
2303 .intent_message()
2304 .value
2305 .move_calls()
2306 .into_iter()
2307 .map(|(package, module, function)| {
2308 (*package, module.to_owned(), function.to_owned())
2309 }),
2310 events,
2311 changes,
2312 digest,
2313 timestamp_ms,
2314 tx_coins,
2315 )
2316 }
2317
/// Simulator-only helper that makes some validators produce diverging
/// effects for non-system transactions.
///
/// A thread-local record accumulates validators (and their stake) until the
/// accumulated stake reaches the committee's validity threshold. Every
/// validator in that record bumps the computation cost in `effects` by one.
#[cfg(msim)]
fn create_fail_state(
    &self,
    certificate: &VerifiedExecutableTransaction,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    effects: &mut TransactionEffects,
) {
    use std::cell::RefCell;
    thread_local! {
        static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
    }

    // System transactions are never tampered with.
    if certificate.data().intent_message().value.is_system_tx() {
        return;
    }

    let committee = epoch_store.committee();
    let cur_stake = (**committee).weight(&self.name);
    // Nodes without stake do not take part.
    if cur_stake == 0 {
        return;
    }

    FAIL_STATE.with_borrow_mut(|(failing_stake, failing_validators)| {
        if *failing_stake < committee.validity_threshold() {
            *failing_stake += cur_stake;
            failing_validators.insert(self.name);
        }

        if failing_validators.contains(&self.name) {
            info!("cp_exec failing tx");
            effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
        }
    });
}
2348
/// Derives the owner-index and dynamic-field-index changes implied by
/// `effects`.
///
/// `written` must contain the new version of every changed object that ends
/// up address-owned or object-owned. `inner_temporary_store` is consulted
/// first (before the backing package store) when resolving type layouts.
///
/// # Errors
///
/// Returns an error if loading the previous version of a mutated object from
/// the object store fails.
///
/// # Panics
///
/// Panics if a deleted, wrapped or mutated object has no modified-at version
/// or its previous version/owner cannot be found, if `written` lacks a
/// changed owned object, or if the version in `written` differs from the one
/// in `effects`.
fn process_object_index(
    &self,
    effects: &TransactionEffects,
    written: &WrittenObjects,
    inner_temporary_store: &InnerTemporaryStore,
) -> IotaResult<ObjectIndexChanges> {
    let epoch_store = self.load_epoch_store_one_call_per_task();
    let mut layout_resolver =
        epoch_store
            .executor()
            .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
                inner_temporary_store,
                self.get_backing_package_store(),
            )));

    // Object id -> version the object had before this transaction.
    let modified_at_version = effects
        .modified_at_versions()
        .into_iter()
        .collect::<HashMap<_, _>>();

    let tx_digest = effects.transaction_digest();
    let mut deleted_owners = vec![];
    let mut deleted_dynamic_fields = vec![];
    // Deleted and wrapped objects drop out of the index under whoever owned
    // their previous version.
    for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
        let old_version = modified_at_version.get(&id).unwrap();
        match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
            |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
        ) {
            Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
            Owner::ObjectOwner(object_id) => {
                deleted_dynamic_fields.push((ObjectID::from(object_id), id))
            }
            _ => {}
        }
    }

    let mut new_owners = vec![];
    let mut new_dynamic_fields = vec![];

    for (oref, owner, kind) in effects.all_changed_objects() {
        let id = &oref.0;
        // For mutated objects, an ownership change also removes the entry
        // under the previous owner.
        if let WriteKind::Mutate = kind {
            let Some(old_version) = modified_at_version.get(id) else {
                panic!(
                    "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
                );
            };
            let Some(old_object) = self
                .get_object_store()
                .try_get_object_by_key(id, *old_version)?
            else {
                panic!(
                    "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
                );
            };
            if old_object.owner != owner {
                match old_object.owner {
                    Owner::AddressOwner(addr) => {
                        deleted_owners.push((addr, *id));
                    }
                    Owner::ObjectOwner(object_id) => {
                        deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
                    }
                    _ => {}
                }
            }
        }

        // Record the entry under the new owner; owners other than address
        // and object owners are not indexed here.
        match owner {
            Owner::AddressOwner(addr) => {
                let new_object = written.get(id).unwrap_or_else(
                    || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
                );
                assert_eq!(
                    new_object.version(),
                    oref.1,
                    "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                    tx_digest,
                    id,
                    new_object.version(),
                    oref.1
                );

                // Objects without a Move type are recorded as packages.
                let type_ = new_object
                    .type_()
                    .map(|type_| ObjectType::Struct(type_.clone()))
                    .unwrap_or(ObjectType::Package);

                new_owners.push((
                    (addr, *id),
                    ObjectInfo {
                        object_id: *id,
                        version: oref.1,
                        digest: oref.2,
                        type_,
                        owner,
                        previous_transaction: *effects.transaction_digest(),
                    },
                ));
            }
            Owner::ObjectOwner(owner) => {
                let new_object = written.get(id).unwrap_or_else(
                    || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
                );
                assert_eq!(
                    new_object.version(),
                    oref.1,
                    "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
                    tx_digest,
                    id,
                    new_object.version(),
                    oref.1
                );

                // A failure to build the dynamic field info is logged and
                // the object is skipped; it does not abort indexing.
                let Some(df_info) = self
                    .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut(), false)
                    .unwrap_or_else(|e| {
                        error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
                        None
                    }
                )
                else {
                    // Not a dynamic field (or the lookup failed): skip it.
                    continue;
                };
                new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
            }
            _ => {}
        }
    }

    Ok(ObjectIndexChanges {
        deleted_owners,
        deleted_dynamic_fields,
        new_owners,
        new_dynamic_fields,
    })
}
2495
2496 fn try_create_dynamic_field_info(
2497 &self,
2498 o: &Object,
2499 written: &WrittenObjects,
2500 resolver: &mut dyn LayoutResolver,
2501 get_latest_object_version: bool,
2502 ) -> IotaResult<Option<DynamicFieldInfo>> {
2503 let Some(move_object) = o.data.try_as_move().cloned() else {
2505 return Ok(None);
2506 };
2507
2508 if !move_object.type_().is_dynamic_field() {
2510 return Ok(None);
2511 }
2512
2513 let layout = resolver
2514 .get_annotated_layout(&move_object.type_().clone().into())?
2515 .into_layout();
2516
2517 let field =
2518 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2519 IotaError::ObjectDeserialization {
2520 error: e.to_string(),
2521 }
2522 })?;
2523
2524 let type_ = field.kind;
2525 let name_type: TypeTag = field.name_layout.into();
2526 let bcs_name = field.name_bytes.to_owned();
2527
2528 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2529 .map_err(|e| {
2530 warn!("{e}");
2531 IotaError::ObjectDeserialization {
2532 error: e.to_string(),
2533 }
2534 })?;
2535
2536 let name = DynamicFieldName {
2537 type_: name_type,
2538 value: IotaMoveValue::from(name_value).to_json_value(),
2539 };
2540
2541 let value_metadata = field.value_metadata().map_err(|e| {
2542 warn!("{e}");
2543 IotaError::ObjectDeserialization {
2544 error: e.to_string(),
2545 }
2546 })?;
2547
2548 Ok(Some(match value_metadata {
2549 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2550 name,
2551 bcs_name,
2552 type_,
2553 object_type: object_type.to_canonical_string(true),
2554 object_id: o.id(),
2555 version: o.version(),
2556 digest: o.digest(),
2557 },
2558
2559 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2560 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2565 (
2566 object.version(),
2567 object.digest(),
2568 object.data.type_().unwrap().clone(),
2569 )
2570 } else {
2571 let object = if get_latest_object_version {
2573 self.get_object_store()
2580 .try_get_object(&object_id)?
2581 .ok_or_else(|| UserInputError::ObjectNotFound {
2582 object_id,
2583 version: Some(o.version()),
2584 })?
2585 } else {
2586 self.get_object_store()
2588 .try_get_object_by_key(&object_id, o.version())?
2589 .ok_or_else(|| UserInputError::ObjectNotFound {
2590 object_id,
2591 version: Some(o.version()),
2592 })?
2593 };
2594
2595 (
2596 object.version(),
2597 object.digest(),
2598 object.data.type_().unwrap().clone(),
2599 )
2600 };
2601
2602 DynamicFieldInfo {
2603 name,
2604 bcs_name,
2605 type_,
2606 object_type: object_type.to_string(),
2607 object_id,
2608 version,
2609 digest,
2610 }
2611 }
2612 }))
2613 }
2614
2615 #[instrument(level = "trace", skip_all, err)]
2616 fn post_process_one_tx(
2617 &self,
2618 certificate: &VerifiedExecutableTransaction,
2619 effects: &TransactionEffects,
2620 inner_temporary_store: &InnerTemporaryStore,
2621 epoch_store: &Arc<AuthorityPerEpochStore>,
2622 ) -> IotaResult {
2623 if self.indexes.is_none() {
2624 return Ok(());
2625 }
2626
2627 let tx_digest = certificate.digest();
2628 let timestamp_ms = Self::unixtime_now_ms();
2629 let events = &inner_temporary_store.events;
2630 let written = &inner_temporary_store.written;
2631 let tx_coins =
2632 self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2633
2634 if let Some(indexes) = &self.indexes {
2636 let _ = self
2637 .index_tx(
2638 indexes.as_ref(),
2639 tx_digest,
2640 certificate,
2641 effects,
2642 events,
2643 timestamp_ms,
2644 tx_coins,
2645 written,
2646 inner_temporary_store,
2647 )
2648 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2649 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2650 .expect("Indexing tx should not fail");
2651
2652 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2653 let events = self.make_transaction_block_events(
2654 events.clone(),
2655 *tx_digest,
2656 timestamp_ms,
2657 epoch_store,
2658 inner_temporary_store,
2659 )?;
2660 self.subscription_handler
2662 .process_tx(certificate.data().transaction_data(), &effects, &events)
2663 .tap_ok(|_| {
2664 self.metrics
2665 .post_processing_total_tx_had_event_processed
2666 .inc()
2667 })
2668 .tap_err(|e| {
2669 warn!(
2670 ?tx_digest,
2671 "Post processing - Couldn't process events for tx: {}", e
2672 )
2673 })?;
2674
2675 self.metrics
2676 .post_processing_total_events_emitted
2677 .inc_by(events.data.len() as u64);
2678 };
2679 Ok(())
2680 }
2681
2682 fn make_transaction_block_events(
2683 &self,
2684 transaction_events: TransactionEvents,
2685 digest: TransactionDigest,
2686 timestamp_ms: u64,
2687 epoch_store: &Arc<AuthorityPerEpochStore>,
2688 inner_temporary_store: &InnerTemporaryStore,
2689 ) -> IotaResult<IotaTransactionBlockEvents> {
2690 let mut layout_resolver =
2691 epoch_store
2692 .executor()
2693 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2694 inner_temporary_store,
2695 self.get_backing_package_store(),
2696 )));
2697 IotaTransactionBlockEvents::try_from(
2698 transaction_events,
2699 digest,
2700 Some(timestamp_ms),
2701 layout_resolver.as_mut(),
2702 )
2703 }
2704
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch, or if
/// the millisecond count does not fit in a `u64`.
pub fn unixtime_now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let millis: u128 = since_epoch.as_millis();
    u64::try_from(millis).expect("Travelling in time machine")
}
2712
2713 #[instrument(level = "trace", skip_all)]
2714 pub async fn handle_transaction_info_request(
2715 &self,
2716 request: TransactionInfoRequest,
2717 ) -> IotaResult<TransactionInfoResponse> {
2718 let epoch_store = self.load_epoch_store_one_call_per_task();
2719 let (transaction, status) = self
2720 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2721 .ok_or(IotaError::TransactionNotFound {
2722 digest: request.transaction_digest,
2723 })?;
2724 Ok(TransactionInfoResponse {
2725 transaction,
2726 status,
2727 })
2728 }
2729
2730 #[instrument(level = "trace", skip_all)]
2731 pub async fn handle_object_info_request(
2732 &self,
2733 request: ObjectInfoRequest,
2734 ) -> IotaResult<ObjectInfoResponse> {
2735 let epoch_store = self.load_epoch_store_one_call_per_task();
2736
2737 let requested_object_seq = match request.request_kind {
2738 ObjectInfoRequestKind::LatestObjectInfo => {
2739 let (_, seq, _) = self
2740 .try_get_object_or_tombstone(request.object_id)
2741 .await?
2742 .ok_or_else(|| {
2743 IotaError::from(UserInputError::ObjectNotFound {
2744 object_id: request.object_id,
2745 version: None,
2746 })
2747 })?;
2748 seq
2749 }
2750 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2751 };
2752
2753 let object = self
2754 .get_object_store()
2755 .try_get_object_by_key(&request.object_id, requested_object_seq)?
2756 .ok_or_else(|| {
2757 IotaError::from(UserInputError::ObjectNotFound {
2758 object_id: request.object_id,
2759 version: Some(requested_object_seq),
2760 })
2761 })?;
2762
2763 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2764 (request.generate_layout, object.data.try_as_move())
2765 {
2766 Some(into_struct_layout(
2767 self.load_epoch_store_one_call_per_task()
2768 .executor()
2769 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2770 .get_annotated_layout(&move_obj.type_().clone().into())?,
2771 )?)
2772 } else {
2773 None
2774 };
2775
2776 let lock = if !object.is_address_owned() {
2777 None
2779 } else {
2780 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2781 .await?
2782 .map(|s| s.into_inner())
2783 };
2784
2785 Ok(ObjectInfoResponse {
2786 object,
2787 layout,
2788 lock_for_debugging: lock,
2789 })
2790 }
2791
2792 #[instrument(level = "trace", skip_all)]
2793 pub fn handle_checkpoint_request(
2794 &self,
2795 request: &CheckpointRequest,
2796 ) -> IotaResult<CheckpointResponse> {
2797 let summary = if request.certified {
2798 let summary = match request.sequence_number {
2799 Some(seq) => self
2800 .checkpoint_store
2801 .get_checkpoint_by_sequence_number(seq)?,
2802 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2803 }
2804 .map(|v| v.into_inner());
2805 summary.map(CheckpointSummaryResponse::Certified)
2806 } else {
2807 let summary = match request.sequence_number {
2808 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2809 None => self
2810 .checkpoint_store
2811 .get_latest_locally_computed_checkpoint()?,
2812 };
2813 summary.map(CheckpointSummaryResponse::Pending)
2814 };
2815 let contents = match &summary {
2816 Some(s) => self
2817 .checkpoint_store
2818 .get_checkpoint_contents(&s.content_digest())?,
2819 None => None,
2820 };
2821 Ok(CheckpointResponse {
2822 checkpoint: summary,
2823 contents,
2824 })
2825 }
2826
2827 fn check_protocol_version(
2828 supported_protocol_versions: SupportedProtocolVersions,
2829 current_version: ProtocolVersion,
2830 ) {
2831 info!("current protocol version is now {:?}", current_version);
2832 info!("supported versions are: {:?}", supported_protocol_versions);
2833 if !supported_protocol_versions.is_version_supported(current_version) {
2834 let msg = format!(
2835 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2836 );
2837
2838 error!("{}", msg);
2839 eprintln!("{msg}");
2840
2841 #[cfg(not(msim))]
2842 std::process::exit(1);
2843
2844 #[cfg(msim)]
2845 iota_simulator::task::shutdown_current_node();
2846 }
2847 }
2848
/// Builds a new `AuthorityState` and starts its background execution task.
///
/// In order, this:
/// 1. checks that the epoch store's protocol version is supported (see
///    `check_protocol_version`);
/// 2. creates the metrics, the transaction manager (fed through an unbounded
///    channel of ready certificates) and the execution shutdown channel;
/// 3. creates the per-epoch-store pruner and the authority store pruner;
/// 4. assembles the state and spawns `execution_process` with a weak
///    reference to it;
/// 5. populates the owner index from `genesis_objects` if an index store is
///    configured and empty.
///
/// # Panics
///
/// Panics with "Error indexing genesis objects." if step 5 fails.
// NOTE(review): the lint expectation presumably covers the
// `unbounded_channel()` call below — confirm against the clippy config.
#[expect(clippy::disallowed_methods)]
pub async fn new(
    name: AuthorityName,
    secret: StableSyncAuthoritySigner,
    supported_protocol_versions: SupportedProtocolVersions,
    store: Arc<AuthorityStore>,
    execution_cache_trait_pointers: ExecutionCacheTraitPointers,
    epoch_store: Arc<AuthorityPerEpochStore>,
    committee_store: Arc<CommitteeStore>,
    indexes: Option<Arc<IndexStore>>,
    rest_index: Option<Arc<RestIndexStore>>,
    checkpoint_store: Arc<CheckpointStore>,
    prometheus_registry: &Registry,
    genesis_objects: &[Object],
    db_checkpoint_config: &DBCheckpointConfig,
    config: NodeConfig,
    archive_readers: ArchiveReaderBalancer,
    validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
    chain_identifier: ChainIdentifier,
    pruner_db: Option<Arc<AuthorityPrunerTables>>,
) -> Arc<Self> {
    Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

    let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
    // The transaction manager sends on this channel; the spawned execution
    // task below receives from it.
    let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
    let transaction_manager = Arc::new(TransactionManager::new(
        execution_cache_trait_pointers.object_cache_reader.clone(),
        execution_cache_trait_pointers
            .transaction_cache_reader
            .clone(),
        &epoch_store,
        tx_ready_certificates,
        metrics.clone(),
    ));
    // The sender half is stored in the state; the receiver goes to the
    // execution task.
    let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

    // Both pruners are stored in the state below.
    let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
        epoch_store.get_parent_path(),
        &config.authority_store_pruning_config,
    );
    let _pruner = AuthorityStorePruner::new(
        store.perpetual_tables.clone(),
        checkpoint_store.clone(),
        rest_index.clone(),
        config.authority_store_pruning_config.clone(),
        epoch_store.committee().authority_exists(&name),
        epoch_store.epoch_start_state().epoch_duration_ms(),
        prometheus_registry,
        archive_readers,
        pruner_db,
    );
    let input_loader =
        TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
    let epoch = epoch_store.epoch();
    let rgp = epoch_store.reference_gas_price();
    let state = Arc::new(AuthorityState {
        name,
        secret,
        // The execution lock holds the current epoch id.
        execution_lock: RwLock::new(epoch),
        epoch_store: ArcSwap::new(epoch_store.clone()),
        input_loader,
        execution_cache_trait_pointers,
        indexes,
        rest_index,
        subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
        checkpoint_store,
        committee_store,
        transaction_manager,
        tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
        metrics,
        _pruner,
        _authority_per_epoch_pruner,
        db_checkpoint_config: db_checkpoint_config.clone(),
        config,
        overload_info: AuthorityOverloadInfo::default(),
        validator_tx_finalizer,
        chain_identifier,
        congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
    });

    // The execution task gets only a weak reference to the state.
    let authority_state = Arc::downgrade(&state);
    spawn_monitored_task!(execution_process(
        authority_state,
        rx_ready_certificates,
        rx_execution_shutdown,
    ));

    state
        .create_owner_index_if_empty(genesis_objects, &epoch_store)
        .expect("Error indexing genesis objects.");

    state
}
2944
2945 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
2947 &self.execution_cache_trait_pointers.object_cache_reader
2948 }
2949
2950 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
2951 &self.execution_cache_trait_pointers.transaction_cache_reader
2952 }
2953
2954 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
2955 &self.execution_cache_trait_pointers.cache_writer
2956 }
2957
2958 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
2959 &self.execution_cache_trait_pointers.backing_store
2960 }
2961
2962 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
2963 &self.execution_cache_trait_pointers.backing_package_store
2964 }
2965
2966 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
2967 &self.execution_cache_trait_pointers.object_store
2968 }
2969
2970 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
2971 &self.execution_cache_trait_pointers.reconfig_api
2972 }
2973
2974 pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
2975 &self.execution_cache_trait_pointers.accumulator_store
2976 }
2977
2978 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
2979 &self.execution_cache_trait_pointers.checkpoint_cache
2980 }
2981
2982 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
2983 &self.execution_cache_trait_pointers.state_sync_store
2984 }
2985
2986 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
2987 &self.execution_cache_trait_pointers.cache_commit
2988 }
2989
2990 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
2991 self.execution_cache_trait_pointers
2992 .testing_api
2993 .database_for_testing()
2994 }
2995
2996 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
2997 &self,
2998 config: NodeConfig,
2999 metrics: Arc<AuthorityStorePruningMetrics>,
3000 ) -> anyhow::Result<()> {
3001 let archive_readers =
3002 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3003 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3004 &self.database_for_testing().perpetual_tables,
3005 &self.checkpoint_store,
3006 self.rest_index.as_deref(),
3007 None,
3008 config.authority_store_pruning_config,
3009 metrics,
3010 archive_readers,
3011 EPOCH_DURATION_MS_FOR_TESTING,
3012 )
3013 .await
3014 }
3015
3016 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3017 &self.transaction_manager
3018 }
3019
3020 pub fn enqueue_transactions_for_execution(
3023 &self,
3024 txns: Vec<VerifiedExecutableTransaction>,
3025 epoch_store: &Arc<AuthorityPerEpochStore>,
3026 ) {
3027 self.transaction_manager.enqueue(txns, epoch_store)
3028 }
3029
3030 pub fn enqueue_certificates_for_execution(
3032 &self,
3033 certs: Vec<VerifiedCertificate>,
3034 epoch_store: &Arc<AuthorityPerEpochStore>,
3035 ) {
3036 self.transaction_manager
3037 .enqueue_certificates(certs, epoch_store)
3038 }
3039
3040 pub fn enqueue_with_expected_effects_digest(
3041 &self,
3042 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3043 epoch_store: &AuthorityPerEpochStore,
3044 ) {
3045 self.transaction_manager
3046 .enqueue_with_expected_effects_digest(certs, epoch_store)
3047 }
3048
3049 fn create_owner_index_if_empty(
3050 &self,
3051 genesis_objects: &[Object],
3052 epoch_store: &Arc<AuthorityPerEpochStore>,
3053 ) -> IotaResult {
3054 let Some(index_store) = &self.indexes else {
3055 return Ok(());
3056 };
3057 if !index_store.is_empty() {
3058 return Ok(());
3059 }
3060
3061 let mut new_owners = vec![];
3062 let mut new_dynamic_fields = vec![];
3063 let mut layout_resolver = epoch_store
3064 .executor()
3065 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3066 for o in genesis_objects.iter() {
3067 match o.owner {
3068 Owner::AddressOwner(addr) => new_owners.push((
3069 (addr, o.id()),
3070 ObjectInfo::new(&o.compute_object_reference(), o),
3071 )),
3072 Owner::ObjectOwner(object_id) => {
3073 let id = o.id();
3074 let Some(info) = self.try_create_dynamic_field_info(
3075 o,
3076 &BTreeMap::new(),
3077 layout_resolver.as_mut(),
3078 true,
3079 )?
3080 else {
3081 continue;
3082 };
3083 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3084 }
3085 _ => {}
3086 }
3087 }
3088
3089 index_store.insert_genesis_objects(ObjectIndexChanges {
3090 deleted_owners: vec![],
3091 deleted_dynamic_fields: vec![],
3092 new_owners,
3093 new_dynamic_fields,
3094 })
3095 }
3096
3097 pub fn execution_lock_for_executable_transaction(
3101 &self,
3102 transaction: &VerifiedExecutableTransaction,
3103 ) -> IotaResult<ExecutionLockReadGuard> {
3104 let lock = self
3105 .execution_lock
3106 .try_read()
3107 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3108 if *lock == transaction.auth_sig().epoch() {
3109 Ok(lock)
3110 } else {
3111 Err(IotaError::WrongEpoch {
3112 expected_epoch: *lock,
3113 actual_epoch: transaction.auth_sig().epoch(),
3114 })
3115 }
3116 }
3117
3118 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard> {
3124 self.execution_lock
3125 .try_read()
3126 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3127 }
3128
3129 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard {
3130 self.execution_lock.write().await
3131 }
3132
/// Transitions this authority from the epoch of `cur_epoch_store` to the
/// epoch of `new_committee` and returns the new epoch store.
///
/// The steps run in this order:
/// 1. check that the new epoch's protocol version is supported, reset the
///    metrics and store the new committee;
/// 2. take the execution write lock and mark the current epoch store as
///    terminated;
/// 3. sanity-check the locally built checkpoints against
///    `epoch_last_checkpoint`;
/// 4. revert uncommitted transactions, clear end-of-epoch cache state and
///    run the system consistency checks;
/// 5. persist the new epoch start configuration and, if configured, write a
///    DB checkpoint for the finished epoch;
/// 6. reconfigure the cache, reopen the epoch DB, reconfigure the
///    transaction manager and store the new epoch in the execution lock.
///
/// # Errors
///
/// Propagates failures from the committee store, checkpoint store,
/// transaction revert, consistency check, configuration write, DB
/// checkpointing and epoch DB reopen.
///
/// # Panics
///
/// Panics if the latest locally computed checkpoint is beyond
/// `epoch_last_checkpoint`, or if the reopened epoch store does not report
/// the new committee's epoch.
#[instrument(level = "error", skip_all)]
pub async fn reconfigure(
    &self,
    cur_epoch_store: &AuthorityPerEpochStore,
    supported_protocol_versions: SupportedProtocolVersions,
    new_committee: Committee,
    epoch_start_configuration: EpochStartConfiguration,
    accumulator: Arc<StateAccumulator>,
    expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
    epoch_supply_change: i64,
    epoch_last_checkpoint: CheckpointSequenceNumber,
) -> IotaResult<Arc<AuthorityPerEpochStore>> {
    Self::check_protocol_version(
        supported_protocol_versions,
        epoch_start_configuration
            .epoch_start_state()
            .protocol_version(),
    );
    self.metrics.reset_on_reconfigure();
    self.committee_store.insert_new_committee(&new_committee)?;

    // The write guard is held until the end of this function; the new epoch
    // is written into it as the last step.
    let mut execution_lock = self.execution_lock_for_reconfiguration().await;

    cur_epoch_store.epoch_terminated().await;

    // A missing locally computed checkpoint is treated as sequence number 0.
    let highest_locally_built_checkpoint_seq = self
        .checkpoint_store
        .get_latest_locally_computed_checkpoint()?
        .map(|c| *c.sequence_number())
        .unwrap_or(0);

    assert!(
        epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
        "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
    );
    // Only when the last checkpoint of the epoch was built locally can the
    // number of remaining shared version assignments be checked.
    if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
        let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
        // Up to one remaining assignment is tolerated.
        // NOTE(review): presumably the one belonging to the epoch-change
        // transaction — confirm.
        if num_shared_version_assignments > 1 {
            debug_fatal!(
                "all shared_version_assignments should have been removed \
                (num_shared_version_assignments: {num_shared_version_assignments})"
            );
        }
    }

    self.revert_uncommitted_epoch_transactions(cur_epoch_store)
        .await?;
    self.get_reconfig_api()
        .clear_state_end_of_epoch(&execution_lock);
    self.check_system_consistency(
        cur_epoch_store,
        accumulator,
        expensive_safety_check_config,
        epoch_supply_change,
    )?;
    self.get_reconfig_api()
        .try_set_epoch_start_configuration(&epoch_start_configuration)?;
    // Optional end-of-epoch DB checkpoint, written to `epoch_<N>` under the
    // configured checkpoint path.
    if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
        if self
            .db_checkpoint_config
            .perform_db_checkpoints_at_epoch_end
        {
            let checkpoint_indexes = self
                .db_checkpoint_config
                .perform_index_db_checkpoints_at_epoch_end
                .unwrap_or(false);
            let current_epoch = cur_epoch_store.epoch();
            let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
            self.checkpoint_all_dbs(
                &epoch_checkpoint_path,
                cur_epoch_store,
                checkpoint_indexes,
            )?;
        }
    }

    self.get_reconfig_api()
        .reconfigure_cache(&epoch_start_configuration)
        .await;

    // Capture the epoch before `new_committee` is moved into
    // `reopen_epoch_db`.
    let new_epoch = new_committee.epoch;
    let new_epoch_store = self
        .reopen_epoch_db(
            cur_epoch_store,
            new_committee,
            epoch_start_configuration,
            expensive_safety_check_config,
            epoch_last_checkpoint,
        )
        .await?;
    assert_eq!(new_epoch_store.epoch(), new_epoch);
    self.transaction_manager.reconfigure(new_epoch);
    // Publish the new epoch to everyone who takes the execution lock next.
    *execution_lock = new_epoch;
    Ok(new_epoch_store)
}
3244
/// Test-only shortcut that advances this authority to the next epoch.
///
/// It builds the next epoch store with `new_at_next_epoch_for_testing`,
/// installs it, and updates the transaction manager and the execution lock.
/// None of the consistency checks or DB work done by `reconfigure` run here.
///
/// # Panics
/// Panics if reading the last checkpoint of the current epoch from the
/// checkpoint store fails.
pub async fn reconfigure_for_testing(&self) {
    let mut execution_lock = self.execution_lock_for_reconfiguration().await;
    let epoch_store = self.epoch_store_for_testing().clone();
    let protocol_config = epoch_store.protocol_config().clone();
    // Install an override that always yields a clone of the current epoch's
    // protocol config. `_guard` is kept alive until the end of the function.
    // NOTE(review): presumably the override is removed when the guard is
    // dropped — confirm against `apply_overrides_for_testing`.
    let _guard =
        ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
    let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
        self.get_backing_package_store().clone(),
        self.get_object_store().clone(),
        &self.config.expensive_safety_check_config,
        // Last checkpoint of the epoch being left; the default sequence
        // number is used when none is recorded.
        self.checkpoint_store
            .get_epoch_last_checkpoint(epoch_store.epoch())
            .unwrap()
            .map(|c| *c.sequence_number())
            .unwrap_or_default(),
    );
    let new_epoch = new_epoch_store.epoch();
    self.transaction_manager.reconfigure(new_epoch);
    // The new store is installed before the old epoch is terminated.
    self.epoch_store.store(new_epoch_store);
    epoch_store.epoch_terminated().await;
    *execution_lock = new_epoch;
}
3278
3279 #[instrument(level = "error", skip_all)]
3280 fn check_system_consistency(
3281 &self,
3282 cur_epoch_store: &AuthorityPerEpochStore,
3283 accumulator: Arc<StateAccumulator>,
3284 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3285 epoch_supply_change: i64,
3286 ) -> IotaResult<()> {
3287 info!(
3288 "Performing iota conservation consistency check for epoch {}",
3289 cur_epoch_store.epoch()
3290 );
3291
3292 if cfg!(debug_assertions) {
3293 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3294 }
3295
3296 self.get_reconfig_api()
3297 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3298
3299 if expensive_safety_check_config.enable_state_consistency_check() {
3301 info!(
3302 "Performing state consistency check for epoch {}",
3303 cur_epoch_store.epoch()
3304 );
3305 self.expensive_check_is_consistent_state(
3306 accumulator,
3307 cur_epoch_store,
3308 cfg!(debug_assertions), );
3310 }
3311
3312 if expensive_safety_check_config.enable_secondary_index_checks() {
3313 if let Some(indexes) = self.indexes.clone() {
3314 verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3315 .expect("secondary indexes are inconsistent");
3316 }
3317 }
3318
3319 Ok(())
3320 }
3321
3322 fn expensive_check_is_consistent_state(
3323 &self,
3324 accumulator: Arc<StateAccumulator>,
3325 cur_epoch_store: &AuthorityPerEpochStore,
3326 panic: bool,
3327 ) {
3328 let live_object_set_hash = accumulator.digest_live_object_set();
3329
3330 let root_state_hash: ECMHLiveObjectSetDigest = self
3331 .get_accumulator_store()
3332 .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3333 .expect("Retrieving root state hash cannot fail")
3334 .expect("Root state hash for epoch must exist")
3335 .1
3336 .digest()
3337 .into();
3338
3339 let is_inconsistent = root_state_hash != live_object_set_hash;
3340 if is_inconsistent {
3341 if panic {
3342 panic!(
3343 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3344 );
3345 } else {
3346 error!(
3347 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3348 root_state_hash, live_object_set_hash
3349 );
3350 }
3351 } else {
3352 info!("State consistency check passed");
3353 }
3354
3355 if !panic {
3356 accumulator.set_inconsistent_state(is_inconsistent);
3357 }
3358 }
3359
3360 pub fn current_epoch_for_testing(&self) -> EpochId {
3361 self.epoch_store_for_testing().epoch()
3362 }
3363
/// Snapshots the node's databases into `checkpoint_path`.
///
/// The snapshot is assembled in a sibling path obtained with
/// `with_extension("tmp")` and renamed to `checkpoint_path` only after every
/// store has been snapshotted. If `checkpoint_path` already exists the call
/// is a no-op. The resulting layout is `checkpoints/`, `store/perpetual/`,
/// `epochs/` and, when `checkpoint_indexes` is set and an index store exists,
/// `indexes/`.
///
/// # Errors
/// Filesystem failures are reported as `IotaError::FileIO`; errors from the
/// individual stores' snapshot calls are propagated.
#[instrument(level = "error", skip_all)]
pub fn checkpoint_all_dbs(
    &self,
    checkpoint_path: &Path,
    cur_epoch_store: &AuthorityPerEpochStore,
    checkpoint_indexes: bool,
) -> IotaResult {
    // Records the elapsed time in the latency metric when dropped.
    let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
    let current_epoch = cur_epoch_store.epoch();

    if checkpoint_path.exists() {
        info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
        return Ok(());
    }

    let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
    let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");

    // Discard whatever is left at the temporary path so we start clean.
    if checkpoint_path_tmp.exists() {
        fs::remove_dir_all(&checkpoint_path_tmp)
            .map_err(|e| IotaError::FileIO(e.to_string()))?;
    }

    fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
    fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;

    // Snapshot order: checkpoint store, perpetual store, committee store,
    // then (optionally) the index store.
    // NOTE(review): the ordering looks deliberate — confirm before changing.
    self.checkpoint_store
        .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;

    self.get_reconfig_api()
        .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;

    self.committee_store
        .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;

    if checkpoint_indexes {
        if let Some(indexes) = self.indexes.as_ref() {
            indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
        }
    }

    // Move the finished snapshot to its final location in one rename.
    fs::rename(checkpoint_path_tmp, checkpoint_path)
        .map_err(|e| IotaError::FileIO(e.to_string()))?;
    Ok(())
}
3411
/// Loads the current epoch store from `self.epoch_store` and returns the
/// resulting guard.
///
/// NOTE(review): the name suggests callers should load once per task and
/// reuse the guard rather than calling this repeatedly — confirm.
pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
    self.epoch_store.load()
}
3421
/// Test helper returning the current epoch store; it simply forwards to
/// `load_epoch_store_one_call_per_task`.
pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
    self.load_epoch_store_one_call_per_task()
}
3426
3427 pub fn clone_committee_for_testing(&self) -> Committee {
3428 Committee::clone(self.epoch_store_for_testing().committee())
3429 }
3430
3431 #[instrument(level = "trace", skip_all)]
3432 pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3433 self.get_object_store()
3434 .try_get_object(object_id)
3435 .map_err(Into::into)
3436 }
3437
3438 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3440 self.try_get_object(object_id)
3441 .await
3442 .expect("storage access failed")
3443 }
3444
3445 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3446 Ok(self
3447 .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3448 .await?
3449 .expect("framework object should always exist")
3450 .compute_object_reference())
3451 }
3452
/// Test helper that reads the IOTA system state object through the object
/// cache reader's `try_get_iota_system_state_object_unsafe`.
///
/// NOTE(review): the `_unsafe` suffix of the callee suggests the read has
/// caveats outside of tests — confirm against its documentation.
pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
    self.get_object_cache_reader()
        .try_get_iota_system_state_object_unsafe()
}
3458
3459 #[instrument(level = "trace", skip_all)]
3460 pub fn get_checkpoint_by_sequence_number(
3461 &self,
3462 sequence_number: CheckpointSequenceNumber,
3463 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3464 Ok(self
3465 .checkpoint_store
3466 .get_checkpoint_by_sequence_number(sequence_number)?)
3467 }
3468
3469 #[instrument(level = "trace", skip_all)]
3470 pub fn get_transaction_checkpoint_for_tests(
3471 &self,
3472 digest: &TransactionDigest,
3473 epoch_store: &AuthorityPerEpochStore,
3474 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3475 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3476 let Some(checkpoint) = checkpoint else {
3477 return Ok(None);
3478 };
3479 let checkpoint = self
3480 .checkpoint_store
3481 .get_checkpoint_by_sequence_number(checkpoint)?;
3482 Ok(checkpoint)
3483 }
3484
3485 #[instrument(level = "trace", skip_all)]
3486 pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3487 Ok(
3488 match self
3489 .get_object_cache_reader()
3490 .try_get_latest_object_or_tombstone(*object_id)?
3491 {
3492 Some((_, ObjectOrTombstone::Object(object))) => {
3493 let layout = self.get_object_layout(&object)?;
3494 ObjectRead::Exists(object.compute_object_reference(), object, layout)
3495 }
3496 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3497 None => ObjectRead::NotExists(*object_id),
3498 },
3499 )
3500 }
3501
/// Returns the chain identifier stored on this authority state.
pub fn get_chain_identifier(&self) -> ChainIdentifier {
    self.chain_identifier
}
3506
3507 #[instrument(level = "trace", skip_all)]
3508 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3509 where
3510 T: DeserializeOwned,
3511 {
3512 let o = self.get_object_read(object_id)?.into_object()?;
3513 if let Some(move_object) = o.data.try_as_move() {
3514 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3515 IotaError::ObjectDeserialization {
3516 error: format!("{e}"),
3517 }
3518 })?)
3519 } else {
3520 Err(IotaError::ObjectDeserialization {
3521 error: format!("Provided object : [{object_id}] is not a Move object."),
3522 })
3523 }
3524 }
3525
3526 #[instrument(level = "trace", skip_all)]
3532 pub fn get_past_object_read(
3533 &self,
3534 object_id: &ObjectID,
3535 version: SequenceNumber,
3536 ) -> IotaResult<PastObjectRead> {
3537 let Some(obj_ref) = self
3539 .get_object_cache_reader()
3540 .try_get_latest_object_ref_or_tombstone(*object_id)?
3541 else {
3542 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3543 };
3544
3545 if version > obj_ref.1 {
3546 return Ok(PastObjectRead::VersionTooHigh {
3547 object_id: *object_id,
3548 asked_version: version,
3549 latest_version: obj_ref.1,
3550 });
3551 }
3552
3553 if version < obj_ref.1 {
3554 return Ok(match self.read_object_at_version(object_id, version)? {
3556 Some((object, layout)) => {
3557 let obj_ref = object.compute_object_reference();
3558 PastObjectRead::VersionFound(obj_ref, object, layout)
3559 }
3560
3561 None => PastObjectRead::VersionNotFound(*object_id, version),
3562 });
3563 }
3564
3565 if !obj_ref.2.is_alive() {
3566 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3567 }
3568
3569 match self.read_object_at_version(object_id, obj_ref.1)? {
3570 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3571 None => {
3572 error!(
3573 "Object with in parent_entry is missing from object store, datastore is \
3574 inconsistent",
3575 );
3576 Err(UserInputError::ObjectNotFound {
3577 object_id: *object_id,
3578 version: Some(obj_ref.1),
3579 }
3580 .into())
3581 }
3582 }
3583 }
3584
3585 #[instrument(level = "trace", skip_all)]
3586 fn read_object_at_version(
3587 &self,
3588 object_id: &ObjectID,
3589 version: SequenceNumber,
3590 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3591 let Some(object) = self
3592 .get_object_cache_reader()
3593 .try_get_object_by_key(object_id, version)?
3594 else {
3595 return Ok(None);
3596 };
3597
3598 let layout = self.get_object_layout(&object)?;
3599 Ok(Some((object, layout)))
3600 }
3601
/// Resolves the annotated struct layout of `object` when `try_as_move`
/// yields a Move object; returns `Ok(None)` otherwise.
///
/// The layout is resolved with the executor of the currently loaded epoch
/// store, using the backing package store as the package source.
///
/// # Errors
/// Propagates failures from resolving the annotated layout or converting it
/// with `into_struct_layout`.
fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
    let layout = object
        .data
        .try_as_move()
        // Inside the closure `object` is the Move object and shadows the
        // function parameter of the same name.
        .map(|object| {
            into_struct_layout(
                self.load_epoch_store_one_call_per_task()
                    .executor()
                    .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
                    .get_annotated_layout(&object.type_().clone().into())?,
            )
        })
        // Option<Result<_>> -> Result<Option<_>> so the error can be propagated.
        .transpose()?;
    Ok(layout)
}
3618
3619 fn get_owner_at_version(
3620 &self,
3621 object_id: &ObjectID,
3622 version: SequenceNumber,
3623 ) -> IotaResult<Owner> {
3624 self.get_object_store()
3625 .try_get_object_by_key(object_id, version)?
3626 .ok_or_else(|| {
3627 IotaError::from(UserInputError::ObjectNotFound {
3628 object_id: *object_id,
3629 version: Some(version),
3630 })
3631 })
3632 .map(|o| o.owner)
3633 }
3634
3635 #[instrument(level = "trace", skip_all)]
3636 pub fn get_owner_objects(
3637 &self,
3638 owner: IotaAddress,
3639 cursor: Option<ObjectID>,
3641 limit: usize,
3642 filter: Option<IotaObjectDataFilter>,
3643 ) -> IotaResult<Vec<ObjectInfo>> {
3644 if let Some(indexes) = &self.indexes {
3645 indexes.get_owner_objects(owner, cursor, limit, filter)
3646 } else {
3647 Err(IotaError::IndexStoreNotAvailable)
3648 }
3649 }
3650
3651 #[instrument(level = "trace", skip_all)]
3652 pub fn get_owned_coins_iterator_with_cursor(
3653 &self,
3654 owner: IotaAddress,
3655 cursor: (String, ObjectID),
3657 limit: usize,
3658 one_coin_type_only: bool,
3659 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3660 if let Some(indexes) = &self.indexes {
3661 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3662 } else {
3663 Err(IotaError::IndexStoreNotAvailable)
3664 }
3665 }
3666
3667 #[instrument(level = "trace", skip_all)]
3668 pub fn get_owner_objects_iterator(
3669 &self,
3670 owner: IotaAddress,
3671 cursor: Option<ObjectID>,
3673 filter: Option<IotaObjectDataFilter>,
3674 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3675 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3676 if let Some(indexes) = &self.indexes {
3677 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3678 } else {
3679 Err(IotaError::IndexStoreNotAvailable)
3680 }
3681 }
3682
3683 #[instrument(level = "trace", skip_all)]
3684 pub async fn get_move_objects<T>(
3685 &self,
3686 owner: IotaAddress,
3687 type_: MoveObjectType,
3688 ) -> IotaResult<Vec<T>>
3689 where
3690 T: DeserializeOwned,
3691 {
3692 let object_ids = self
3693 .get_owner_objects_iterator(owner, None, None)?
3694 .filter(|o| match &o.type_ {
3695 ObjectType::Struct(s) => &type_ == s,
3696 ObjectType::Package => false,
3697 })
3698 .map(|info| ObjectKey(info.object_id, info.version))
3699 .collect::<Vec<_>>();
3700 let mut move_objects = vec![];
3701
3702 let objects = self
3703 .get_object_store()
3704 .try_multi_get_objects_by_key(&object_ids)?;
3705
3706 for (o, id) in objects.into_iter().zip(object_ids) {
3707 let object = o.ok_or_else(|| {
3708 IotaError::from(UserInputError::ObjectNotFound {
3709 object_id: id.0,
3710 version: Some(id.1),
3711 })
3712 })?;
3713 let move_object = object.data.try_as_move().ok_or_else(|| {
3714 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3715 })?;
3716 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3717 IotaError::ObjectDeserialization {
3718 error: format!("{e}"),
3719 }
3720 })?);
3721 }
3722 Ok(move_objects)
3723 }
3724
3725 #[instrument(level = "trace", skip_all)]
3726 pub fn get_dynamic_fields(
3727 &self,
3728 owner: ObjectID,
3729 cursor: Option<ObjectID>,
3731 limit: usize,
3732 ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3733 Ok(self
3734 .get_dynamic_fields_iterator(owner, cursor)?
3735 .take(limit)
3736 .collect::<Result<Vec<_>, _>>()?)
3737 }
3738
3739 fn get_dynamic_fields_iterator(
3740 &self,
3741 owner: ObjectID,
3742 cursor: Option<ObjectID>,
3744 ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3745 {
3746 if let Some(indexes) = &self.indexes {
3747 indexes.get_dynamic_fields_iterator(owner, cursor)
3748 } else {
3749 Err(IotaError::IndexStoreNotAvailable)
3750 }
3751 }
3752
3753 #[instrument(level = "trace", skip_all)]
3754 pub fn get_dynamic_field_object_id(
3755 &self,
3756 owner: ObjectID,
3757 name_type: TypeTag,
3758 name_bcs_bytes: &[u8],
3759 ) -> IotaResult<Option<ObjectID>> {
3760 if let Some(indexes) = &self.indexes {
3761 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3762 } else {
3763 Err(IotaError::IndexStoreNotAvailable)
3764 }
3765 }
3766
3767 #[instrument(level = "trace", skip_all)]
3768 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3769 Ok(self.get_indexes()?.next_sequence_number())
3770 }
3771
3772 #[instrument(level = "trace", skip_all)]
3773 pub async fn get_executed_transaction_and_effects(
3774 &self,
3775 digest: TransactionDigest,
3776 kv_store: Arc<TransactionKeyValueStore>,
3777 ) -> IotaResult<(Transaction, TransactionEffects)> {
3778 let transaction = kv_store.get_tx(digest).await?;
3779 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3780 Ok((transaction, effects))
3781 }
3782
3783 #[instrument(level = "trace", skip_all)]
3784 pub fn multi_get_checkpoint_by_sequence_number(
3785 &self,
3786 sequence_numbers: &[CheckpointSequenceNumber],
3787 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3788 Ok(self
3789 .checkpoint_store
3790 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3791 }
3792
3793 #[instrument(level = "trace", skip_all)]
3794 pub fn get_transaction_events(
3795 &self,
3796 digest: &TransactionEventsDigest,
3797 ) -> IotaResult<TransactionEvents> {
3798 self.get_transaction_cache_reader()
3799 .try_get_events(digest)?
3800 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3801 }
3802
3803 pub fn get_transaction_input_objects(
3804 &self,
3805 effects: &TransactionEffects,
3806 ) -> anyhow::Result<Vec<Object>> {
3807 let input_object_keys = effects
3808 .modified_at_versions()
3809 .into_iter()
3810 .map(|(object_id, version)| ObjectKey(object_id, version))
3811 .collect::<Vec<_>>();
3812
3813 let input_objects = self
3814 .get_object_store()
3815 .try_multi_get_objects_by_key(&input_object_keys)?
3816 .into_iter()
3817 .enumerate()
3818 .map(|(idx, maybe_object)| {
3819 maybe_object.ok_or_else(|| {
3820 anyhow::anyhow!(
3821 "missing input object key {:?} from tx {}",
3822 input_object_keys[idx],
3823 effects.transaction_digest()
3824 )
3825 })
3826 })
3827 .collect::<anyhow::Result<Vec<_>>>()?;
3828 Ok(input_objects)
3829 }
3830
3831 pub fn get_transaction_output_objects(
3832 &self,
3833 effects: &TransactionEffects,
3834 ) -> anyhow::Result<Vec<Object>> {
3835 let output_object_keys = effects
3836 .all_changed_objects()
3837 .into_iter()
3838 .map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
3839 .collect::<Vec<_>>();
3840
3841 let output_objects = self
3842 .get_object_store()
3843 .try_multi_get_objects_by_key(&output_object_keys)?
3844 .into_iter()
3845 .enumerate()
3846 .map(|(idx, maybe_object)| {
3847 maybe_object.ok_or_else(|| {
3848 anyhow::anyhow!(
3849 "missing output object key {:?} from tx {}",
3850 output_object_keys[idx],
3851 effects.transaction_digest()
3852 )
3853 })
3854 })
3855 .collect::<anyhow::Result<Vec<_>>>()?;
3856 Ok(output_objects)
3857 }
3858
3859 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3860 match &self.indexes {
3861 Some(i) => Ok(i.clone()),
3862 None => Err(IotaError::UnsupportedFeature {
3863 error: "extended object indexing is not enabled on this server".into(),
3864 }),
3865 }
3866 }
3867
3868 pub async fn get_transactions_for_tests(
3869 self: &Arc<Self>,
3870 filter: Option<TransactionFilter>,
3871 cursor: Option<TransactionDigest>,
3872 limit: Option<usize>,
3873 reverse: bool,
3874 ) -> IotaResult<Vec<TransactionDigest>> {
3875 let metrics = KeyValueStoreMetrics::new_for_tests();
3876 let kv_store = Arc::new(TransactionKeyValueStore::new(
3877 "rocksdb",
3878 metrics,
3879 self.clone(),
3880 ));
3881 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3882 .await
3883 }
3884
3885 #[instrument(level = "trace", skip_all)]
3886 pub async fn get_transactions(
3887 &self,
3888 kv_store: &Arc<TransactionKeyValueStore>,
3889 filter: Option<TransactionFilter>,
3890 cursor: Option<TransactionDigest>,
3892 limit: Option<usize>,
3893 reverse: bool,
3894 ) -> IotaResult<Vec<TransactionDigest>> {
3895 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3896 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3897 let iter = checkpoint_contents.iter().map(|c| c.transaction);
3898 if reverse {
3899 let iter = iter
3900 .rev()
3901 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3902 .skip(usize::from(cursor.is_some()));
3903 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3904 } else {
3905 let iter = iter
3906 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3907 .skip(usize::from(cursor.is_some()));
3908 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3909 }
3910 }
3911 self.get_indexes()?
3912 .get_transactions(filter, cursor, limit, reverse)
3913 }
3914
/// Borrows the shared handle to the checkpoint store.
pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
    &self.checkpoint_store
}
3918
3919 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
3920 self.get_checkpoint_store()
3921 .get_highest_executed_checkpoint_seq_number()?
3922 .ok_or(IotaError::UserInput {
3923 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
3924 })
3925 }
3926
/// Returns the highest pruned checkpoint as reported by the perpetual tables
/// of the testing database. Only compiled under `cfg(msim)`.
#[cfg(msim)]
pub fn get_highest_pruned_checkpoint_for_testing(
    &self,
) -> IotaResult<CheckpointSequenceNumber> {
    self.database_for_testing()
        .perpetual_tables
        .get_highest_pruned_checkpoint()
}
3935
3936 #[instrument(level = "trace", skip_all)]
3937 pub fn get_checkpoint_summary_by_sequence_number(
3938 &self,
3939 sequence_number: CheckpointSequenceNumber,
3940 ) -> IotaResult<CheckpointSummary> {
3941 let verified_checkpoint = self
3942 .get_checkpoint_store()
3943 .get_checkpoint_by_sequence_number(sequence_number)?;
3944 match verified_checkpoint {
3945 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3946 None => Err(IotaError::UserInput {
3947 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3948 }),
3949 }
3950 }
3951
3952 #[instrument(level = "trace", skip_all)]
3953 pub fn get_checkpoint_summary_by_digest(
3954 &self,
3955 digest: CheckpointDigest,
3956 ) -> IotaResult<CheckpointSummary> {
3957 let verified_checkpoint = self
3958 .get_checkpoint_store()
3959 .get_checkpoint_by_digest(&digest)?;
3960 match verified_checkpoint {
3961 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3962 None => Err(IotaError::UserInput {
3963 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3964 }),
3965 }
3966 }
3967
3968 #[instrument(level = "trace", skip_all)]
3969 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
3970 if is_system_package(package_id) {
3971 return self.find_genesis_txn_digest();
3972 }
3973 Ok(self
3974 .get_object_read(&package_id)?
3975 .into_object()?
3976 .previous_transaction)
3977 }
3978
3979 #[instrument(level = "trace", skip_all)]
3980 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
3981 let summary = self
3982 .get_verified_checkpoint_by_sequence_number(0)?
3983 .into_message();
3984 let content = self.get_checkpoint_contents(summary.content_digest)?;
3985 let genesis_transaction = content.enumerate_transactions(&summary).next();
3986 Ok(genesis_transaction
3987 .ok_or(IotaError::UserInput {
3988 error: UserInputError::GenesisTransactionNotFound,
3989 })?
3990 .1
3991 .transaction)
3992 }
3993
3994 #[instrument(level = "trace", skip_all)]
3995 pub fn get_verified_checkpoint_by_sequence_number(
3996 &self,
3997 sequence_number: CheckpointSequenceNumber,
3998 ) -> IotaResult<VerifiedCheckpoint> {
3999 let verified_checkpoint = self
4000 .get_checkpoint_store()
4001 .get_checkpoint_by_sequence_number(sequence_number)?;
4002 match verified_checkpoint {
4003 Some(verified_checkpoint) => Ok(verified_checkpoint),
4004 None => Err(IotaError::UserInput {
4005 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4006 }),
4007 }
4008 }
4009
4010 #[instrument(level = "trace", skip_all)]
4011 pub fn get_verified_checkpoint_summary_by_digest(
4012 &self,
4013 digest: CheckpointDigest,
4014 ) -> IotaResult<VerifiedCheckpoint> {
4015 let verified_checkpoint = self
4016 .get_checkpoint_store()
4017 .get_checkpoint_by_digest(&digest)?;
4018 match verified_checkpoint {
4019 Some(verified_checkpoint) => Ok(verified_checkpoint),
4020 None => Err(IotaError::UserInput {
4021 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4022 }),
4023 }
4024 }
4025
4026 #[instrument(level = "trace", skip_all)]
4027 pub fn get_checkpoint_contents(
4028 &self,
4029 digest: CheckpointContentsDigest,
4030 ) -> IotaResult<CheckpointContents> {
4031 self.get_checkpoint_store()
4032 .get_checkpoint_contents(&digest)?
4033 .ok_or(IotaError::UserInput {
4034 error: UserInputError::CheckpointContentsNotFound(digest),
4035 })
4036 }
4037
4038 #[instrument(level = "trace", skip_all)]
4039 pub fn get_checkpoint_contents_by_sequence_number(
4040 &self,
4041 sequence_number: CheckpointSequenceNumber,
4042 ) -> IotaResult<CheckpointContents> {
4043 let verified_checkpoint = self
4044 .get_checkpoint_store()
4045 .get_checkpoint_by_sequence_number(sequence_number)?;
4046 match verified_checkpoint {
4047 Some(verified_checkpoint) => {
4048 let content_digest = verified_checkpoint.into_inner().content_digest;
4049 self.get_checkpoint_contents(content_digest)
4050 }
4051 None => Err(IotaError::UserInput {
4052 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4053 }),
4054 }
4055 }
4056
/// Returns events matching `query`, paged by `cursor` and `limit`.
///
/// Event keys are read from the index store, the events themselves are
/// fetched per transaction from `kv_store`, and each event is converted into
/// an `IotaEvent` using the current epoch's layout resolver.
///
/// # Errors
/// * the index store is unavailable
/// * `cursor` refers to a transaction unknown to the index
///   (`TransactionNotFound`)
/// * the filter is not supported (`UserInputError::Unsupported`)
/// * an indexed event cannot be found among the fetched transaction events
///   (`TransactionEventsNotFound`)
/// * layout resolution or conversion into `IotaEvent` fails
///
/// # Panics
/// Panics if a digest taken from the event keys is missing from the map
/// built from the fetched events.
#[instrument(level = "trace", skip_all)]
pub async fn query_events(
    &self,
    kv_store: &Arc<TransactionKeyValueStore>,
    query: EventFilter,
    cursor: Option<EventID>,
    limit: usize,
    descending: bool,
) -> IotaResult<Vec<IotaEvent>> {
    let index_store = self.get_indexes()?;

    // Starting position: the cursor's (transaction sequence, event sequence)
    // if given, otherwise the maximum position for descending scans and
    // (0, 0) for ascending ones.
    let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
        let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
            IotaError::TransactionNotFound {
                digest: cursor.tx_digest,
            },
        )?;
        (tx_seq, cursor.event_seq as usize)
    } else if descending {
        (u64::MAX, usize::MAX)
    } else {
        (0, 0)
    };

    // Request one key more than the caller asked for; the surplus is dropped
    // again after the lookup (see below).
    let limit = limit + 1;
    let mut event_keys = match query {
        EventFilter::All(filters) => {
            // Only the empty `All` filter is supported.
            if filters.is_empty() {
                index_store.all_events(tx_num, event_num, limit, descending)?
            } else {
                return Err(IotaError::UserInput {
                    error: UserInputError::Unsupported(
                        "This query type does not currently support filter combinations"
                            .to_string(),
                    ),
                });
            }
        }
        EventFilter::Transaction(digest) => {
            index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
        }
        EventFilter::MoveModule { package, module } => {
            let module_id = ModuleId::new(package.into(), module);
            index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
        }
        EventFilter::MoveEventType(struct_name) => index_store
            .events_by_move_event_struct_name(
                &struct_name,
                tx_num,
                event_num,
                limit,
                descending,
            )?,
        EventFilter::Sender(sender) => {
            index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
        }
        EventFilter::TimeRange {
            start_time,
            end_time,
        } => index_store
            .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
        EventFilter::MoveEventModule { package, module } => index_store
            .events_by_move_event_module(
                &ModuleId::new(package.into(), module),
                tx_num,
                event_num,
                limit,
                descending,
            )?,
        // These filters are rejected outright.
        EventFilter::Package(_)
        | EventFilter::MoveEventField { .. }
        | EventFilter::Any(_)
        | EventFilter::And(_, _)
        | EventFilter::Or(_, _) => {
            return Err(IotaError::UserInput {
                error: UserInputError::Unsupported(
                    "This query type is not supported by the full node.".to_string(),
                ),
            });
        }
    };

    // With a cursor the first returned key is dropped (the cursor is treated
    // as exclusive); without one the list is cut back to the caller's limit.
    if cursor.is_some() {
        if !event_keys.is_empty() {
            event_keys.remove(0);
        }
    } else {
        event_keys.truncate(limit - 1);
    }

    // De-duplicate the transaction digests so each is fetched only once.
    let transaction_digests = event_keys
        .iter()
        .map(|(_, digest, _, _)| *digest)
        .collect::<HashSet<_>>()
        .into_iter()
        .collect::<Vec<_>>();

    let events = kv_store
        .multi_get_events_by_tx_digests(&transaction_digests)
        .await?;

    // Pair every digest with its result.
    // NOTE(review): assumes results come back in request order — confirm.
    let events_map: HashMap<_, _> =
        transaction_digests.iter().zip(events.into_iter()).collect();

    // For every key pick the event at index `k.2` out of its transaction's
    // events; a missing transaction entry or index becomes an error.
    let stored_events = event_keys
        .into_iter()
        .map(|k| {
            (
                k,
                events_map
                    .get(&k.1)
                    .expect("fetched digest is missing")
                    .clone()
                    .and_then(|e| e.data.get(k.2).cloned()),
            )
        })
        .map(|((digest, tx_digest, event_seq, timestamp), event)| {
            event
                .map(|e| (e, tx_digest, event_seq, timestamp))
                .ok_or(IotaError::TransactionEventsNotFound { digest })
        })
        .collect::<Result<Vec<_>, _>>()?;

    // One layout resolver, built from the current epoch's executor, is
    // reused for all events.
    let epoch_store = self.load_epoch_store_one_call_per_task();
    let backing_store = self.get_backing_package_store().as_ref();
    let mut layout_resolver = epoch_store
        .executor()
        .type_layout_resolver(Box::new(backing_store));
    let mut events = vec![];
    for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
        events.push(IotaEvent::try_from(
            e.clone(),
            tx_digest,
            event_seq as u64,
            Some(timestamp),
            layout_resolver.get_annotated_layout(&e.type_)?,
        )?)
    }
    Ok(events)
}
4203
4204 pub async fn insert_genesis_object(&self, object: Object) {
4205 self.get_reconfig_api()
4206 .try_insert_genesis_object(object)
4207 .expect("Cannot insert genesis object")
4208 }
4209
4210 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4211 futures::future::join_all(
4212 objects
4213 .iter()
4214 .map(|o| self.insert_genesis_object(o.clone())),
4215 )
4216 .await;
4217 }
4218
4219 #[instrument(level = "trace", skip_all)]
4221 pub fn get_transaction_status(
4222 &self,
4223 transaction_digest: &TransactionDigest,
4224 epoch_store: &Arc<AuthorityPerEpochStore>,
4225 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4226 if let Some(effects) =
4228 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4229 {
4230 if let Some(transaction) = self
4231 .get_transaction_cache_reader()
4232 .try_get_transaction_block(transaction_digest)?
4233 {
4234 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4235 let events = if let Some(digest) = effects.events_digest() {
4236 self.get_transaction_events(digest)?
4237 } else {
4238 TransactionEvents::default()
4239 };
4240 return Ok(Some((
4241 (*transaction).clone().into_message(),
4242 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4243 )));
4244 } else {
4245 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4250 }
4251 }
4252 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4253 self.metrics.tx_already_processed.inc();
4254 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4255 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4256 } else {
4257 Ok(None)
4258 }
4259 }
4260
4261 #[instrument(level = "trace", skip_all)]
4265 pub fn get_signed_effects_and_maybe_resign(
4266 &self,
4267 transaction_digest: &TransactionDigest,
4268 epoch_store: &Arc<AuthorityPerEpochStore>,
4269 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4270 let effects = self
4271 .get_transaction_cache_reader()
4272 .try_get_executed_effects(transaction_digest)?;
4273 match effects {
4274 Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4275 None => Ok(None),
4276 }
4277 }
4278
4279 #[instrument(level = "trace", skip_all)]
4280 pub(crate) fn sign_effects(
4281 &self,
4282 effects: TransactionEffects,
4283 epoch_store: &Arc<AuthorityPerEpochStore>,
4284 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4285 let tx_digest = *effects.transaction_digest();
4286 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4287 Some(sig) if sig.epoch == epoch_store.epoch() => {
4288 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4289 }
4290 _ => {
4291 debug!(
4314 ?tx_digest,
4315 epoch=?epoch_store.epoch(),
4316 "Re-signing the effects with the current epoch"
4317 );
4318
4319 let sig = AuthoritySignInfo::new(
4320 epoch_store.epoch(),
4321 &effects,
4322 Intent::iota_app(IntentScope::TransactionEffects),
4323 self.name,
4324 &*self.secret,
4325 );
4326
4327 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4328
4329 epoch_store.insert_effects_digest_and_signature(
4330 &tx_digest,
4331 effects.digest(),
4332 &sig,
4333 )?;
4334
4335 effects
4336 }
4337 };
4338
4339 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4340 signed_effects,
4341 ))
4342 }
4343
4344 #[instrument(level = "trace", skip_all)]
4346 fn fullnode_only_get_tx_coins_for_indexing(
4347 &self,
4348 inner_temporary_store: &InnerTemporaryStore,
4349 epoch_store: &Arc<AuthorityPerEpochStore>,
4350 ) -> Option<TxCoins> {
4351 if self.indexes.is_none() || self.is_validator(epoch_store) {
4352 return None;
4353 }
4354 let written_coin_objects = inner_temporary_store
4355 .written
4356 .iter()
4357 .filter_map(|(k, v)| {
4358 if v.is_coin() {
4359 Some((*k, v.clone()))
4360 } else {
4361 None
4362 }
4363 })
4364 .collect();
4365 let input_coin_objects = inner_temporary_store
4366 .input_objects
4367 .iter()
4368 .filter_map(|(k, v)| {
4369 if v.is_coin() {
4370 Some((*k, v.clone()))
4371 } else {
4372 None
4373 }
4374 })
4375 .collect::<ObjectMap>();
4376 Some((input_coin_objects, written_coin_objects))
4377 }
4378
4379 #[instrument(level = "trace", skip_all)]
4391 pub async fn get_transaction_lock(
4392 &self,
4393 object_ref: &ObjectRef,
4394 epoch_store: &AuthorityPerEpochStore,
4395 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4396 let lock_info = self
4397 .get_object_cache_reader()
4398 .try_get_lock(*object_ref, epoch_store)?;
4399 let lock_info = match lock_info {
4400 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4401 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4402 provided_obj_ref: *object_ref,
4403 current_version: locked_ref.1,
4404 }
4405 .into());
4406 }
4407 ObjectLockStatus::Initialized => {
4408 return Ok(None);
4409 }
4410 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4411 };
4412
4413 epoch_store.get_signed_transaction(&lock_info)
4414 }
4415
4416 pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4417 self.get_object_cache_reader().try_get_objects(objects)
4418 }
4419
4420 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4422 self.try_get_objects(objects)
4423 .await
4424 .expect("storage access failed")
4425 }
4426
4427 pub async fn try_get_object_or_tombstone(
4428 &self,
4429 object_id: ObjectID,
4430 ) -> IotaResult<Option<ObjectRef>> {
4431 self.get_object_cache_reader()
4432 .try_get_latest_object_ref_or_tombstone(object_id)
4433 }
4434
4435 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4437 self.try_get_object_or_tombstone(object_id)
4438 .await
4439 .expect("storage access failed")
4440 }
4441
4442 pub fn set_override_protocol_upgrade_buffer_stake(
4452 &self,
4453 expected_epoch: EpochId,
4454 buffer_stake_bps: u64,
4455 ) -> IotaResult {
4456 let epoch_store = self.load_epoch_store_one_call_per_task();
4457 let actual_epoch = epoch_store.epoch();
4458 if actual_epoch != expected_epoch {
4459 return Err(IotaError::WrongEpoch {
4460 expected_epoch,
4461 actual_epoch,
4462 });
4463 }
4464
4465 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4466 }
4467
4468 pub fn clear_override_protocol_upgrade_buffer_stake(
4469 &self,
4470 expected_epoch: EpochId,
4471 ) -> IotaResult {
4472 let epoch_store = self.load_epoch_store_one_call_per_task();
4473 let actual_epoch = epoch_store.epoch();
4474 if actual_epoch != expected_epoch {
4475 return Err(IotaError::WrongEpoch {
4476 expected_epoch,
4477 actual_epoch,
4478 });
4479 }
4480
4481 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4482 }
4483
/// Collects an object reference for every system package known to this
/// binary.
///
/// Each built-in system package (plus, in simulator builds, any packages
/// injected through `framework_injection`) is passed to
/// `iota_framework::compare_system_package` together with this node's object
/// store. If that comparison yields `None` for any package, the whole result
/// is an empty vector; otherwise the returned vector holds one reference per
/// package, in iteration order.
pub async fn get_available_system_packages(
    &self,
    binary_config: &BinaryConfig,
) -> Vec<ObjectRef> {
    let mut results = vec![];

    let system_packages = BuiltInFramework::iter_system_packages();

    // Simulator builds only: append the extra packages registered for this
    // authority to the built-in ones.
    #[cfg(msim)]
    let extra_packages = framework_injection::get_extra_packages(self.name);
    #[cfg(msim)]
    let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());

    for system_package in system_packages {
        let modules = system_package.modules().to_vec();
        // Simulator builds only: prefer override modules registered for this
        // package, falling back to the package's own modules.
        #[cfg(msim)]
        let modules = framework_injection::get_override_modules(&system_package.id, self.name)
            .unwrap_or(modules);

        // A single `None` here discards everything collected so far.
        let Some(obj_ref) = iota_framework::compare_system_package(
            &self.get_object_store(),
            &system_package.id,
            &modules,
            system_package.dependencies.to_vec(),
            binary_config,
        )
        .await
        else {
            return vec![];
        };
        results.push(obj_ref);
    }

    results
}
4524
/// Produces the serialized modules and dependencies for the system packages
/// that need to change.
///
/// For every reference in `system_packages`:
/// * if the locally stored object already has exactly that reference, the
///   package is skipped;
/// * otherwise a system-package object is rebuilt from the bytes this binary
///   ships (or, in simulator builds, from an injected override) and its
///   computed reference is compared with the requested one.
///
/// Returns `None` as soon as one rebuilt reference does not match the
/// requested reference. Otherwise returns one
/// `(version, module bytes, dependencies)` tuple per package that was not
/// skipped.
///
/// # Panics
/// Panics if reading the objects fails (via `get_objects`) or if a module
/// fails to deserialize with `binary_config`.
async fn get_system_package_bytes(
    &self,
    system_packages: Vec<ObjectRef>,
    binary_config: &BinaryConfig,
) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
    let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
    let objects = self.get_objects(&ids).await;

    let mut res = Vec::with_capacity(system_packages.len());
    for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
        // The previous-transaction digest of the rebuilt object is taken from
        // the stored object, or is the genesis marker when none is stored.
        let prev_transaction = match object {
            Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
                info!("Framework {} does not need updating", system_package_ref.0);
                continue;
            }

            Some(cur_object) => cur_object.previous_transaction,
            None => TransactionDigest::genesis_marker(),
        };

        // Simulator builds: an injected override wins over the built-in package.
        #[cfg(msim)]
        let SystemPackage {
            id: _,
            bytes,
            dependencies,
        } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
            .unwrap_or_else(|| {
                BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
            });

        #[cfg(not(msim))]
        let SystemPackage {
            id: _,
            bytes,
            dependencies,
        } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();

        let modules: Vec<_> = bytes
            .iter()
            .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
            .collect();

        let new_object = Object::new_system_package(
            &modules,
            system_package_ref.1,
            dependencies.clone(),
            prev_transaction,
        );

        // The locally rebuilt package must reproduce the requested reference
        // exactly; otherwise the whole request is rejected.
        let new_ref = new_object.compute_object_reference();
        if new_ref != system_package_ref {
            error!(
                "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
            );
            return None;
        }

        res.push((system_package_ref.1, bytes, dependencies));
    }

    Some(res)
}
4604
/// Checks whether `proposed_protocol_version` has enough stake behind it.
///
/// Capabilities with no available system packages are ignored. The rest are
/// grouped by `(protocol config digest for the proposed version, sorted
/// system packages)`, and the stake of the authorities in each group is
/// summed. A group is sufficient when its stake reaches
/// `quorum_threshold + ceil(f * buffer_stake_bps / 10000)`, where
/// `f = total_votes - quorum_threshold`.
///
/// Returns the proposed version together with the packages of the first
/// sufficient group, or `None` if no group qualifies. `buffer_stake_bps`
/// values above 10000 are clamped to 10000.
fn is_protocol_version_supported_v1(
    proposed_protocol_version: ProtocolVersion,
    committee: &Committee,
    capabilities: Vec<AuthorityCapabilitiesV1>,
    mut buffer_stake_bps: u64,
) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
    if buffer_stake_bps > 10000 {
        warn!("clamping buffer_stake_bps to 10000");
        buffer_stake_bps = 10000;
    }

    // One `(digest, packages, authority)` entry per capability that can vote
    // for the proposed version.
    let mut desired_upgrades: Vec<_> = capabilities
        .into_iter()
        .filter_map(|mut cap| {
            // An authority advertising no packages does not take part.
            if cap.available_system_packages.is_empty() {
                return None;
            }

            // Sorted so that equal package sets compare equal regardless of
            // the order in which they were reported.
            cap.available_system_packages.sort();

            info!(
                "validator {:?} supports {:?} with system packages: {:?}",
                cap.authority.concise(),
                cap.supported_protocol_versions,
                cap.available_system_packages,
            );

            // Yields `None` when no digest is available for the proposed version.
            cap.supported_protocol_versions
                .get_version_digest(proposed_protocol_version)
                .map(|digest| (digest, cap.available_system_packages, cap.authority))
        })
        .collect();

    // Sorting makes entries with the same `(digest, packages)` key adjacent,
    // which the consecutive grouping below relies on.
    desired_upgrades.sort();
    desired_upgrades
        .into_iter()
        .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
        .into_iter()
        .find_map(|((digest, packages), group)| {
            // Guaranteed by the empty-package filter above.
            assert!(!packages.is_empty());

            let mut stake_aggregator: StakeAggregator<(), true> =
                StakeAggregator::new(Arc::new(committee.clone()));

            for (_, _, authority) in group {
                stake_aggregator.insert_generic(authority, ());
            }

            let total_votes = stake_aggregator.total_votes();
            let quorum_threshold = committee.quorum_threshold();
            let f = committee.total_votes() - committee.quorum_threshold();

            // Extra stake required on top of quorum, rounded up.
            let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
            let effective_threshold = quorum_threshold + buffer_stake;

            info!(
                protocol_config_digest = ?digest,
                ?total_votes,
                ?quorum_threshold,
                ?buffer_stake_bps,
                ?effective_threshold,
                ?proposed_protocol_version,
                ?packages,
                "support for upgrade"
            );

            let has_support = total_votes >= effective_threshold;
            has_support.then_some((proposed_protocol_version, packages))
        })
}
4688
4689 fn choose_protocol_version_and_system_packages_v1(
4693 current_protocol_version: ProtocolVersion,
4694 committee: &Committee,
4695 capabilities: Vec<AuthorityCapabilitiesV1>,
4696 buffer_stake_bps: u64,
4697 ) -> (ProtocolVersion, Vec<ObjectRef>) {
4698 let mut next_protocol_version = current_protocol_version;
4699 let mut system_packages = vec![];
4700
4701 while let Some((version, packages)) = Self::is_protocol_version_supported_v1(
4705 next_protocol_version + 1,
4706 committee,
4707 capabilities.clone(),
4708 buffer_stake_bps,
4709 ) {
4710 next_protocol_version = version;
4711 system_packages = packages;
4712 }
4713
4714 (next_protocol_version, system_packages)
4715 }
4716
4717 #[instrument(level = "debug", skip_all)]
4718 fn create_authenticator_state_tx(
4719 &self,
4720 epoch_store: &Arc<AuthorityPerEpochStore>,
4721 ) -> Option<EndOfEpochTransactionKind> {
4722 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4723 info!("authenticator state transactions not enabled");
4724 return None;
4725 }
4726
4727 let authenticator_state_exists = epoch_store.authenticator_state_exists();
4728 let tx = if authenticator_state_exists {
4729 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4730 let min_epoch =
4731 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4732 let authenticator_obj_initial_shared_version = epoch_store
4733 .epoch_start_config()
4734 .authenticator_obj_initial_shared_version()
4735 .expect("initial version must exist");
4736
4737 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4738 min_epoch,
4739 authenticator_obj_initial_shared_version,
4740 );
4741
4742 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4743
4744 tx
4745 } else {
4746 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4747 info!("Creating AuthenticatorStateCreate tx");
4748 tx
4749 };
4750 Some(tx)
4751 }
4752
/// Builds the end-of-epoch transaction for the epoch of `epoch_store`,
/// prepares it locally, and stores the transaction and its effects.
///
/// The end-of-epoch transaction contains an optional authenticator-state
/// transaction followed by a change-epoch transaction. The change-epoch
/// transaction carries the next epoch number, the protocol version chosen
/// from validator capabilities, the gas summary, the epoch start timestamp
/// and the bytes of any system packages to upgrade.
///
/// Returns the system state written by the transaction, the
/// `SystemEpochInfoEvent` it emitted (if any), and its effects.
///
/// # Errors
/// * the chosen system packages cannot be reproduced from this binary;
/// * the transaction has already been executed (e.g. via state sync);
/// * any storage / execution step fails.
///
/// # Panics
/// Panics if capabilities cannot be read, if the transaction does not write
/// the system state object, if neither an epoch-info event was emitted nor
/// the system state is in safe mode, or if the effects status is not ok.
#[instrument(level = "error", skip_all)]
pub async fn create_and_execute_advance_epoch_tx(
    &self,
    epoch_store: &Arc<AuthorityPerEpochStore>,
    gas_cost_summary: &GasCostSummary,
    checkpoint: CheckpointSequenceNumber,
    epoch_start_timestamp_ms: CheckpointTimestamp,
) -> anyhow::Result<(
    IotaSystemState,
    Option<SystemEpochInfoEvent>,
    TransactionEffects,
)> {
    let mut txns = Vec::new();

    // The authenticator-state transaction, when present, precedes the
    // change-epoch transaction.
    if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
        txns.push(tx);
    }

    let next_epoch = epoch_store.epoch() + 1;

    let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();

    let (next_epoch_protocol_version, next_epoch_system_packages) =
        Self::choose_protocol_version_and_system_packages_v1(
            epoch_store.protocol_version(),
            epoch_store.committee(),
            epoch_store
                .get_capabilities_v1()
                .expect("read capabilities from db cannot fail"),
            buffer_stake_bps,
        );

    // NOTE(review): the binary config is derived from the *current* epoch's
    // protocol config, not the next one -- confirm this is intended.
    let config = epoch_store.protocol_config();
    let binary_config = to_binary_config(config);
    let Some(next_epoch_system_package_bytes) = self
        .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
        .await
    else {
        error!(
            "upgraded system packages {:?} are not locally available, cannot create \
            ChangeEpochTx. validator binary must be upgraded to the correct version!",
            next_epoch_system_packages
        );
        bail!("missing system packages: cannot form ChangeEpochTx");
    };

    // The V2 change-epoch transaction additionally carries
    // `computation_cost_burned`; it is used only when both protocol features
    // below are enabled.
    if config.protocol_defined_base_fee()
        && config.max_committee_members_count_as_option().is_some()
    {
        txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
            next_epoch,
            next_epoch_protocol_version,
            gas_cost_summary.storage_cost,
            gas_cost_summary.computation_cost,
            gas_cost_summary.computation_cost_burned,
            gas_cost_summary.storage_rebate,
            gas_cost_summary.non_refundable_storage_fee,
            epoch_start_timestamp_ms,
            next_epoch_system_package_bytes,
        ));
    } else {
        txns.push(EndOfEpochTransactionKind::new_change_epoch(
            next_epoch,
            next_epoch_protocol_version,
            gas_cost_summary.storage_cost,
            gas_cost_summary.computation_cost,
            gas_cost_summary.storage_rebate,
            gas_cost_summary.non_refundable_storage_fee,
            epoch_start_timestamp_ms,
            next_epoch_system_package_bytes,
        ));
    }

    let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);

    let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
        tx.clone(),
        epoch_store.epoch(),
        checkpoint,
    );

    let tx_digest = executable_tx.digest();

    info!(
        ?next_epoch,
        ?next_epoch_protocol_version,
        ?next_epoch_system_packages,
        computation_cost=?gas_cost_summary.computation_cost,
        computation_cost_burned=?gas_cost_summary.computation_cost_burned,
        storage_cost=?gas_cost_summary.storage_cost,
        storage_rebate=?gas_cost_summary.storage_rebate,
        non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
        ?tx_digest,
        "Creating advance epoch transaction"
    );

    fail_point_async!("change_epoch_tx_delay");
    // Acquired before the already-executed check below and kept for the rest
    // of the function.
    let tx_lock = epoch_store.acquire_tx_lock(tx_digest);

    // Bail out instead of executing again if the effects already exist.
    if self
        .get_transaction_cache_reader()
        .try_is_tx_already_executed(tx_digest)?
    {
        warn!("change epoch tx has already been executed via state sync");
        bail!("change epoch tx has already been executed via state sync",);
    }

    let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;

    // Shared-object versions are assigned before the inputs are read.
    epoch_store.assign_shared_object_versions_idempotent(
        self.get_object_cache_reader().as_ref(),
        &[executable_tx.clone()],
    )?;

    let input_objects =
        self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;

    let (temporary_store, effects, _execution_error_opt) =
        self.prepare_certificate(&execution_guard, &executable_tx, input_objects, epoch_store)?;
    let system_obj = get_iota_system_state(&temporary_store.written)
        .expect("change epoch tx must write to system object");
    // The first event recognised as a system epoch-info event, if any.
    let system_epoch_info_event = temporary_store
        .events
        .data
        .into_iter()
        .find(|event| event.is_system_epoch_info_event())
        .map(SystemEpochInfoEvent::from);
    // The event may only be absent when the system state is in safe mode.
    assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());

    // Only the transaction and its effects are stored here.
    // NOTE(review): the temporary store's written objects are not committed
    // in this function -- presumably handled elsewhere; confirm.
    self.get_state_sync_store()
        .try_insert_transaction_and_effects(&tx, &effects)
        .map_err(|err| {
            let err: anyhow::Error = err.into();
            err
        })?;

    info!(
        "Effects summary of the change epoch transaction: {:?}",
        effects.summary_for_debug()
    );
    epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
    assert!(effects.status().is_ok());
    Ok((system_obj, system_epoch_info_event, effects))
}
4936
4937 #[instrument(level = "error", skip_all)]
4941 async fn revert_uncommitted_epoch_transactions(
4942 &self,
4943 epoch_store: &AuthorityPerEpochStore,
4944 ) -> IotaResult {
4945 {
4946 let state = epoch_store.get_reconfig_state_write_lock_guard();
4947 if state.should_accept_user_certs() {
4948 epoch_store.close_user_certs(state);
4957 }
4958 }
4960 let pending_certificates = epoch_store.pending_consensus_certificates();
4961 info!(
4962 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
4963 pending_certificates.len(),
4964 pending_certificates,
4965 );
4966 for digest in pending_certificates {
4967 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
4968 info!(
4969 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
4970 digest
4971 );
4972 continue;
4973 }
4974 info!("Reverting {:?} at the end of epoch", digest);
4975 epoch_store.revert_executed_transaction(&digest)?;
4976 self.get_reconfig_api().try_revert_state_update(&digest)?;
4977 }
4978 info!("All uncommitted local transactions reverted");
4979 Ok(())
4980 }
4981
4982 #[instrument(level = "error", skip_all)]
4983 async fn reopen_epoch_db(
4984 &self,
4985 cur_epoch_store: &AuthorityPerEpochStore,
4986 new_committee: Committee,
4987 epoch_start_configuration: EpochStartConfiguration,
4988 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4989 epoch_last_checkpoint: CheckpointSequenceNumber,
4990 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
4991 let new_epoch = new_committee.epoch;
4992 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
4993 assert_eq!(
4994 epoch_start_configuration.epoch_start_state().epoch(),
4995 new_committee.epoch
4996 );
4997 fail_point!("before-open-new-epoch-store");
4998 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
4999 self.name,
5000 new_committee,
5001 epoch_start_configuration,
5002 self.get_backing_package_store().clone(),
5003 self.get_object_store().clone(),
5004 expensive_safety_check_config,
5005 cur_epoch_store.get_chain_identifier(),
5006 epoch_last_checkpoint,
5007 );
5008 self.epoch_store.store(new_epoch_store.clone());
5009 Ok(new_epoch_store)
5010 }
5011
/// Test-only: iterates over the cached live object set exposed by the
/// accumulator store.
#[cfg(test)]
pub(crate) fn iter_live_object_set_for_testing(
    &self,
) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
    let accumulator_store = self.get_accumulator_store();
    accumulator_store.iter_cached_live_object_set_for_testing()
}
5019
/// Test-only: takes the execution shutdown sender and fires it.
///
/// # Panics
/// Panics if the sender was already taken or if the send fails.
#[cfg(test)]
pub(crate) fn shutdown_execution_for_test(&self) {
    let mut shutdown_slot = self.tx_execution_shutdown.lock();
    let shutdown_tx = shutdown_slot.take().unwrap();
    shutdown_tx.send(()).unwrap();
}
5029
5030 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5033 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5034 self.get_object_cache_reader()
5035 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5036 self.get_reconfig_api()
5037 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5038 }
5039}
5040
/// Background task state that turns received randomness into randomness
/// state update transactions on the authority.
pub struct RandomnessRoundReceiver {
    /// Authority on which the randomness state update transactions are
    /// persisted and enqueued.
    authority_state: Arc<AuthorityState>,
    /// Channel delivering `(epoch, round, randomness bytes)` tuples.
    randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
}
5045
impl RandomnessRoundReceiver {
    /// Creates a receiver and spawns its event loop as a monitored task.
    ///
    /// Returns the handle of the spawned task. The loop ends when the sending
    /// side of `randomness_rx` is closed.
    pub fn spawn(
        authority_state: Arc<AuthorityState>,
        randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
    ) -> JoinHandle<()> {
        let rrr = RandomnessRoundReceiver {
            authority_state,
            randomness_rx,
        };
        spawn_monitored_task!(rrr.run())
    }

    /// Event loop: handles each received randomness tuple until the channel
    /// is closed.
    async fn run(mut self) {
        info!("RandomnessRoundReceiver event loop started");

        loop {
            tokio::select! {
                maybe_recv = self.randomness_rx.recv() => {
                    if let Some((epoch, round, bytes)) = maybe_recv {
                        self.handle_new_randomness(epoch, round, bytes);
                    } else {
                        // Channel closed: no more randomness will arrive.
                        break;
                    }
                },
            }
        }

        info!("RandomnessRoundReceiver event loop ended");
    }

    /// Builds, persists and enqueues the randomness state update transaction
    /// for `(epoch, round)`, then spawns a task that waits for its effects.
    ///
    /// Randomness for an epoch other than the currently loaded one is dropped
    /// with a warning.
    ///
    /// The spawned task waits up to 300 seconds for the executed effects. On
    /// timeout it panics in builds with debug assertions; otherwise it logs a
    /// warning and keeps waiting without a deadline. It panics if the effects
    /// cannot be read and calls `fatal!` if the execution status is not
    /// `Success`.
    #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
    fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
        if epoch_store.epoch() != epoch {
            warn!(
                "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
                epoch_store.epoch()
            );
            return;
        }
        let transaction = VerifiedTransaction::new_randomness_state_update(
            epoch,
            round,
            bytes,
            epoch_store
                .epoch_start_config()
                .randomness_obj_initial_shared_version(),
        );
        debug!(
            "created randomness state update transaction with digest: {:?}",
            transaction.digest()
        );
        let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
        let digest = *transaction.digest();

        // The transaction is persisted through the cache-commit API before it
        // is handed to the transaction manager.
        self.authority_state
            .get_cache_commit()
            .persist_transaction(&transaction);

        self.authority_state
            .transaction_manager()
            .enqueue(vec![transaction], &epoch_store);

        // Watch for the effects in a separate task.
        let authority_state = self.authority_state.clone();
        spawn_monitored_task!(async move {
            const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
            let result = tokio::time::timeout(
                RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
                authority_state
                    .get_transaction_cache_reader()
                    .try_notify_read_executed_effects(&[digest]),
            )
            .await;
            let result = match result {
                Ok(result) => result,
                Err(_) => {
                    // Timed out: fail loudly in debug builds, otherwise warn
                    // and wait again without a deadline.
                    if cfg!(debug_assertions) {
                        panic!(
                            "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                        );
                    }
                    warn!(
                        "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
                    );
                    authority_state
                        .get_transaction_cache_reader()
                        .try_notify_read_executed_effects(&[digest])
                        .await
                }
            };

            let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
            // Exactly one digest was requested, so one effects entry is expected.
            let effects = effects.pop().expect("should return effects");
            if *effects.status() != ExecutionStatus::Success {
                fatal!(
                    "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
                );
            }
            debug!(
                "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
            );
        });
    }
}
5165
5166#[async_trait]
5167impl TransactionKeyValueStoreTrait for AuthorityState {
5168 async fn multi_get(
5169 &self,
5170 transaction_keys: &[TransactionDigest],
5171 effects_keys: &[TransactionDigest],
5172 ) -> IotaResult<KVStoreTransactionData> {
5173 let txns = if !transaction_keys.is_empty() {
5174 self.get_transaction_cache_reader()
5175 .try_multi_get_transaction_blocks(transaction_keys)?
5176 .into_iter()
5177 .map(|t| t.map(|t| (*t).clone().into_inner()))
5178 .collect()
5179 } else {
5180 vec![]
5181 };
5182
5183 let fx = if !effects_keys.is_empty() {
5184 self.get_transaction_cache_reader()
5185 .try_multi_get_executed_effects(effects_keys)?
5186 } else {
5187 vec![]
5188 };
5189
5190 Ok((txns, fx))
5191 }
5192
5193 async fn multi_get_checkpoints(
5194 &self,
5195 checkpoint_summaries: &[CheckpointSequenceNumber],
5196 checkpoint_contents: &[CheckpointSequenceNumber],
5197 checkpoint_summaries_by_digest: &[CheckpointDigest],
5198 ) -> IotaResult<(
5199 Vec<Option<CertifiedCheckpointSummary>>,
5200 Vec<Option<CheckpointContents>>,
5201 Vec<Option<CertifiedCheckpointSummary>>,
5202 )> {
5203 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5205 let store = self.get_checkpoint_store();
5206 for seq in checkpoint_summaries {
5207 let checkpoint = store
5208 .get_checkpoint_by_sequence_number(*seq)?
5209 .map(|c| c.into_inner());
5210
5211 summaries.push(checkpoint);
5212 }
5213
5214 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5215 for seq in checkpoint_contents {
5216 let checkpoint = store
5217 .get_checkpoint_by_sequence_number(*seq)?
5218 .and_then(|summary| {
5219 store
5220 .get_checkpoint_contents(&summary.content_digest)
5221 .expect("db read cannot fail")
5222 });
5223 contents.push(checkpoint);
5224 }
5225
5226 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5227 for digest in checkpoint_summaries_by_digest {
5228 let checkpoint = store
5229 .get_checkpoint_by_digest(digest)?
5230 .map(|c| c.into_inner());
5231 summaries_by_digest.push(checkpoint);
5232 }
5233
5234 Ok((summaries, contents, summaries_by_digest))
5235 }
5236
5237 async fn get_transaction_perpetual_checkpoint(
5238 &self,
5239 digest: TransactionDigest,
5240 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5241 self.get_checkpoint_cache()
5242 .try_get_transaction_perpetual_checkpoint(&digest)
5243 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5244 }
5245
5246 async fn get_object(
5247 &self,
5248 object_id: ObjectID,
5249 version: VersionNumber,
5250 ) -> IotaResult<Option<Object>> {
5251 self.get_object_cache_reader()
5252 .try_get_object_by_key(&object_id, version)
5253 }
5254
5255 async fn multi_get_transactions_perpetual_checkpoints(
5256 &self,
5257 digests: &[TransactionDigest],
5258 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5259 let res = self
5260 .get_checkpoint_cache()
5261 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5262
5263 Ok(res
5264 .into_iter()
5265 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5266 .collect())
5267 }
5268
5269 #[instrument(skip(self))]
5270 async fn multi_get_events_by_tx_digests(
5271 &self,
5272 digests: &[TransactionDigest],
5273 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5274 if digests.is_empty() {
5275 return Ok(vec![]);
5276 }
5277 let events_digests: Vec<_> = self
5278 .get_transaction_cache_reader()
5279 .try_multi_get_executed_effects(digests)?
5280 .into_iter()
5281 .map(|t| t.and_then(|t| t.events_digest().cloned()))
5282 .collect();
5283 let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5284 let mut events = self
5285 .get_transaction_cache_reader()
5286 .try_multi_get_events(&non_empty_events)?
5287 .into_iter();
5288 Ok(events_digests
5289 .into_iter()
5290 .map(|ev| ev.and_then(|_| events.next()?))
5291 .collect())
5292 }
5293}
5294
/// Simulator-only hooks for replacing or adding system packages.
///
/// Overrides are kept in a thread-local map keyed by package ID. An override
/// is either one set of modules for every validator, or a callback that
/// decides per validator.
#[cfg(msim)]
pub mod framework_injection {
    use std::{
        cell::RefCell,
        collections::{BTreeMap, BTreeSet},
    };

    use iota_framework::{BuiltInFramework, SystemPackage};
    use iota_types::{
        base_types::{AuthorityName, ObjectID},
        is_system_package,
    };
    use move_binary_format::CompiledModule;

    type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;

    thread_local! {
        static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
    }

    type Framework = Vec<CompiledModule>;

    /// Callback deciding, per validator, which modules (if any) override a
    /// package.
    pub type PackageUpgradeCallback =
        Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;

    enum PackageOverrideConfig {
        /// The same modules for every validator.
        Global(Framework),
        /// Modules chosen by a callback on the validator's name.
        PerValidator(PackageUpgradeCallback),
    }

    /// Serializes each module with its own version into a byte vector.
    fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
        let mut serialized = Vec::with_capacity(modules.len());
        for module in modules {
            let mut buf = Vec::new();
            module
                .serialize_with_version(module.version, &mut buf)
                .unwrap();
            serialized.push(buf);
        }
        serialized
    }

    /// Registers `modules` as the override of `package_id` for all
    /// validators, replacing any previous override.
    pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
        let config = PackageOverrideConfig::Global(modules);
        OVERRIDE.with(|overrides| {
            overrides.borrow_mut().insert(package_id, config);
        });
    }

    /// Registers `func` as the per-validator override of `package_id`,
    /// replacing any previous override.
    pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
        let config = PackageOverrideConfig::PerValidator(func);
        OVERRIDE.with(|overrides| {
            overrides.borrow_mut().insert(package_id, config);
        });
    }

    /// Returns the serialized override modules of `package_id` for validator
    /// `name`, or `None` when no override applies.
    pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
        OVERRIDE.with(|cfg| {
            let overrides = cfg.borrow();
            let bytes = match overrides.get(package_id)? {
                PackageOverrideConfig::Global(framework) => {
                    Some(compiled_modules_to_bytes(framework))
                }
                PackageOverrideConfig::PerValidator(callback) => {
                    callback(name).map(|framework| compiled_modules_to_bytes(&framework))
                }
            };
            bytes
        })
    }

    /// Returns the override modules of `package_id` for validator `name`, or
    /// `None` when no override applies.
    pub fn get_override_modules(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<Vec<CompiledModule>> {
        OVERRIDE.with(|cfg| {
            let overrides = cfg.borrow();
            let modules = match overrides.get(package_id)? {
                PackageOverrideConfig::Global(framework) => Some(framework.clone()),
                PackageOverrideConfig::PerValidator(callback) => callback(name),
            };
            modules
        })
    }

    /// Builds a `SystemPackage` from the override of `package_id`, if any.
    ///
    /// A built-in system package keeps its built-in dependencies; any other
    /// package depends on all built-in package IDs.
    pub fn get_override_system_package(
        package_id: &ObjectID,
        name: AuthorityName,
    ) -> Option<SystemPackage> {
        let bytes = get_override_bytes(package_id, name)?;

        let dependencies = match is_system_package(*package_id) {
            true => BuiltInFramework::get_package_by_id(package_id)
                .dependencies
                .to_vec(),
            false => BuiltInFramework::all_package_ids(),
        };

        Some(SystemPackage {
            id: *package_id,
            bytes,
            dependencies,
        })
    }

    /// Returns a `SystemPackage` for every override whose ID is not a
    /// built-in package ID.
    ///
    /// # Panics
    /// Panics if such an override yields no bytes for validator `name`.
    pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
        let built_in: BTreeSet<ObjectID> =
            BuiltInFramework::all_package_ids().into_iter().collect();

        let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
            cfg.borrow()
                .keys()
                .filter(|package| !built_in.contains(*package))
                .copied()
                .collect()
        });

        let mut packages = Vec::with_capacity(extra.len());
        for package in extra {
            packages.push(SystemPackage {
                id: package,
                bytes: get_override_bytes(&package, name).unwrap(),
                dependencies: BuiltInFramework::all_package_ids(),
            });
        }
        packages
    }
}
5416
/// An object stored together with the components of its object reference, as
/// it appears in a node state dump.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ObjDumpFormat {
    /// Object ID (first component of the object reference).
    pub id: ObjectID,
    /// Object version (second component of the object reference).
    pub version: VersionNumber,
    /// Object digest (third component of the object reference).
    pub digest: ObjectDigest,
    /// The object itself.
    pub object: Object,
}
5424
5425impl ObjDumpFormat {
5426 fn new(object: Object) -> Self {
5427 let oref = object.compute_object_reference();
5428 Self {
5429 id: oref.0,
5430 version: oref.1,
5431 digest: oref.2,
5432 object,
5433 }
5434 }
5435}
5436
/// Serializable snapshot of a transaction and the state around it, assembled
/// by `NodeStateDump::new`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct NodeStateDump {
    /// Digest of the transaction being dumped.
    pub tx_digest: TransactionDigest,
    /// NOTE(review): presumably the signed data of the dumped transaction --
    /// its construction is not visible here; confirm.
    pub sender_signed_data: SenderSignedData,
    /// Epoch of the epoch store at dump time.
    pub executed_epoch: u64,
    /// Reference gas price reported by the epoch store.
    pub reference_gas_price: u64,
    /// Protocol version of the epoch store, as a plain integer.
    pub protocol_version: u64,
    /// Epoch start timestamp taken from the epoch start configuration.
    pub epoch_start_timestamp_ms: u64,
    /// NOTE(review): presumably the effects computed locally by this node --
    /// confirm.
    pub computed_effects: TransactionEffects,
    /// Effects digest that was expected for the transaction.
    pub expected_effects_digest: TransactionEffectsDigest,
    /// Built-in framework packages that were found in the object store.
    pub relevant_system_packages: Vec<ObjDumpFormat>,
    /// Shared input objects listed in the effects as mutated or read-only,
    /// where found in the object store.
    pub shared_objects: Vec<ObjDumpFormat>,
    /// Objects listed in the temporary store's loaded runtime objects, where
    /// found in the object store at the recorded version.
    pub loaded_child_objects: Vec<ObjDumpFormat>,
    /// Objects at the versions the effects report as modified, where found in
    /// the object store.
    pub modified_at_versions: Vec<ObjDumpFormat>,
    /// Packages the temporary store recorded as loaded from the database at
    /// runtime.
    pub runtime_reads: Vec<ObjDumpFormat>,
    /// NOTE(review): presumably the transaction's input objects -- their
    /// construction is not visible here; confirm.
    pub input_objects: Vec<ObjDumpFormat>,
}
5454
5455impl NodeStateDump {
5456 pub fn new(
5457 tx_digest: &TransactionDigest,
5458 effects: &TransactionEffects,
5459 expected_effects_digest: TransactionEffectsDigest,
5460 object_store: &dyn ObjectStore,
5461 epoch_store: &Arc<AuthorityPerEpochStore>,
5462 inner_temporary_store: &InnerTemporaryStore,
5463 certificate: &VerifiedExecutableTransaction,
5464 ) -> IotaResult<Self> {
5465 let executed_epoch = epoch_store.epoch();
5467 let reference_gas_price = epoch_store.reference_gas_price();
5468 let epoch_start_config = epoch_store.epoch_start_config();
5469 let protocol_version = epoch_store.protocol_version().as_u64();
5470 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5471
5472 let mut relevant_system_packages = Vec::new();
5474 for sys_package_id in BuiltInFramework::all_package_ids() {
5475 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5476 relevant_system_packages.push(ObjDumpFormat::new(w))
5477 }
5478 }
5479
5480 let mut shared_objects = Vec::new();
5482 for kind in effects.input_shared_objects() {
5483 match kind {
5484 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5485 if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5486 shared_objects.push(ObjDumpFormat::new(w))
5487 }
5488 }
5489 InputSharedObject::ReadDeleted(..)
5490 | InputSharedObject::MutateDeleted(..)
5491 | InputSharedObject::Cancelled(..) => (), }
5494 }
5495
5496 let mut loaded_child_objects = Vec::new();
5499 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5500 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5501 loaded_child_objects.push(ObjDumpFormat::new(w))
5502 }
5503 }
5504
5505 let mut modified_at_versions = Vec::new();
5507 for (id, ver) in effects.modified_at_versions() {
5508 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5509 modified_at_versions.push(ObjDumpFormat::new(w))
5510 }
5511 }
5512
5513 let mut runtime_reads = Vec::new();
5517 for obj in inner_temporary_store
5518 .runtime_packages_loaded_from_db
5519 .values()
5520 {
5521 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
5522 }
5523
5524 Ok(Self {
5527 tx_digest: *tx_digest,
5528 executed_epoch,
5529 reference_gas_price,
5530 epoch_start_timestamp_ms,
5531 protocol_version,
5532 relevant_system_packages,
5533 shared_objects,
5534 loaded_child_objects,
5535 modified_at_versions,
5536 runtime_reads,
5537 sender_signed_data: certificate.clone().into_message(),
5538 input_objects: inner_temporary_store
5539 .input_objects
5540 .values()
5541 .map(|o| ObjDumpFormat::new(o.clone()))
5542 .collect(),
5543 computed_effects: effects.clone(),
5544 expected_effects_digest,
5545 })
5546 }
5547
5548 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
5549 let mut objects = Vec::new();
5550 objects.extend(self.relevant_system_packages.clone());
5551 objects.extend(self.shared_objects.clone());
5552 objects.extend(self.loaded_child_objects.clone());
5553 objects.extend(self.modified_at_versions.clone());
5554 objects.extend(self.runtime_reads.clone());
5555 objects.extend(self.input_objects.clone());
5556 objects
5557 }
5558
5559 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
5560 let file_name = format!(
5561 "{}_{}_NODE_DUMP.json",
5562 self.tx_digest,
5563 AuthorityState::unixtime_now_ms()
5564 );
5565 let mut path = path.to_path_buf();
5566 path.push(&file_name);
5567 let mut file = File::create(path.clone())?;
5568 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
5569 Ok(path)
5570 }
5571
5572 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
5573 let file = File::open(path)?;
5574 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
5575 }
5576}