1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::{Duration, SystemTime, UNIX_EPOCH},
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use fastcrypto::{
24 encoding::{Base58, Encoding},
25 hash::MultisetHash,
26};
27use iota_archival::reader::ArchiveReaderBalancer;
28use iota_common::{debug_fatal, fatal};
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
48use iota_storage::{
49 key_value_store::{
50 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51 },
52 key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57 IOTA_SYSTEM_ADDRESS, TypeTag,
58 account_abstraction::{
59 account::AuthenticatorFunctionRefV1Key,
60 authenticator_function::{
61 AuthenticatorFunctionRef, AuthenticatorFunctionRefForExecution,
62 AuthenticatorFunctionRefV1,
63 },
64 },
65 authenticator_state::get_authenticator_state,
66 base_types::*,
67 committee::{Committee, EpochId, ProtocolVersion},
68 crypto::{
69 AuthorityPublicKey, AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer,
70 default_hash,
71 },
72 deny_list_v1::check_coin_deny_list_v1_during_signing,
73 digests::{ChainIdentifier, Digest, TransactionEventsDigest},
74 dynamic_field::{self, DynamicFieldInfo, DynamicFieldName, Field, visitor as DFV},
75 effects::{
76 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
77 TransactionEvents, VerifiedSignedTransactionEffects,
78 },
79 error::{ExecutionError, IotaError, IotaResult, UserInputError},
80 event::{Event, EventID, SystemEpochInfoEvent},
81 executable_transaction::VerifiedExecutableTransaction,
82 execution_config_utils::to_binary_config,
83 execution_status::ExecutionStatus,
84 fp_ensure,
85 gas::{GasCostSummary, IotaGasStatus},
86 gas_coin::NANOS_PER_IOTA,
87 inner_temporary_store::{
88 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
89 WrittenObjects,
90 },
91 iota_system_state::{
92 IotaSystemState, IotaSystemStateTrait,
93 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
94 },
95 is_system_package,
96 layout_resolver::{LayoutResolver, into_struct_layout},
97 message_envelope::Message,
98 messages_checkpoint::{
99 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
100 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
101 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
102 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
103 },
104 messages_consensus::AuthorityCapabilitiesV1,
105 messages_grpc::{
106 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
107 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
108 TransactionStatus,
109 },
110 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
111 move_authenticator::MoveAuthenticator,
112 object::{
113 MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
114 bounded_visitor::BoundedVisitor,
115 },
116 storage::{
117 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
118 },
119 supported_protocol_versions::{
120 ProtocolConfig, SupportedProtocolVersions, SupportedProtocolVersionsWithHashes,
121 },
122 transaction::*,
123 transaction_executor::{SimulateTransactionResult, VmChecks},
124};
125use itertools::Itertools;
126use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
127use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
128use parking_lot::Mutex;
129use prometheus::{
130 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
131 register_histogram_vec_with_registry, register_histogram_with_registry,
132 register_int_counter_vec_with_registry, register_int_counter_with_registry,
133 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
134};
135use serde::{Deserialize, Serialize, de::DeserializeOwned};
136use tap::TapFallible;
137use tokio::{
138 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
139 task::JoinHandle,
140};
141use tracing::{debug, error, info, instrument, trace, warn};
142use typed_store::TypedStoreError;
143
144use self::{
145 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
146};
147#[cfg(msim)]
148pub use crate::checkpoints::checkpoint_executor::utils::{
149 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
150};
151use crate::{
152 authority::{
153 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
154 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
155 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
156 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
157 authority_store_tables::AuthorityPrunerTables,
158 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
159 },
160 authority_client::NetworkAuthorityClient,
161 checkpoints::CheckpointStore,
162 congestion_tracker::CongestionTracker,
163 consensus_adapter::ConsensusAdapter,
164 epoch::committee_store::CommitteeStore,
165 execution_cache::{
166 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
167 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
168 TransactionCacheRead,
169 },
170 execution_driver::execution_process,
171 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
172 metrics::{LatencyObserver, RateTracker},
173 module_cache_metrics::ResolverMetrics,
174 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
175 rest_index::RestIndexStore,
176 stake_aggregator::StakeAggregator,
177 state_accumulator::{AccumulatorStore, StateAccumulator},
178 subscription_handler::SubscriptionHandler,
179 transaction_input_loader::TransactionInputLoader,
180 transaction_manager::TransactionManager,
181 transaction_outputs::TransactionOutputs,
182 validator_tx_finalizer::ValidatorTxFinalizer,
183 verify_indexes::verify_indexes,
184};
185
186#[cfg(test)]
187#[path = "unit_tests/authority_tests.rs"]
188pub mod authority_tests;
189
190#[cfg(test)]
191#[path = "unit_tests/transaction_tests.rs"]
192pub mod transaction_tests;
193
194#[cfg(test)]
195#[path = "unit_tests/batch_transaction_tests.rs"]
196mod batch_transaction_tests;
197
198#[cfg(test)]
199#[path = "unit_tests/move_integration_tests.rs"]
200pub mod move_integration_tests;
201
202#[cfg(test)]
203#[path = "unit_tests/gas_tests.rs"]
204mod gas_tests;
205
206#[cfg(test)]
207#[path = "unit_tests/batch_verification_tests.rs"]
208mod batch_verification_tests;
209
210#[cfg(test)]
211#[path = "unit_tests/coin_deny_list_tests.rs"]
212mod coin_deny_list_tests;
213
214#[cfg(test)]
215#[path = "unit_tests/auth_unit_test_utils.rs"]
216pub mod auth_unit_test_utils;
217
218pub mod authority_test_utils;
219
220pub mod authority_per_epoch_store;
221pub mod authority_per_epoch_store_pruner;
222
223pub mod authority_store_pruner;
224pub mod authority_store_tables;
225pub mod authority_store_types;
226pub mod epoch_start_configuration;
227pub mod shared_object_congestion_tracker;
228pub mod shared_object_version_manager;
229pub mod suggested_gas_price_calculator;
230pub mod test_authority_builder;
231pub mod transaction_deferral;
232
233pub(crate) mod authority_store;
234pub mod backpressure;
235
236pub struct AuthorityMetrics {
238 tx_orders: IntCounter,
239 total_certs: IntCounter,
240 total_cert_attempts: IntCounter,
241 total_effects: IntCounter,
242 pub shared_obj_tx: IntCounter,
243 sponsored_tx: IntCounter,
244 tx_already_processed: IntCounter,
245 num_input_objs: Histogram,
246 num_shared_objects: Histogram,
247 batch_size: Histogram,
248
249 authority_state_handle_transaction_latency: Histogram,
250
251 execute_certificate_latency_single_writer: Histogram,
252 execute_certificate_latency_shared_object: Histogram,
253
254 internal_execution_latency: Histogram,
255 execution_load_input_objects_latency: Histogram,
256 prepare_certificate_latency: Histogram,
257 commit_certificate_latency: Histogram,
258 db_checkpoint_latency: Histogram,
259
260 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
261 pub(crate) transaction_manager_num_missing_objects: IntGauge,
262 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
263 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
264 pub(crate) transaction_manager_num_ready: IntGauge,
265 pub(crate) transaction_manager_object_cache_size: IntGauge,
266 pub(crate) transaction_manager_object_cache_hits: IntCounter,
267 pub(crate) transaction_manager_object_cache_misses: IntCounter,
268 pub(crate) transaction_manager_object_cache_evictions: IntCounter,
269 pub(crate) transaction_manager_package_cache_size: IntGauge,
270 pub(crate) transaction_manager_package_cache_hits: IntCounter,
271 pub(crate) transaction_manager_package_cache_misses: IntCounter,
272 pub(crate) transaction_manager_package_cache_evictions: IntCounter,
273 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
274
275 pub(crate) execution_driver_executed_transactions: IntCounter,
276 pub(crate) execution_driver_dispatch_queue: IntGauge,
277 pub(crate) execution_queueing_delay_s: Histogram,
278 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
279 pub(crate) execution_gas_latency_ratio: Histogram,
280
281 pub(crate) skipped_consensus_txns: IntCounter,
282 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
283
284 pub(crate) authority_overload_status: IntGauge,
285 pub(crate) authority_load_shedding_percentage: IntGauge,
286
287 pub(crate) transaction_overload_sources: IntCounterVec,
288
289 post_processing_total_events_emitted: IntCounter,
291 post_processing_total_tx_indexed: IntCounter,
292 post_processing_total_tx_had_event_processed: IntCounter,
293 post_processing_total_failures: IntCounter,
294
295 pub consensus_handler_processed: IntCounterVec,
297 pub consensus_handler_transaction_sizes: HistogramVec,
298 pub consensus_handler_num_low_scoring_authorities: IntGauge,
299 pub consensus_handler_scores: IntGaugeVec,
300 pub consensus_handler_deferred_transactions: IntCounter,
301 pub consensus_handler_congested_transactions: IntCounter,
302 pub consensus_handler_cancelled_transactions: IntCounter,
303 pub consensus_handler_max_object_costs: IntGaugeVec,
304 pub consensus_committed_subdags: IntCounterVec,
305 pub consensus_committed_messages: IntGaugeVec,
306 pub consensus_committed_user_transactions: IntGaugeVec,
307 pub consensus_handler_leader_round: IntGauge,
308 pub consensus_calculated_throughput: IntGauge,
309 pub consensus_calculated_throughput_profile: IntGauge,
310
311 pub limits_metrics: Arc<LimitsMetrics>,
312
313 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
315
316 pub zklogin_sig_count: IntCounter,
318 pub multisig_sig_count: IntCounter,
320
321 pub execution_queueing_latency: LatencyObserver,
324
325 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
332
333 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
336}
337
338const POSITIVE_INT_BUCKETS: &[f64] = &[
340 1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
341 20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
342 7000000., 10000000.,
343];
344
345const LATENCY_SEC_BUCKETS: &[f64] = &[
346 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
347 10., 20., 30., 60., 90.,
348];
349
350const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
352 0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
353 0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
354];
355
356const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
357 10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
358 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
359];
360
361pub const SIMULATION_GAS_COIN_VALUE: u64 = 1_000_000_000 * NANOS_PER_IOTA; impl AuthorityMetrics {
365 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
366 let execute_certificate_latency = register_histogram_vec_with_registry!(
367 "authority_state_execute_certificate_latency",
368 "Latency of executing certificates, including waiting for inputs",
369 &["tx_type"],
370 LATENCY_SEC_BUCKETS.to_vec(),
371 registry,
372 )
373 .unwrap();
374
375 let execute_certificate_latency_single_writer =
376 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
377 let execute_certificate_latency_shared_object =
378 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
379
380 Self {
381 tx_orders: register_int_counter_with_registry!(
382 "total_transaction_orders",
383 "Total number of transaction orders",
384 registry,
385 )
386 .unwrap(),
387 total_certs: register_int_counter_with_registry!(
388 "total_transaction_certificates",
389 "Total number of transaction certificates handled",
390 registry,
391 )
392 .unwrap(),
393 total_cert_attempts: register_int_counter_with_registry!(
394 "total_handle_certificate_attempts",
395 "Number of calls to handle_certificate",
396 registry,
397 )
398 .unwrap(),
399 total_effects: register_int_counter_with_registry!(
401 "total_transaction_effects",
402 "Total number of transaction effects produced",
403 registry,
404 )
405 .unwrap(),
406
407 shared_obj_tx: register_int_counter_with_registry!(
408 "num_shared_obj_tx",
409 "Number of transactions involving shared objects",
410 registry,
411 )
412 .unwrap(),
413
414 sponsored_tx: register_int_counter_with_registry!(
415 "num_sponsored_tx",
416 "Number of sponsored transactions",
417 registry,
418 )
419 .unwrap(),
420
421 tx_already_processed: register_int_counter_with_registry!(
422 "num_tx_already_processed",
423 "Number of transaction orders already processed previously",
424 registry,
425 )
426 .unwrap(),
427 num_input_objs: register_histogram_with_registry!(
428 "num_input_objects",
429 "Distribution of number of input TX objects per TX",
430 POSITIVE_INT_BUCKETS.to_vec(),
431 registry,
432 )
433 .unwrap(),
434 num_shared_objects: register_histogram_with_registry!(
435 "num_shared_objects",
436 "Number of shared input objects per TX",
437 POSITIVE_INT_BUCKETS.to_vec(),
438 registry,
439 )
440 .unwrap(),
441 batch_size: register_histogram_with_registry!(
442 "batch_size",
443 "Distribution of size of transaction batch",
444 POSITIVE_INT_BUCKETS.to_vec(),
445 registry,
446 )
447 .unwrap(),
448 authority_state_handle_transaction_latency: register_histogram_with_registry!(
449 "authority_state_handle_transaction_latency",
450 "Latency of handling transactions",
451 LATENCY_SEC_BUCKETS.to_vec(),
452 registry,
453 )
454 .unwrap(),
455 execute_certificate_latency_single_writer,
456 execute_certificate_latency_shared_object,
457 internal_execution_latency: register_histogram_with_registry!(
458 "authority_state_internal_execution_latency",
459 "Latency of actual certificate executions",
460 LATENCY_SEC_BUCKETS.to_vec(),
461 registry,
462 )
463 .unwrap(),
464 execution_load_input_objects_latency: register_histogram_with_registry!(
465 "authority_state_execution_load_input_objects_latency",
466 "Latency of loading input objects for execution",
467 LOW_LATENCY_SEC_BUCKETS.to_vec(),
468 registry,
469 )
470 .unwrap(),
471 prepare_certificate_latency: register_histogram_with_registry!(
472 "authority_state_prepare_certificate_latency",
473 "Latency of executing certificates, before committing the results",
474 LATENCY_SEC_BUCKETS.to_vec(),
475 registry,
476 )
477 .unwrap(),
478 commit_certificate_latency: register_histogram_with_registry!(
479 "authority_state_commit_certificate_latency",
480 "Latency of committing certificate execution results",
481 LATENCY_SEC_BUCKETS.to_vec(),
482 registry,
483 )
484 .unwrap(),
485 db_checkpoint_latency: register_histogram_with_registry!(
486 "db_checkpoint_latency",
487 "Latency of checkpointing dbs",
488 LATENCY_SEC_BUCKETS.to_vec(),
489 registry,
490 ).unwrap(),
491 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
492 "transaction_manager_num_enqueued_certificates",
493 "Current number of certificates enqueued to TransactionManager",
494 &["result"],
495 registry,
496 )
497 .unwrap(),
498 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
499 "transaction_manager_num_missing_objects",
500 "Current number of missing objects in TransactionManager",
501 registry,
502 )
503 .unwrap(),
504 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
505 "transaction_manager_num_pending_certificates",
506 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
507 registry,
508 )
509 .unwrap(),
510 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
511 "transaction_manager_num_executing_certificates",
512 "Number of executing certificates, including queued and actually running certificates",
513 registry,
514 )
515 .unwrap(),
516 transaction_manager_num_ready: register_int_gauge_with_registry!(
517 "transaction_manager_num_ready",
518 "Number of ready transactions in TransactionManager",
519 registry,
520 )
521 .unwrap(),
522 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
523 "transaction_manager_object_cache_size",
524 "Current size of object-availability cache in TransactionManager",
525 registry,
526 )
527 .unwrap(),
528 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
529 "transaction_manager_object_cache_hits",
530 "Number of object-availability cache hits in TransactionManager",
531 registry,
532 )
533 .unwrap(),
534 authority_overload_status: register_int_gauge_with_registry!(
535 "authority_overload_status",
536 "Whether authority is current experiencing overload and enters load shedding mode.",
537 registry)
538 .unwrap(),
539 authority_load_shedding_percentage: register_int_gauge_with_registry!(
540 "authority_load_shedding_percentage",
541 "The percentage of transactions is shed when the authority is in load shedding mode.",
542 registry)
543 .unwrap(),
544 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
545 "transaction_manager_object_cache_misses",
546 "Number of object-availability cache misses in TransactionManager",
547 registry,
548 )
549 .unwrap(),
550 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
551 "transaction_manager_object_cache_evictions",
552 "Number of object-availability cache evictions in TransactionManager",
553 registry,
554 )
555 .unwrap(),
556 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
557 "transaction_manager_package_cache_size",
558 "Current size of package-availability cache in TransactionManager",
559 registry,
560 )
561 .unwrap(),
562 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
563 "transaction_manager_package_cache_hits",
564 "Number of package-availability cache hits in TransactionManager",
565 registry,
566 )
567 .unwrap(),
568 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
569 "transaction_manager_package_cache_misses",
570 "Number of package-availability cache misses in TransactionManager",
571 registry,
572 )
573 .unwrap(),
574 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
575 "transaction_manager_package_cache_evictions",
576 "Number of package-availability cache evictions in TransactionManager",
577 registry,
578 )
579 .unwrap(),
580 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
581 "transaction_manager_transaction_queue_age_s",
582 "Time spent in waiting for transaction in the queue",
583 LATENCY_SEC_BUCKETS.to_vec(),
584 registry,
585 )
586 .unwrap(),
587 transaction_overload_sources: register_int_counter_vec_with_registry!(
588 "transaction_overload_sources",
589 "Number of times each source indicates transaction overload.",
590 &["source"],
591 registry)
592 .unwrap(),
593 execution_driver_executed_transactions: register_int_counter_with_registry!(
594 "execution_driver_executed_transactions",
595 "Cumulative number of transaction executed by execution driver",
596 registry,
597 )
598 .unwrap(),
599 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
600 "execution_driver_dispatch_queue",
601 "Number of transaction pending in execution driver dispatch queue",
602 registry,
603 )
604 .unwrap(),
605 execution_queueing_delay_s: register_histogram_with_registry!(
606 "execution_queueing_delay_s",
607 "Queueing delay between a transaction is ready for execution until it starts executing.",
608 LATENCY_SEC_BUCKETS.to_vec(),
609 registry
610 )
611 .unwrap(),
612 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
613 "prepare_cert_gas_latency_ratio",
614 "The ratio of computation gas divided by VM execution latency.",
615 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
616 registry
617 )
618 .unwrap(),
619 execution_gas_latency_ratio: register_histogram_with_registry!(
620 "execution_gas_latency_ratio",
621 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
622 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
623 registry
624 )
625 .unwrap(),
626 skipped_consensus_txns: register_int_counter_with_registry!(
627 "skipped_consensus_txns",
628 "Total number of consensus transactions skipped",
629 registry,
630 )
631 .unwrap(),
632 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
633 "skipped_consensus_txns_cache_hit",
634 "Total number of consensus transactions skipped because of local cache hit",
635 registry,
636 )
637 .unwrap(),
638 post_processing_total_events_emitted: register_int_counter_with_registry!(
639 "post_processing_total_events_emitted",
640 "Total number of events emitted in post processing",
641 registry,
642 )
643 .unwrap(),
644 post_processing_total_tx_indexed: register_int_counter_with_registry!(
645 "post_processing_total_tx_indexed",
646 "Total number of txes indexed in post processing",
647 registry,
648 )
649 .unwrap(),
650 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
651 "post_processing_total_tx_had_event_processed",
652 "Total number of txes finished event processing in post processing",
653 registry,
654 )
655 .unwrap(),
656 post_processing_total_failures: register_int_counter_with_registry!(
657 "post_processing_total_failures",
658 "Total number of failure in post processing",
659 registry,
660 )
661 .unwrap(),
662 consensus_handler_processed: register_int_counter_vec_with_registry!(
663 "consensus_handler_processed",
664 "Number of transactions processed by consensus handler",
665 &["class"],
666 registry
667 ).unwrap(),
668 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
669 "consensus_handler_transaction_sizes",
670 "Sizes of each type of transactions processed by consensus handler",
671 &["class"],
672 POSITIVE_INT_BUCKETS.to_vec(),
673 registry
674 ).unwrap(),
675 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
676 "consensus_handler_num_low_scoring_authorities",
677 "Number of low scoring authorities based on reputation scores from consensus",
678 registry
679 ).unwrap(),
680 consensus_handler_scores: register_int_gauge_vec_with_registry!(
681 "consensus_handler_scores",
682 "scores from consensus for each authority",
683 &["authority"],
684 registry,
685 ).unwrap(),
686 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
687 "consensus_handler_deferred_transactions",
688 "Number of transactions deferred by consensus handler",
689 registry,
690 ).unwrap(),
691 consensus_handler_congested_transactions: register_int_counter_with_registry!(
692 "consensus_handler_congested_transactions",
693 "Number of transactions deferred by consensus handler due to congestion",
694 registry,
695 ).unwrap(),
696 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
697 "consensus_handler_cancelled_transactions",
698 "Number of transactions cancelled by consensus handler",
699 registry,
700 ).unwrap(),
701 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
702 "consensus_handler_max_congestion_control_object_costs",
703 "Max object costs for congestion control in the current consensus commit",
704 &["commit_type"],
705 registry,
706 ).unwrap(),
707 consensus_committed_subdags: register_int_counter_vec_with_registry!(
708 "consensus_committed_subdags",
709 "Number of committed subdags, sliced by leader",
710 &["authority"],
711 registry,
712 ).unwrap(),
713 consensus_committed_messages: register_int_gauge_vec_with_registry!(
714 "consensus_committed_messages",
715 "Total number of committed consensus messages, sliced by author",
716 &["authority"],
717 registry,
718 ).unwrap(),
719 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
720 "consensus_committed_user_transactions",
721 "Number of committed user transactions, sliced by submitter",
722 &["authority"],
723 registry,
724 ).unwrap(),
725 consensus_handler_leader_round: register_int_gauge_with_registry!(
726 "consensus_handler_leader_round",
727 "The leader round of the current consensus output being processed in the consensus handler",
728 registry,
729 ).unwrap(),
730 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
731 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
732 zklogin_sig_count: register_int_counter_with_registry!(
733 "zklogin_sig_count",
734 "Count of zkLogin signatures",
735 registry,
736 )
737 .unwrap(),
738 multisig_sig_count: register_int_counter_with_registry!(
739 "multisig_sig_count",
740 "Count of zkLogin signatures",
741 registry,
742 )
743 .unwrap(),
744 consensus_calculated_throughput: register_int_gauge_with_registry!(
745 "consensus_calculated_throughput",
746 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
747 registry,
748 ).unwrap(),
749 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
750 "consensus_calculated_throughput_profile",
751 "The current active calculated throughput profile",
752 registry
753 ).unwrap(),
754 execution_queueing_latency: LatencyObserver::new(),
755 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
756 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
757 }
758 }
759
760 pub fn reset_on_reconfigure(&self) {
764 self.consensus_committed_messages.reset();
765 self.consensus_handler_scores.reset();
766 self.consensus_committed_user_transactions.reset();
767 }
768}
769
770pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
777
778pub struct AuthorityState {
779 pub name: AuthorityName,
782 pub secret: StableSyncAuthoritySigner,
784
785 input_loader: TransactionInputLoader,
787 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
788
789 epoch_store: ArcSwap<AuthorityPerEpochStore>,
790
791 execution_lock: RwLock<EpochId>,
797
798 pub indexes: Option<Arc<IndexStore>>,
799 pub rest_index: Option<Arc<RestIndexStore>>,
800
801 pub subscription_handler: Arc<SubscriptionHandler>,
802 pub checkpoint_store: Arc<CheckpointStore>,
803
804 committee_store: Arc<CommitteeStore>,
805
806 transaction_manager: Arc<TransactionManager>,
808
809 #[cfg_attr(not(test), expect(unused))]
811 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
812
813 pub metrics: Arc<AuthorityMetrics>,
814 _pruner: AuthorityStorePruner,
815 _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
816
817 db_checkpoint_config: DBCheckpointConfig,
819
820 pub config: NodeConfig,
821
822 pub overload_info: AuthorityOverloadInfo,
824
825 pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
826
827 chain_identifier: ChainIdentifier,
830
831 pub(crate) congestion_tracker: Arc<CongestionTracker>,
832}
833
834impl AuthorityState {
843 pub fn is_committee_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
844 epoch_store.committee().authority_exists(&self.name)
845 }
846
847 pub fn is_active_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
848 epoch_store
849 .active_validators()
850 .iter()
851 .any(|a| AuthorityName::from(a) == self.name)
852 }
853
854 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
855 !self.is_committee_validator(epoch_store)
856 }
857
858 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
859 &self.committee_store
860 }
861
862 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
863 self.committee_store.clone()
864 }
865
866 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
867 &self.config.authority_overload_config
868 }
869
870 pub fn get_epoch_state_commitments(
871 &self,
872 epoch: EpochId,
873 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
874 self.checkpoint_store.get_epoch_state_commitments(epoch)
875 }
876
877 #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
881 async fn handle_transaction_impl(
882 &self,
883 transaction: VerifiedTransaction,
884 epoch_store: &Arc<AuthorityPerEpochStore>,
885 ) -> IotaResult<VerifiedSignedTransaction> {
886 let _execution_lock = self.execution_lock_for_signing();
888
889 let protocol_config = epoch_store.protocol_config();
890 let reference_gas_price = epoch_store.reference_gas_price();
891
892 let epoch = epoch_store.epoch();
893
894 let tx_data = transaction.data().transaction_data();
895
896 iota_transaction_checks::deny::check_transaction_for_signing(
900 tx_data,
901 transaction.tx_signatures(),
902 &transaction.input_objects()?,
903 &tx_data.receiving_objects(),
904 &self.config.transaction_deny_config,
905 self.get_backing_package_store().as_ref(),
906 )?;
907
908 let (tx_input_objects, tx_receiving_objects, auth_input_objects, account_object) =
913 self.read_objects_for_signing(&transaction, epoch)?;
914
915 let move_authenticator = transaction.sender_move_authenticator();
919
920 let (
926 gas_status,
927 tx_checked_input_objects,
928 auth_checked_input_objects,
929 authenticator_function_ref,
930 ) = self.check_transaction_inputs_for_signing(
931 protocol_config,
932 reference_gas_price,
933 tx_data,
934 tx_input_objects,
935 &tx_receiving_objects,
936 move_authenticator,
937 auth_input_objects,
938 account_object,
939 )?;
940
941 check_coin_deny_list_v1_during_signing(
942 tx_data.sender(),
943 &tx_checked_input_objects,
944 &tx_receiving_objects,
945 &auth_checked_input_objects,
946 &self.get_object_store(),
947 )?;
948
949 if let Some(move_authenticator) = move_authenticator {
950 let auth_checked_input_objects = auth_checked_input_objects
954 .expect("MoveAuthenticator input objects must be provided");
955 let authenticator_function_ref = authenticator_function_ref
956 .expect("AuthenticatorFunctionRefV1 object must be provided");
957
958 let (kind, signer, _) = tx_data.execution_parts();
959
960 let validation_result = epoch_store.executor().authenticate_transaction(
962 self.get_backing_store().as_ref(),
963 protocol_config,
964 self.metrics.limits_metrics.clone(),
965 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
966 epoch_store
967 .epoch_start_config()
968 .epoch_data()
969 .epoch_start_timestamp(),
970 gas_status,
971 move_authenticator.to_owned(),
972 authenticator_function_ref,
973 auth_checked_input_objects,
974 kind,
975 signer,
976 transaction.digest().to_owned(),
977 &mut None,
978 );
979
980 if let Err(validation_error) = validation_result {
981 return Err(IotaError::MoveAuthenticatorExecutionFailure {
982 error: validation_error.to_string(),
983 });
984 }
985 }
986
987 let owned_objects = tx_checked_input_objects.inner().filter_owned_objects();
988
989 let signed_transaction =
990 VerifiedSignedTransaction::new(epoch, transaction, self.name, &*self.secret);
991
992 self.get_cache_writer().try_acquire_transaction_locks(
997 epoch_store,
998 &owned_objects,
999 signed_transaction.clone(),
1000 )?;
1001
1002 Ok(signed_transaction)
1003 }
1004
1005 #[instrument(name = "handle_transaction", level = "trace", skip_all)]
1007 pub async fn handle_transaction(
1008 &self,
1009 epoch_store: &Arc<AuthorityPerEpochStore>,
1010 transaction: VerifiedTransaction,
1011 ) -> IotaResult<HandleTransactionResponse> {
1012 let tx_digest = *transaction.digest();
1013 debug!("handle_transaction");
1014
1015 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
1017 return Ok(HandleTransactionResponse { status });
1018 }
1019
1020 let _metrics_guard = self
1021 .metrics
1022 .authority_state_handle_transaction_latency
1023 .start_timer();
1024 self.metrics.tx_orders.inc();
1025
1026 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
1027 match signed {
1028 Ok(s) => {
1029 if self.is_committee_validator(epoch_store) {
1030 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
1031 let tx = s.clone();
1032 let validator_tx_finalizer = validator_tx_finalizer.clone();
1033 let cache_reader = self.get_transaction_cache_reader().clone();
1034 let epoch_store = epoch_store.clone();
1035 spawn_monitored_task!(epoch_store.within_alive_epoch(
1036 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
1037 ));
1038 }
1039 }
1040 Ok(HandleTransactionResponse {
1041 status: TransactionStatus::Signed(s.into_inner().into_sig()),
1042 })
1043 }
1044 Err(err) => Ok(HandleTransactionResponse {
1048 status: self
1049 .get_transaction_status(&tx_digest, epoch_store)?
1050 .ok_or(err)?
1051 .1,
1052 }),
1053 }
1054 }
1055
1056 pub fn check_system_overload_at_signing(&self) -> bool {
1057 self.config
1058 .authority_overload_config
1059 .check_system_overload_at_signing
1060 }
1061
1062 pub fn check_system_overload_at_execution(&self) -> bool {
1063 self.config
1064 .authority_overload_config
1065 .check_system_overload_at_execution
1066 }
1067
1068 pub(crate) fn check_system_overload(
1069 &self,
1070 consensus_adapter: &Arc<ConsensusAdapter>,
1071 tx_data: &SenderSignedData,
1072 do_authority_overload_check: bool,
1073 ) -> IotaResult {
1074 if do_authority_overload_check {
1075 self.check_authority_overload(tx_data).tap_err(|_| {
1076 self.update_overload_metrics("execution_queue");
1077 })?;
1078 }
1079 self.transaction_manager
1080 .check_execution_overload(self.overload_config(), tx_data)
1081 .tap_err(|_| {
1082 self.update_overload_metrics("execution_pending");
1083 })?;
1084 consensus_adapter.check_consensus_overload().tap_err(|_| {
1085 self.update_overload_metrics("consensus");
1086 })?;
1087
1088 let pending_tx_count = self
1089 .get_cache_commit()
1090 .approximate_pending_transaction_count();
1091 if pending_tx_count
1092 > self
1093 .config
1094 .execution_cache_config
1095 .writeback_cache
1096 .backpressure_threshold_for_rpc()
1097 {
1098 return Err(IotaError::ValidatorOverloadedRetryAfter {
1099 retry_after_secs: 10,
1100 });
1101 }
1102
1103 Ok(())
1104 }
1105
1106 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1107 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1108 return Ok(());
1109 }
1110
1111 let load_shedding_percentage = self
1112 .overload_info
1113 .load_shedding_percentage
1114 .load(Ordering::Relaxed);
1115 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1116 }
1117
1118 fn update_overload_metrics(&self, source: &str) {
1119 self.metrics
1120 .transaction_overload_sources
1121 .with_label_values(&[source])
1122 .inc();
1123 }
1124
1125 #[instrument(level = "trace", skip_all)]
1127 pub async fn execute_certificate(
1128 &self,
1129 certificate: &VerifiedCertificate,
1130 epoch_store: &Arc<AuthorityPerEpochStore>,
1131 ) -> IotaResult<TransactionEffects> {
1132 let _metrics_guard = if certificate.contains_shared_object() {
1133 self.metrics
1134 .execute_certificate_latency_shared_object
1135 .start_timer()
1136 } else {
1137 self.metrics
1138 .execute_certificate_latency_single_writer
1139 .start_timer()
1140 };
1141 trace!("execute_certificate");
1142
1143 self.metrics.total_cert_attempts.inc();
1144
1145 if !certificate.contains_shared_object() {
1146 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1151 }
1152
1153 epoch_store
1156 .within_alive_epoch(self.notify_read_effects(certificate))
1157 .await
1158 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1159 .and_then(|r| r)
1160 }
1161
1162 #[instrument(level = "trace", skip_all, fields(tx_digest = ?certificate.digest()))]
1177 pub fn try_execute_immediately(
1178 &self,
1179 certificate: &VerifiedExecutableTransaction,
1180 expected_effects_digest: Option<TransactionEffectsDigest>,
1181 epoch_store: &Arc<AuthorityPerEpochStore>,
1182 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1183 let _scope = monitored_scope("Execution::try_execute_immediately");
1184 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1185
1186 let tx_digest = certificate.digest();
1187
1188 let tx_guard = epoch_store.acquire_tx_guard(certificate)?;
1190
1191 if let Some(effects) = self
1194 .get_transaction_cache_reader()
1195 .try_get_executed_effects(tx_digest)?
1196 {
1197 if let Some(expected_effects_digest_inner) = expected_effects_digest {
1198 assert_eq!(
1199 effects.digest(),
1200 expected_effects_digest_inner,
1201 "Unexpected effects digest for transaction {tx_digest:?}"
1202 );
1203 }
1204 tx_guard.release();
1205 return Ok((effects, None));
1206 }
1207
1208 let (tx_input_objects, authenticator_input_objects, account_object) =
1209 self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1210
1211 let expected_effects_digest =
1217 expected_effects_digest.or(epoch_store.get_signed_effects_digest(tx_digest)?);
1218
1219 self.process_certificate(
1220 tx_guard,
1221 certificate,
1222 tx_input_objects,
1223 account_object,
1224 authenticator_input_objects,
1225 expected_effects_digest,
1226 epoch_store,
1227 )
1228 .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1229 .tap_ok(
1230 |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1231 )
1232 }
1233
1234 pub fn read_objects_for_execution(
1235 &self,
1236 tx_lock: &CertLockGuard,
1237 certificate: &VerifiedExecutableTransaction,
1238 epoch_store: &Arc<AuthorityPerEpochStore>,
1239 ) -> IotaResult<(InputObjects, Option<InputObjects>, Option<ObjectReadResult>)> {
1240 let _scope = monitored_scope("Execution::load_input_objects");
1241 let _metrics_guard = self
1242 .metrics
1243 .execution_load_input_objects_latency
1244 .start_timer();
1245
1246 let input_objects = certificate.collect_all_input_object_kind_for_reading()?;
1247
1248 let input_objects = self.input_loader.read_objects_for_execution(
1249 epoch_store,
1250 &certificate.key(),
1251 tx_lock,
1252 &input_objects,
1253 epoch_store.epoch(),
1254 )?;
1255
1256 certificate.split_input_objects_into_groups_for_reading(input_objects)
1257 }
1258
1259 pub fn try_execute_for_test(
1263 &self,
1264 certificate: &VerifiedCertificate,
1265 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1266 let epoch_store = self.epoch_store_for_testing();
1267 let (effects, execution_error_opt) = self.try_execute_immediately(
1268 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1269 None,
1270 &epoch_store,
1271 )?;
1272 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1273 Ok((signed_effects, execution_error_opt))
1274 }
1275
1276 pub fn execute_for_test(
1278 &self,
1279 certificate: &VerifiedCertificate,
1280 ) -> (VerifiedSignedTransactionEffects, Option<ExecutionError>) {
1281 self.try_execute_for_test(certificate)
1282 .expect("try_execute_for_test should not fail")
1283 }
1284
1285 pub async fn notify_read_effects(
1286 &self,
1287 certificate: &VerifiedCertificate,
1288 ) -> IotaResult<TransactionEffects> {
1289 self.get_transaction_cache_reader()
1290 .try_notify_read_executed_effects(&[*certificate.digest()])
1291 .await
1292 .map(|mut r| r.pop().expect("must return correct number of effects"))
1293 }
1294
1295 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1296 self.get_object_cache_reader()
1297 .try_check_owned_objects_are_live(owned_object_refs)
1298 }
1299
1300 pub(crate) fn debug_dump_transaction_state(
1305 &self,
1306 tx_digest: &TransactionDigest,
1307 effects: &TransactionEffects,
1308 expected_effects_digest: TransactionEffectsDigest,
1309 inner_temporary_store: &InnerTemporaryStore,
1310 certificate: &VerifiedExecutableTransaction,
1311 debug_dump_config: &StateDebugDumpConfig,
1312 ) -> IotaResult<PathBuf> {
1313 let dump_dir = debug_dump_config
1314 .dump_file_directory
1315 .as_ref()
1316 .cloned()
1317 .unwrap_or(std::env::temp_dir());
1318 let epoch_store = self.load_epoch_store_one_call_per_task();
1319
1320 NodeStateDump::new(
1321 tx_digest,
1322 effects,
1323 expected_effects_digest,
1324 self.get_object_store().as_ref(),
1325 &epoch_store,
1326 inner_temporary_store,
1327 certificate,
1328 )?
1329 .write_to_file(&dump_dir)
1330 .map_err(|e| IotaError::FileIO(e.to_string()))
1331 }
1332
1333 #[instrument(level = "trace", skip_all)]
1334 pub(crate) fn process_certificate(
1335 &self,
1336 tx_guard: CertTxGuard,
1337 certificate: &VerifiedExecutableTransaction,
1338 tx_input_objects: InputObjects,
1339 account_object: Option<ObjectReadResult>,
1340 authenticator_input_objects: Option<InputObjects>,
1341 expected_effects_digest: Option<TransactionEffectsDigest>,
1342 epoch_store: &Arc<AuthorityPerEpochStore>,
1343 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1344 let process_certificate_start_time = tokio::time::Instant::now();
1345 let digest = *certificate.digest();
1346
1347 fail_point_if!("correlated-crash-process-certificate", || {
1348 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1349 iota_simulator::task::kill_current_node(None);
1350 }
1351 });
1352
1353 let execution_guard = self.execution_lock_for_executable_transaction(certificate);
1354 let execution_guard = match execution_guard {
1360 Ok(execution_guard) => execution_guard,
1361 Err(err) => {
1362 tx_guard.release();
1363 return Err(err);
1364 }
1365 };
1366 if *execution_guard != epoch_store.epoch() {
1370 tx_guard.release();
1371 info!("The epoch of the execution_guard doesn't match the epoch store");
1372 return Err(IotaError::WrongEpoch {
1373 expected_epoch: epoch_store.epoch(),
1374 actual_epoch: *execution_guard,
1375 });
1376 }
1377
1378 let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1384 &execution_guard,
1385 certificate,
1386 tx_input_objects,
1387 account_object,
1388 authenticator_input_objects,
1389 epoch_store,
1390 ) {
1391 Err(e) => {
1392 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1393 tx_guard.release();
1394 return Err(e);
1395 }
1396 Ok(res) => res,
1397 };
1398
1399 if let Some(expected_effects_digest) = expected_effects_digest {
1400 if effects.digest() != expected_effects_digest {
1401 match self.debug_dump_transaction_state(
1403 &digest,
1404 &effects,
1405 expected_effects_digest,
1406 &inner_temporary_store,
1407 certificate,
1408 &self.config.state_debug_dump_config,
1409 ) {
1410 Ok(out_path) => {
1411 info!(
1412 "Dumped node state for transaction {} to {}",
1413 digest,
1414 out_path.as_path().display().to_string()
1415 );
1416 }
1417 Err(e) => {
1418 error!("Error dumping state for transaction {}: {e}", digest);
1419 }
1420 }
1421 error!(
1422 tx_digest = ?digest,
1423 ?expected_effects_digest,
1424 actual_effects = ?effects,
1425 "fork detected!"
1426 );
1427 panic!(
1428 "Transaction {} is expected to have effects digest {}, but got {}!",
1429 digest,
1430 expected_effects_digest,
1431 effects.digest(),
1432 );
1433 }
1434 }
1435
1436 fail_point!("crash");
1437
1438 self.commit_certificate(
1439 certificate,
1440 inner_temporary_store,
1441 &effects,
1442 tx_guard,
1443 execution_guard,
1444 epoch_store,
1445 )?;
1446
1447 if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1448 certificate.data().transaction_data().kind()
1449 {
1450 if let Some(err) = &execution_error_opt {
1451 debug_fatal!("Authenticator state update failed: {:?}", err);
1452 }
1453 epoch_store.update_authenticator_state(auth_state);
1454
1455 if cfg!(debug_assertions) {
1458 let authenticator_state = get_authenticator_state(self.get_object_store())
1459 .expect("Read cannot fail")
1460 .expect("Authenticator state must exist");
1461
1462 let mut sys_jwks: Vec<_> = authenticator_state
1463 .active_jwks
1464 .into_iter()
1465 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1466 .collect();
1467 let mut active_jwks: Vec<_> = epoch_store
1468 .signature_verifier
1469 .get_jwks()
1470 .into_iter()
1471 .collect();
1472 sys_jwks.sort();
1473 active_jwks.sort();
1474
1475 assert_eq!(sys_jwks, active_jwks);
1476 }
1477 }
1478
1479 let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1480 if elapsed > 0.0 {
1481 self.metrics
1482 .execution_gas_latency_ratio
1483 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1484 };
1485 Ok((effects, execution_error_opt))
1486 }
1487
1488 #[instrument(level = "trace", skip_all)]
1489 fn commit_certificate(
1490 &self,
1491 certificate: &VerifiedExecutableTransaction,
1492 inner_temporary_store: InnerTemporaryStore,
1493 effects: &TransactionEffects,
1494 tx_guard: CertTxGuard,
1495 _execution_guard: ExecutionLockReadGuard<'_>,
1496 epoch_store: &Arc<AuthorityPerEpochStore>,
1497 ) -> IotaResult {
1498 let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1499 monitored_scope("Execution::commit_certificate");
1500 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1501
1502 let tx_key = certificate.key();
1503 let tx_digest = certificate.digest();
1504 let input_object_count = inner_temporary_store.input_objects.len();
1505 let shared_object_count = effects.input_shared_objects().len();
1506
1507 let output_keys = inner_temporary_store.get_output_keys(effects);
1508
1509 let _ = self
1511 .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1512 .tap_err(|e| {
1513 self.metrics.post_processing_total_failures.inc();
1514 error!(?tx_digest, "tx post processing failed: {e}");
1515 });
1516
1517 epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1521
1522 fail_point!("crash");
1524
1525 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1526 certificate.clone().into_unsigned(),
1527 effects.clone(),
1528 inner_temporary_store,
1529 );
1530 self.get_cache_writer()
1531 .try_write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())?;
1532
1533 if certificate.transaction_data().is_end_of_epoch_tx() {
1534 self.get_object_cache_reader()
1537 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1538 }
1539
1540 tx_guard.commit_tx();
1542
1543 self.transaction_manager
1547 .notify_commit(tx_digest, output_keys, epoch_store);
1548
1549 self.update_metrics(certificate, input_object_count, shared_object_count);
1550
1551 Ok(())
1552 }
1553
1554 fn update_metrics(
1555 &self,
1556 certificate: &VerifiedExecutableTransaction,
1557 input_object_count: usize,
1558 shared_object_count: usize,
1559 ) {
1560 if certificate.has_zklogin_sig() {
1562 self.metrics.zklogin_sig_count.inc();
1563 } else if certificate.has_upgraded_multisig() {
1564 self.metrics.multisig_sig_count.inc();
1565 }
1566
1567 self.metrics.total_effects.inc();
1568 self.metrics.total_certs.inc();
1569
1570 if shared_object_count > 0 {
1571 self.metrics.shared_obj_tx.inc();
1572 }
1573
1574 if certificate.is_sponsored_tx() {
1575 self.metrics.sponsored_tx.inc();
1576 }
1577
1578 self.metrics
1579 .num_input_objs
1580 .observe(input_object_count as f64);
1581 self.metrics
1582 .num_shared_objects
1583 .observe(shared_object_count as f64);
1584 self.metrics.batch_size.observe(
1585 certificate
1586 .data()
1587 .intent_message()
1588 .value
1589 .kind()
1590 .num_commands() as f64,
1591 );
1592 }
1593
1594 #[instrument(level = "trace", skip_all)]
1606 fn prepare_certificate(
1607 &self,
1608 _execution_guard: &ExecutionLockReadGuard<'_>,
1609 certificate: &VerifiedExecutableTransaction,
1610 tx_input_objects: InputObjects,
1611 account_object: Option<ObjectReadResult>,
1612 move_authenticator_input_objects: Option<InputObjects>,
1613 epoch_store: &Arc<AuthorityPerEpochStore>,
1614 ) -> IotaResult<(
1615 InnerTemporaryStore,
1616 TransactionEffects,
1617 Option<ExecutionError>,
1618 )> {
1619 let _scope = monitored_scope("Execution::prepare_certificate");
1620 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1621 let prepare_certificate_start_time = tokio::time::Instant::now();
1622
1623 let protocol_config = epoch_store.protocol_config();
1624
1625 let reference_gas_price = epoch_store.reference_gas_price();
1626
1627 let epoch_id = epoch_store.epoch_start_config().epoch_data().epoch_id();
1628 let epoch_start_timestamp = epoch_store
1629 .epoch_start_config()
1630 .epoch_data()
1631 .epoch_start_timestamp();
1632
1633 let backing_store = self.get_backing_store().as_ref();
1634
1635 let tx_digest = *certificate.digest();
1636
1637 let tx_data = certificate.data().transaction_data();
1640 tx_data.validity_check(protocol_config)?;
1641
1642 let (kind, signer, gas) = tx_data.execution_parts();
1643
1644 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1645 let (inner_temp_store, _, mut effects, execution_error_opt) = if let Some(
1646 move_authenticator,
1647 ) =
1648 certificate.sender_move_authenticator()
1649 {
1650 let (
1657 auth_account_object_id,
1658 auth_account_object_seq_number,
1659 auth_account_object_digest,
1660 ) = move_authenticator.object_to_authenticate_components()?;
1661
1662 let account_object = account_object.expect("Account object must be provided");
1665
1666 let authenticator_function_ref_for_execution = self.check_move_account(
1667 auth_account_object_id,
1668 auth_account_object_seq_number,
1669 auth_account_object_digest,
1670 account_object,
1671 &signer,
1672 )?;
1673
1674 let move_authenticator_input_objects = move_authenticator_input_objects.expect(
1675 "In case of a `MoveAuthenticator` signature, the authenticator input objects must be provided",
1676 );
1677
1678 let authenticator_gas_budget = protocol_config.max_auth_gas();
1683 let (
1684 gas_status,
1685 authenticator_checked_input_objects,
1686 authenticator_and_tx_checked_input_objects,
1687 ) = iota_transaction_checks::check_certificate_and_move_authenticator_input(
1688 certificate,
1689 tx_input_objects,
1690 move_authenticator_input_objects,
1691 authenticator_gas_budget,
1692 protocol_config,
1693 reference_gas_price,
1694 )?;
1695
1696 let owned_object_refs = authenticator_and_tx_checked_input_objects
1697 .inner()
1698 .filter_owned_objects();
1699 self.check_owned_locks(&owned_object_refs)?;
1700
1701 epoch_store
1702 .executor()
1703 .authenticate_then_execute_transaction_to_effects(
1704 backing_store,
1705 protocol_config,
1706 self.metrics.limits_metrics.clone(),
1707 self.config
1708 .expensive_safety_check_config
1709 .enable_deep_per_tx_iota_conservation_check(),
1710 self.config.certificate_deny_config.certificate_deny_set(),
1711 &epoch_id,
1712 epoch_start_timestamp,
1713 gas_status,
1714 gas,
1715 move_authenticator.to_owned(),
1716 authenticator_function_ref_for_execution,
1717 authenticator_checked_input_objects,
1718 authenticator_and_tx_checked_input_objects,
1719 kind,
1720 signer,
1721 tx_digest,
1722 &mut None,
1723 )
1724 } else {
1725 let (tx_gas_status, tx_checked_input_objects) =
1730 iota_transaction_checks::check_certificate_input(
1731 certificate,
1732 tx_input_objects,
1733 protocol_config,
1734 reference_gas_price,
1735 )?;
1736
1737 let owned_object_refs = tx_checked_input_objects.inner().filter_owned_objects();
1738 self.check_owned_locks(&owned_object_refs)?;
1739 epoch_store.executor().execute_transaction_to_effects(
1740 backing_store,
1741 protocol_config,
1742 self.metrics.limits_metrics.clone(),
1743 self.config
1746 .expensive_safety_check_config
1747 .enable_deep_per_tx_iota_conservation_check(),
1748 self.config.certificate_deny_config.certificate_deny_set(),
1749 &epoch_id,
1750 epoch_start_timestamp,
1751 tx_checked_input_objects,
1752 gas,
1753 tx_gas_status,
1754 kind,
1755 signer,
1756 tx_digest,
1757 &mut None,
1758 )
1759 };
1760
1761 fail_point_if!("cp_execution_nondeterminism", || {
1762 #[cfg(msim)]
1763 self.create_fail_state(certificate, epoch_store, &mut effects);
1764 });
1765
1766 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1767 if elapsed > 0.0 {
1768 self.metrics
1769 .prepare_cert_gas_latency_ratio
1770 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1771 }
1772
1773 Ok((inner_temp_store, effects, execution_error_opt.err()))
1774 }
1775
1776 pub fn prepare_certificate_for_benchmark(
1777 &self,
1778 certificate: &VerifiedExecutableTransaction,
1779 input_objects: InputObjects,
1780 epoch_store: &Arc<AuthorityPerEpochStore>,
1781 ) -> IotaResult<(
1782 InnerTemporaryStore,
1783 TransactionEffects,
1784 Option<ExecutionError>,
1785 )> {
1786 let lock = RwLock::new(epoch_store.epoch());
1787 let execution_guard = lock.try_read().unwrap();
1788
1789 self.prepare_certificate(
1790 &execution_guard,
1791 certificate,
1792 input_objects,
1793 None,
1794 None,
1795 epoch_store,
1796 )
1797 }
1798
1799 #[instrument("dry_exec_tx", level = "trace", skip_all)]
1802 #[allow(clippy::type_complexity)]
1803 pub fn dry_exec_transaction(
1804 &self,
1805 transaction: TransactionData,
1806 transaction_digest: TransactionDigest,
1807 ) -> IotaResult<(
1808 DryRunTransactionBlockResponse,
1809 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1810 TransactionEffects,
1811 Option<ObjectID>,
1812 )> {
1813 let epoch_store = self.load_epoch_store_one_call_per_task();
1814 if !self.is_fullnode(&epoch_store) {
1815 return Err(IotaError::UnsupportedFeature {
1816 error: "dry-exec is only supported on fullnodes".to_string(),
1817 });
1818 }
1819
1820 if transaction.kind().is_system_tx() {
1821 return Err(IotaError::UnsupportedFeature {
1822 error: "dry-exec does not support system transactions".to_string(),
1823 });
1824 }
1825
1826 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1827 }
1828
1829 #[allow(clippy::type_complexity)]
1830 pub fn dry_exec_transaction_for_benchmark(
1831 &self,
1832 transaction: TransactionData,
1833 transaction_digest: TransactionDigest,
1834 ) -> IotaResult<(
1835 DryRunTransactionBlockResponse,
1836 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1837 TransactionEffects,
1838 Option<ObjectID>,
1839 )> {
1840 let epoch_store = self.load_epoch_store_one_call_per_task();
1841 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1842 }
1843
1844 #[instrument(level = "trace", skip_all)]
1845 #[allow(clippy::type_complexity)]
1846 fn dry_exec_transaction_impl(
1847 &self,
1848 epoch_store: &AuthorityPerEpochStore,
1849 transaction: TransactionData,
1850 transaction_digest: TransactionDigest,
1851 ) -> IotaResult<(
1852 DryRunTransactionBlockResponse,
1853 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1854 TransactionEffects,
1855 Option<ObjectID>,
1856 )> {
1857 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1859
1860 let input_object_kinds = transaction.input_objects()?;
1861 let receiving_object_refs = transaction.receiving_objects();
1862
1863 iota_transaction_checks::deny::check_transaction_for_signing(
1864 &transaction,
1865 &[],
1866 &input_object_kinds,
1867 &receiving_object_refs,
1868 &self.config.transaction_deny_config,
1869 self.get_backing_package_store().as_ref(),
1870 )?;
1871
1872 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1873 None,
1875 &input_object_kinds,
1876 &receiving_object_refs,
1877 epoch_store.epoch(),
1878 )?;
1879
1880 let mut transaction = transaction;
1882 let mut gas_object_refs = transaction.gas().to_vec();
1883 let reference_gas_price = epoch_store.reference_gas_price();
1884 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1885 let sender = transaction.gas_owner();
1886 let gas_object_id = ObjectID::random();
1887 let gas_object = Object::new_move(
1888 MoveObject::new_gas_coin(
1889 OBJECT_START_VERSION,
1890 gas_object_id,
1891 SIMULATION_GAS_COIN_VALUE,
1892 ),
1893 Owner::AddressOwner(sender),
1894 TransactionDigest::genesis_marker(),
1895 );
1896 let gas_object_ref = gas_object.compute_object_reference();
1897 gas_object_refs = vec![gas_object_ref];
1898 transaction.gas_data_mut().payment = gas_object_refs.clone();
1900 (
1901 iota_transaction_checks::check_transaction_input_with_given_gas(
1902 epoch_store.protocol_config(),
1903 reference_gas_price,
1904 &transaction,
1905 input_objects,
1906 receiving_objects,
1907 gas_object,
1908 &self.metrics.bytecode_verifier_metrics,
1909 &self.config.verifier_signing_config,
1910 )?,
1911 Some(gas_object_id),
1912 )
1913 } else {
1914 let authenticator_gas_budget = 0;
1917
1918 (
1919 iota_transaction_checks::check_transaction_input(
1920 epoch_store.protocol_config(),
1921 reference_gas_price,
1922 &transaction,
1923 input_objects,
1924 &receiving_objects,
1925 &self.metrics.bytecode_verifier_metrics,
1926 &self.config.verifier_signing_config,
1927 authenticator_gas_budget,
1928 )?,
1929 None,
1930 )
1931 };
1932
1933 let protocol_config = epoch_store.protocol_config();
1934 let (kind, signer, _) = transaction.execution_parts();
1935
1936 let silent = true;
1937 let executor = iota_execution::executor(protocol_config, silent, None)
1938 .expect("Creating an executor should not fail here");
1939
1940 let expensive_checks = false;
1941 let (inner_temp_store, _, effects, execution_error) = executor
1942 .execute_transaction_to_effects(
1943 self.get_backing_store().as_ref(),
1944 protocol_config,
1945 self.metrics.limits_metrics.clone(),
1946 expensive_checks,
1947 self.config.certificate_deny_config.certificate_deny_set(),
1948 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1949 epoch_store
1950 .epoch_start_config()
1951 .epoch_data()
1952 .epoch_start_timestamp(),
1953 checked_input_objects,
1954 gas_object_refs,
1955 gas_status,
1956 kind,
1957 signer,
1958 transaction_digest,
1959 &mut None,
1960 );
1961 let tx_digest = *effects.transaction_digest();
1962
1963 let module_cache =
1964 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1965
1966 let mut layout_resolver =
1967 epoch_store
1968 .executor()
1969 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1970 &inner_temp_store,
1971 self.get_backing_package_store(),
1972 )));
1973 let object_changes = Vec::new();
1975
1976 let balance_changes = Vec::new();
1978
1979 let written_with_kind = effects
1980 .created()
1981 .into_iter()
1982 .map(|(oref, _)| (oref, WriteKind::Create))
1983 .chain(
1984 effects
1985 .unwrapped()
1986 .into_iter()
1987 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1988 )
1989 .chain(
1990 effects
1991 .mutated()
1992 .into_iter()
1993 .map(|(oref, _)| (oref, WriteKind::Mutate)),
1994 )
1995 .map(|(oref, kind)| {
1996 let obj = inner_temp_store.written.get(&oref.0).unwrap();
1997 (oref.0, (oref, obj.clone(), kind))
1999 })
2000 .collect();
2001
2002 let execution_error_source = execution_error
2003 .as_ref()
2004 .err()
2005 .and_then(|e| e.source().as_ref().map(|e| e.to_string()));
2006
2007 Ok((
2008 DryRunTransactionBlockResponse {
2009 suggested_gas_price: self
2011 .congestion_tracker
2012 .get_prediction_suggested_gas_price(&transaction),
2013 input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
2014 .map_err(|e| IotaError::TransactionSerialization {
2015 error: format!(
2016 "Failed to convert transaction to IotaTransactionBlockData: {e}",
2017 ),
2018 })?, effects: effects.clone().try_into()?,
2020 events: IotaTransactionBlockEvents::try_from(
2021 inner_temp_store.events.clone(),
2022 tx_digest,
2023 None,
2024 layout_resolver.as_mut(),
2025 )?,
2026 object_changes,
2027 balance_changes,
2028 execution_error_source,
2029 },
2030 written_with_kind,
2031 effects,
2032 mock_gas,
2033 ))
2034 }
2035
2036 pub fn simulate_transaction(
2037 &self,
2038 mut transaction: TransactionData,
2039 checks: VmChecks,
2040 ) -> IotaResult<SimulateTransactionResult> {
2041 if transaction.kind().is_system_tx() {
2042 return Err(IotaError::UnsupportedFeature {
2043 error: "simulate does not support system transactions".to_string(),
2044 });
2045 }
2046
2047 let epoch_store = self.load_epoch_store_one_call_per_task();
2048 if !self.is_fullnode(&epoch_store) {
2049 return Err(IotaError::UnsupportedFeature {
2050 error: "simulate is only supported on fullnodes".to_string(),
2051 });
2052 }
2053
2054 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
2058
2059 let input_object_kinds = transaction.input_objects()?;
2060 let receiving_object_refs = transaction.receiving_objects();
2061
2062 iota_transaction_checks::deny::check_transaction_for_signing(
2065 &transaction,
2066 &[],
2067 &input_object_kinds,
2068 &receiving_object_refs,
2069 &self.config.transaction_deny_config,
2070 self.get_backing_package_store().as_ref(),
2071 )?;
2072
2073 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2075 None,
2077 &input_object_kinds,
2078 &receiving_object_refs,
2079 epoch_store.epoch(),
2080 )?;
2081
2082 let mock_gas_id = if transaction.gas().is_empty() {
2084 let mock_gas_object = Object::new_move(
2085 MoveObject::new_gas_coin(
2086 OBJECT_START_VERSION,
2087 ObjectID::MAX,
2088 SIMULATION_GAS_COIN_VALUE,
2089 ),
2090 Owner::AddressOwner(transaction.gas_data().owner),
2091 TransactionDigest::genesis_marker(),
2092 );
2093 let mock_gas_object_ref = mock_gas_object.compute_object_reference();
2094 transaction.gas_data_mut().payment = vec![mock_gas_object_ref];
2095 input_objects.push(ObjectReadResult::new_from_gas_object(&mock_gas_object));
2096 Some(mock_gas_object.id())
2097 } else {
2098 None
2099 };
2100
2101 let protocol_config = epoch_store.protocol_config();
2102
2103 let authenticator_gas_budget = 0;
2106
2107 let (gas_status, checked_input_objects) = if checks.enabled() {
2110 iota_transaction_checks::check_transaction_input(
2111 protocol_config,
2112 epoch_store.reference_gas_price(),
2113 &transaction,
2114 input_objects,
2115 &receiving_objects,
2116 &self.metrics.bytecode_verifier_metrics,
2117 &self.config.verifier_signing_config,
2118 authenticator_gas_budget,
2119 )?
2120 } else {
2121 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2122 protocol_config,
2123 transaction.kind(),
2124 input_objects,
2125 receiving_objects,
2126 )?;
2127 let gas_status = IotaGasStatus::new(
2128 transaction.gas_budget(),
2129 transaction.gas_price(),
2130 epoch_store.reference_gas_price(),
2131 protocol_config,
2132 )?;
2133
2134 (gas_status, checked_input_objects)
2135 };
2136
2137 let executor = iota_execution::executor(
2139 protocol_config,
2140 true, None,
2142 )
2143 .expect("Creating an executor should not fail here");
2144
2145 let (kind, signer, gas_coins) = transaction.execution_parts();
2147 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2148 self.get_backing_store().as_ref(),
2149 protocol_config,
2150 self.metrics.limits_metrics.clone(),
2151 false, self.config.certificate_deny_config.certificate_deny_set(),
2153 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2154 epoch_store
2155 .epoch_start_config()
2156 .epoch_data()
2157 .epoch_start_timestamp(),
2158 checked_input_objects,
2159 gas_coins,
2160 gas_status,
2161 kind,
2162 signer,
2163 transaction.digest(),
2164 checks.disabled(),
2165 );
2166
2167 Ok(SimulateTransactionResult {
2170 input_objects: inner_temp_store.input_objects,
2171 output_objects: inner_temp_store.written,
2172 events: effects.events_digest().map(|_| inner_temp_store.events),
2173 effects,
2174 execution_result,
2175 mock_gas_id,
2176 })
2177 }
2178
2179 #[instrument("dev_inspect_tx", level = "trace", skip_all)]
2184 pub async fn dev_inspect_transaction_block(
2185 &self,
2186 sender: IotaAddress,
2187 transaction_kind: TransactionKind,
2188 gas_price: Option<u64>,
2189 gas_budget: Option<u64>,
2190 gas_sponsor: Option<IotaAddress>,
2191 gas_objects: Option<Vec<ObjectRef>>,
2192 show_raw_txn_data_and_effects: Option<bool>,
2193 skip_checks: Option<bool>,
2194 ) -> IotaResult<DevInspectResults> {
2195 let epoch_store = self.load_epoch_store_one_call_per_task();
2196
2197 if !self.is_fullnode(&epoch_store) {
2198 return Err(IotaError::UnsupportedFeature {
2199 error: "dev-inspect is only supported on fullnodes".to_string(),
2200 });
2201 }
2202
2203 if transaction_kind.is_system_tx() {
2204 return Err(IotaError::UnsupportedFeature {
2205 error: "system transactions are not supported".to_string(),
2206 });
2207 }
2208
2209 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2210 let skip_checks = skip_checks.unwrap_or(true);
2211 let reference_gas_price = epoch_store.reference_gas_price();
2212 let protocol_config = epoch_store.protocol_config();
2213 let max_tx_gas = protocol_config.max_tx_gas();
2214
2215 let price = gas_price.unwrap_or(reference_gas_price);
2216 let budget = gas_budget.unwrap_or(max_tx_gas);
2217 let owner = gas_sponsor.unwrap_or(sender);
2218 let payment = gas_objects.unwrap_or_default();
2221 let transaction = TransactionData::V1(TransactionDataV1 {
2222 kind: transaction_kind.clone(),
2223 sender,
2224 gas_data: GasData {
2225 payment,
2226 owner,
2227 price,
2228 budget,
2229 },
2230 expiration: TransactionExpiration::None,
2231 });
2232
2233 let raw_txn_data = if show_raw_txn_data_and_effects {
2234 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2235 error: "Failed to serialize transaction during dev inspect".to_string(),
2236 })?
2237 } else {
2238 vec![]
2239 };
2240
2241 transaction.validity_check_no_gas_check(protocol_config)?;
2242
2243 let input_object_kinds = transaction.input_objects()?;
2244 let receiving_object_refs = transaction.receiving_objects();
2245
2246 iota_transaction_checks::deny::check_transaction_for_signing(
2247 &transaction,
2248 &[],
2249 &input_object_kinds,
2250 &receiving_object_refs,
2251 &self.config.transaction_deny_config,
2252 self.get_backing_package_store().as_ref(),
2253 )?;
2254
2255 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2256 None,
2258 &input_object_kinds,
2259 &receiving_object_refs,
2260 epoch_store.epoch(),
2261 )?;
2262
2263 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2265 SIMULATION_GAS_COIN_VALUE,
2266 transaction.gas_owner(),
2267 );
2268
2269 let gas_objects = if transaction.gas().is_empty() {
2270 let gas_object_ref = dummy_gas_object.compute_object_reference();
2271 vec![gas_object_ref]
2272 } else {
2273 transaction.gas().to_vec()
2274 };
2275
2276 let (gas_status, checked_input_objects) = if skip_checks {
2277 if transaction.gas().is_empty() {
2282 input_objects.push(ObjectReadResult::new(
2283 InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
2284 dummy_gas_object.into(),
2285 ));
2286 }
2287 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2288 protocol_config,
2289 &transaction_kind,
2290 input_objects,
2291 receiving_objects,
2292 )?;
2293 let gas_status = IotaGasStatus::new(
2294 max_tx_gas,
2295 transaction.gas_price(),
2296 reference_gas_price,
2297 protocol_config,
2298 )?;
2299
2300 (gas_status, checked_input_objects)
2301 } else {
2302 if transaction.gas().is_empty() {
2306 iota_transaction_checks::check_transaction_input_with_given_gas(
2307 epoch_store.protocol_config(),
2308 reference_gas_price,
2309 &transaction,
2310 input_objects,
2311 receiving_objects,
2312 dummy_gas_object,
2313 &self.metrics.bytecode_verifier_metrics,
2314 &self.config.verifier_signing_config,
2315 )?
2316 } else {
2317 let authenticator_gas_budget = 0;
2320
2321 iota_transaction_checks::check_transaction_input(
2322 epoch_store.protocol_config(),
2323 reference_gas_price,
2324 &transaction,
2325 input_objects,
2326 &receiving_objects,
2327 &self.metrics.bytecode_verifier_metrics,
2328 &self.config.verifier_signing_config,
2329 authenticator_gas_budget,
2330 )?
2331 }
2332 };
2333
2334 let executor = iota_execution::executor(protocol_config, true, None)
2335 .expect("Creating an executor should not fail here");
2336 let intent_msg = IntentMessage::new(
2337 Intent {
2338 version: IntentVersion::V0,
2339 scope: IntentScope::TransactionData,
2340 app_id: IntentAppId::Iota,
2341 },
2342 transaction,
2343 );
2344 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2345 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2346 self.get_backing_store().as_ref(),
2347 protocol_config,
2348 self.metrics.limits_metrics.clone(),
2349 false,
2351 self.config.certificate_deny_config.certificate_deny_set(),
2352 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2353 epoch_store
2354 .epoch_start_config()
2355 .epoch_data()
2356 .epoch_start_timestamp(),
2357 checked_input_objects,
2358 gas_objects,
2359 gas_status,
2360 transaction_kind,
2361 sender,
2362 transaction_digest,
2363 skip_checks,
2364 );
2365
2366 let raw_effects = if show_raw_txn_data_and_effects {
2367 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2368 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2369 })?
2370 } else {
2371 vec![]
2372 };
2373
2374 let mut layout_resolver =
2375 epoch_store
2376 .executor()
2377 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2378 &inner_temp_store,
2379 self.get_backing_package_store(),
2380 )));
2381
2382 DevInspectResults::new(
2383 effects,
2384 inner_temp_store.events.clone(),
2385 execution_result,
2386 raw_txn_data,
2387 raw_effects,
2388 layout_resolver.as_mut(),
2389 )
2390 }
2391
2392 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2394 let epoch_store = self.epoch_store_for_testing();
2395 Ok(epoch_store.reference_gas_price())
2396 }
2397
2398 #[instrument(level = "trace", skip_all)]
2399 pub fn try_is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2400 self.get_transaction_cache_reader()
2401 .try_is_tx_already_executed(digest)
2402 }
2403
2404 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> bool {
2406 self.try_is_tx_already_executed(digest)
2407 .expect("storage access failed")
2408 }
2409
2410 #[instrument(level = "debug", skip_all, err)]
2412 fn index_tx(
2413 &self,
2414 indexes: &IndexStore,
2415 digest: &TransactionDigest,
2416 cert: &VerifiedExecutableTransaction,
2418 effects: &TransactionEffects,
2419 events: &TransactionEvents,
2420 timestamp_ms: u64,
2421 tx_coins: Option<TxCoins>,
2422 written: &WrittenObjects,
2423 inner_temporary_store: &InnerTemporaryStore,
2424 ) -> IotaResult<u64> {
2425 let changes = self
2426 .process_object_index(effects, written, inner_temporary_store)
2427 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2428
2429 indexes.index_tx(
2430 cert.data().intent_message().value.sender(),
2431 cert.data()
2432 .intent_message()
2433 .value
2434 .input_objects()?
2435 .iter()
2436 .map(|o| o.object_id()),
2437 effects
2438 .all_changed_objects()
2439 .into_iter()
2440 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2441 cert.data()
2442 .intent_message()
2443 .value
2444 .move_calls()
2445 .into_iter()
2446 .map(|(package, module, function)| {
2447 (*package, module.to_owned(), function.to_owned())
2448 }),
2449 events,
2450 changes,
2451 digest,
2452 timestamp_ms,
2453 tx_coins,
2454 )
2455 }
2456
2457 #[cfg(msim)]
2458 fn create_fail_state(
2459 &self,
2460 certificate: &VerifiedExecutableTransaction,
2461 epoch_store: &Arc<AuthorityPerEpochStore>,
2462 effects: &mut TransactionEffects,
2463 ) {
2464 use std::cell::RefCell;
2465 thread_local! {
2466 static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2467 }
2468 if !certificate.data().intent_message().value.is_system_tx() {
2469 let committee = epoch_store.committee();
2470 let cur_stake = (**committee).weight(&self.name);
2471 if cur_stake > 0 {
2472 FAIL_STATE.with_borrow_mut(|fail_state| {
2473 if fail_state.0 < committee.validity_threshold() {
2475 fail_state.0 += cur_stake;
2476 fail_state.1.insert(self.name);
2477 }
2478
2479 if fail_state.1.contains(&self.name) {
2480 info!("cp_exec failing tx");
2481 effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2482 }
2483 });
2484 }
2485 }
2486 }
2487
2488 fn process_object_index(
2489 &self,
2490 effects: &TransactionEffects,
2491 written: &WrittenObjects,
2492 inner_temporary_store: &InnerTemporaryStore,
2493 ) -> IotaResult<ObjectIndexChanges> {
2494 let epoch_store = self.load_epoch_store_one_call_per_task();
2495 let mut layout_resolver =
2496 epoch_store
2497 .executor()
2498 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2499 inner_temporary_store,
2500 self.get_backing_package_store(),
2501 )));
2502
2503 let modified_at_version = effects
2504 .modified_at_versions()
2505 .into_iter()
2506 .collect::<HashMap<_, _>>();
2507
2508 let tx_digest = effects.transaction_digest();
2509 let mut deleted_owners = vec![];
2510 let mut deleted_dynamic_fields = vec![];
2511 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2512 let old_version = modified_at_version.get(&id).unwrap();
2513 match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2516 |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2517 ) {
2518 Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2519 Owner::ObjectOwner(object_id) => {
2520 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2521 }
2522 _ => {}
2523 }
2524 }
2525
2526 let mut new_owners = vec![];
2527 let mut new_dynamic_fields = vec![];
2528
2529 for (oref, owner, kind) in effects.all_changed_objects() {
2530 let id = &oref.0;
2531 if let WriteKind::Mutate = kind {
2534 let Some(old_version) = modified_at_version.get(id) else {
2535 panic!(
2536 "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2537 );
2538 };
2539 let Some(old_object) = self
2542 .get_object_store()
2543 .try_get_object_by_key(id, *old_version)?
2544 else {
2545 panic!(
2546 "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2547 );
2548 };
2549 if old_object.owner != owner {
2550 match old_object.owner {
2551 Owner::AddressOwner(addr) => {
2552 deleted_owners.push((addr, *id));
2553 }
2554 Owner::ObjectOwner(object_id) => {
2555 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2556 }
2557 _ => {}
2558 }
2559 }
2560 }
2561
2562 match owner {
2563 Owner::AddressOwner(addr) => {
2564 let new_object = written.get(id).unwrap_or_else(
2567 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2568 );
2569 assert_eq!(
2570 new_object.version(),
2571 oref.1,
2572 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2573 tx_digest,
2574 id,
2575 new_object.version(),
2576 oref.1
2577 );
2578
2579 let type_ = new_object
2580 .type_()
2581 .map(|type_| ObjectType::Struct(type_.clone()))
2582 .unwrap_or(ObjectType::Package);
2583
2584 new_owners.push((
2585 (addr, *id),
2586 ObjectInfo {
2587 object_id: *id,
2588 version: oref.1,
2589 digest: oref.2,
2590 type_,
2591 owner,
2592 previous_transaction: *effects.transaction_digest(),
2593 },
2594 ));
2595 }
2596 Owner::ObjectOwner(owner) => {
2597 let new_object = written.get(id).unwrap_or_else(
2598 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2599 );
2600 assert_eq!(
2601 new_object.version(),
2602 oref.1,
2603 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2604 tx_digest,
2605 id,
2606 new_object.version(),
2607 oref.1
2608 );
2609
2610 let Some(df_info) = self
2611 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2612 .unwrap_or_else(|e| {
2613 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2614 None
2615 }
2616 )
2617 else {
2618 continue;
2620 };
2621 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2622 }
2623 _ => {}
2624 }
2625 }
2626
2627 Ok(ObjectIndexChanges {
2628 deleted_owners,
2629 deleted_dynamic_fields,
2630 new_owners,
2631 new_dynamic_fields,
2632 })
2633 }
2634
2635 fn try_create_dynamic_field_info(
2636 &self,
2637 o: &Object,
2638 written: &WrittenObjects,
2639 resolver: &mut dyn LayoutResolver,
2640 ) -> IotaResult<Option<DynamicFieldInfo>> {
2641 let Some(move_object) = o.data.try_as_move().cloned() else {
2643 return Ok(None);
2644 };
2645
2646 if !move_object.type_().is_dynamic_field() {
2648 return Ok(None);
2649 }
2650
2651 let layout = resolver
2652 .get_annotated_layout(&move_object.type_().clone().into())?
2653 .into_layout();
2654
2655 let field =
2656 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2657 IotaError::ObjectDeserialization {
2658 error: e.to_string(),
2659 }
2660 })?;
2661
2662 let type_ = field.kind;
2663 let name_type: TypeTag = field.name_layout.into();
2664 let bcs_name = field.name_bytes.to_owned();
2665
2666 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2667 .map_err(|e| {
2668 warn!("{e}");
2669 IotaError::ObjectDeserialization {
2670 error: e.to_string(),
2671 }
2672 })?;
2673
2674 let name = DynamicFieldName {
2675 type_: name_type,
2676 value: IotaMoveValue::from(name_value).to_json_value(),
2677 };
2678
2679 let value_metadata = field.value_metadata().map_err(|e| {
2680 warn!("{e}");
2681 IotaError::ObjectDeserialization {
2682 error: e.to_string(),
2683 }
2684 })?;
2685
2686 Ok(Some(match value_metadata {
2687 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2688 name,
2689 bcs_name,
2690 type_,
2691 object_type: object_type.to_canonical_string(true),
2692 object_id: o.id(),
2693 version: o.version(),
2694 digest: o.digest(),
2695 },
2696
2697 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2698 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2703 (
2704 object.version(),
2705 object.digest(),
2706 object.data.type_().unwrap().clone(),
2707 )
2708 } else {
2709 let object = self
2711 .get_object_store()
2712 .try_get_object_by_key(&object_id, o.version())?
2713 .ok_or_else(|| UserInputError::ObjectNotFound {
2714 object_id,
2715 version: Some(o.version()),
2716 })?;
2717 let version = object.version();
2718 let digest = object.digest();
2719 let object_type = object.data.type_().unwrap().clone();
2720 (version, digest, object_type)
2721 };
2722
2723 DynamicFieldInfo {
2724 name,
2725 bcs_name,
2726 type_,
2727 object_type: object_type.to_string(),
2728 object_id,
2729 version,
2730 digest,
2731 }
2732 }
2733 }))
2734 }
2735
2736 #[instrument(level = "trace", skip_all, err)]
2737 fn post_process_one_tx(
2738 &self,
2739 certificate: &VerifiedExecutableTransaction,
2740 effects: &TransactionEffects,
2741 inner_temporary_store: &InnerTemporaryStore,
2742 epoch_store: &Arc<AuthorityPerEpochStore>,
2743 ) -> IotaResult {
2744 if self.indexes.is_none() {
2745 return Ok(());
2746 }
2747
2748 let tx_digest = certificate.digest();
2749 let timestamp_ms = Self::unixtime_now_ms();
2750 let events = &inner_temporary_store.events;
2751 let written = &inner_temporary_store.written;
2752 let tx_coins =
2753 self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2754
2755 if let Some(indexes) = &self.indexes {
2757 let _ = self
2758 .index_tx(
2759 indexes.as_ref(),
2760 tx_digest,
2761 certificate,
2762 effects,
2763 events,
2764 timestamp_ms,
2765 tx_coins,
2766 written,
2767 inner_temporary_store,
2768 )
2769 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2770 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2771 .expect("Indexing tx should not fail");
2772
2773 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2774 let events = self.make_transaction_block_events(
2775 events.clone(),
2776 *tx_digest,
2777 timestamp_ms,
2778 epoch_store,
2779 inner_temporary_store,
2780 )?;
2781 self.subscription_handler
2783 .process_tx(certificate.data().transaction_data(), &effects, &events)
2784 .tap_ok(|_| {
2785 self.metrics
2786 .post_processing_total_tx_had_event_processed
2787 .inc()
2788 })
2789 .tap_err(|e| {
2790 warn!(
2791 ?tx_digest,
2792 "Post processing - Couldn't process events for tx: {}", e
2793 )
2794 })?;
2795
2796 self.metrics
2797 .post_processing_total_events_emitted
2798 .inc_by(events.data.len() as u64);
2799 };
2800 Ok(())
2801 }
2802
2803 fn make_transaction_block_events(
2804 &self,
2805 transaction_events: TransactionEvents,
2806 digest: TransactionDigest,
2807 timestamp_ms: u64,
2808 epoch_store: &Arc<AuthorityPerEpochStore>,
2809 inner_temporary_store: &InnerTemporaryStore,
2810 ) -> IotaResult<IotaTransactionBlockEvents> {
2811 let mut layout_resolver =
2812 epoch_store
2813 .executor()
2814 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2815 inner_temporary_store,
2816 self.get_backing_package_store(),
2817 )));
2818 IotaTransactionBlockEvents::try_from(
2819 transaction_events,
2820 digest,
2821 Some(timestamp_ms),
2822 layout_resolver.as_mut(),
2823 )
2824 }
2825
2826 pub fn unixtime_now_ms() -> u64 {
2827 let now = SystemTime::now()
2828 .duration_since(UNIX_EPOCH)
2829 .expect("Time went backwards")
2830 .as_millis();
2831 u64::try_from(now).expect("Travelling in time machine")
2832 }
2833
2834 #[instrument(level = "trace", skip_all)]
2835 pub async fn handle_transaction_info_request(
2836 &self,
2837 request: TransactionInfoRequest,
2838 ) -> IotaResult<TransactionInfoResponse> {
2839 let epoch_store = self.load_epoch_store_one_call_per_task();
2840 let (transaction, status) = self
2841 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2842 .ok_or(IotaError::TransactionNotFound {
2843 digest: request.transaction_digest,
2844 })?;
2845 Ok(TransactionInfoResponse {
2846 transaction,
2847 status,
2848 })
2849 }
2850
2851 #[instrument(level = "trace", skip_all)]
2852 pub async fn handle_object_info_request(
2853 &self,
2854 request: ObjectInfoRequest,
2855 ) -> IotaResult<ObjectInfoResponse> {
2856 let epoch_store = self.load_epoch_store_one_call_per_task();
2857
2858 let requested_object_seq = match request.request_kind {
2859 ObjectInfoRequestKind::LatestObjectInfo => {
2860 let (_, seq, _) = self
2861 .try_get_object_or_tombstone(request.object_id)
2862 .await?
2863 .ok_or_else(|| {
2864 IotaError::from(UserInputError::ObjectNotFound {
2865 object_id: request.object_id,
2866 version: None,
2867 })
2868 })?;
2869 seq
2870 }
2871 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2872 };
2873
2874 let object = self
2875 .get_object_store()
2876 .try_get_object_by_key(&request.object_id, requested_object_seq)?
2877 .ok_or_else(|| {
2878 IotaError::from(UserInputError::ObjectNotFound {
2879 object_id: request.object_id,
2880 version: Some(requested_object_seq),
2881 })
2882 })?;
2883
2884 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2885 (request.generate_layout, object.data.try_as_move())
2886 {
2887 Some(into_struct_layout(
2888 epoch_store
2889 .executor()
2890 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2891 .get_annotated_layout(&move_obj.type_().clone().into())?,
2892 )?)
2893 } else {
2894 None
2895 };
2896
2897 let lock = if !object.is_address_owned() {
2898 None
2900 } else {
2901 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2902 .await?
2903 .map(|s| s.into_inner())
2904 };
2905
2906 Ok(ObjectInfoResponse {
2907 object,
2908 layout,
2909 lock_for_debugging: lock,
2910 })
2911 }
2912
2913 #[instrument(level = "trace", skip_all)]
2914 pub fn handle_checkpoint_request(
2915 &self,
2916 request: &CheckpointRequest,
2917 ) -> IotaResult<CheckpointResponse> {
2918 let summary = if request.certified {
2919 let summary = match request.sequence_number {
2920 Some(seq) => self
2921 .checkpoint_store
2922 .get_checkpoint_by_sequence_number(seq)?,
2923 None => self.checkpoint_store.get_latest_certified_checkpoint()?,
2924 }
2925 .map(|v| v.into_inner());
2926 summary.map(CheckpointSummaryResponse::Certified)
2927 } else {
2928 let summary = match request.sequence_number {
2929 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2930 None => self
2931 .checkpoint_store
2932 .get_latest_locally_computed_checkpoint()?,
2933 };
2934 summary.map(CheckpointSummaryResponse::Pending)
2935 };
2936 let contents = match &summary {
2937 Some(s) => self
2938 .checkpoint_store
2939 .get_checkpoint_contents(&s.content_digest())?,
2940 None => None,
2941 };
2942 Ok(CheckpointResponse {
2943 checkpoint: summary,
2944 contents,
2945 })
2946 }
2947
2948 fn check_protocol_version(
2949 supported_protocol_versions: SupportedProtocolVersions,
2950 current_version: ProtocolVersion,
2951 ) {
2952 info!("current protocol version is now {:?}", current_version);
2953 info!("supported versions are: {:?}", supported_protocol_versions);
2954 if !supported_protocol_versions.is_version_supported(current_version) {
2955 let msg = format!(
2956 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2957 );
2958
2959 error!("{}", msg);
2960 eprintln!("{msg}");
2961
2962 #[cfg(not(msim))]
2963 std::process::exit(1);
2964
2965 #[cfg(msim)]
2966 iota_simulator::task::shutdown_current_node();
2967 }
2968 }
2969
2970 #[expect(clippy::disallowed_methods)] pub async fn new(
2972 name: AuthorityName,
2973 secret: StableSyncAuthoritySigner,
2974 supported_protocol_versions: SupportedProtocolVersions,
2975 store: Arc<AuthorityStore>,
2976 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
2977 epoch_store: Arc<AuthorityPerEpochStore>,
2978 committee_store: Arc<CommitteeStore>,
2979 indexes: Option<Arc<IndexStore>>,
2980 rest_index: Option<Arc<RestIndexStore>>,
2981 checkpoint_store: Arc<CheckpointStore>,
2982 prometheus_registry: &Registry,
2983 genesis_objects: &[Object],
2984 db_checkpoint_config: &DBCheckpointConfig,
2985 config: NodeConfig,
2986 archive_readers: ArchiveReaderBalancer,
2987 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
2988 chain_identifier: ChainIdentifier,
2989 pruner_db: Option<Arc<AuthorityPrunerTables>>,
2990 ) -> Arc<Self> {
2991 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
2992
2993 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
2994 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
2995 let transaction_manager = Arc::new(TransactionManager::new(
2996 execution_cache_trait_pointers.object_cache_reader.clone(),
2997 execution_cache_trait_pointers
2998 .transaction_cache_reader
2999 .clone(),
3000 &epoch_store,
3001 tx_ready_certificates,
3002 metrics.clone(),
3003 ));
3004 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
3005
3006 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
3007 epoch_store.get_parent_path(),
3008 &config.authority_store_pruning_config,
3009 );
3010 let _pruner = AuthorityStorePruner::new(
3011 store.perpetual_tables.clone(),
3012 checkpoint_store.clone(),
3013 rest_index.clone(),
3014 indexes.clone(),
3015 config.authority_store_pruning_config.clone(),
3016 epoch_store.committee().authority_exists(&name),
3017 epoch_store.epoch_start_state().epoch_duration_ms(),
3018 prometheus_registry,
3019 archive_readers,
3020 pruner_db,
3021 );
3022 let input_loader =
3023 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
3024 let epoch = epoch_store.epoch();
3025 let rgp = epoch_store.reference_gas_price();
3026 let state = Arc::new(AuthorityState {
3027 name,
3028 secret,
3029 execution_lock: RwLock::new(epoch),
3030 epoch_store: ArcSwap::new(epoch_store.clone()),
3031 input_loader,
3032 execution_cache_trait_pointers,
3033 indexes,
3034 rest_index,
3035 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
3036 checkpoint_store,
3037 committee_store,
3038 transaction_manager,
3039 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
3040 metrics,
3041 _pruner,
3042 _authority_per_epoch_pruner,
3043 db_checkpoint_config: db_checkpoint_config.clone(),
3044 config,
3045 overload_info: AuthorityOverloadInfo::default(),
3046 validator_tx_finalizer,
3047 chain_identifier,
3048 congestion_tracker: Arc::new(CongestionTracker::new(rgp)),
3049 });
3050
3051 let authority_state = Arc::downgrade(&state);
3053 spawn_monitored_task!(execution_process(
3054 authority_state,
3055 rx_ready_certificates,
3056 rx_execution_shutdown,
3057 ));
3058
3059 state
3061 .create_owner_index_if_empty(genesis_objects, &epoch_store)
3062 .expect("Error indexing genesis objects.");
3063
3064 state
3065 }
3066
3067 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
3069 &self.execution_cache_trait_pointers.object_cache_reader
3070 }
3071
3072 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
3073 &self.execution_cache_trait_pointers.transaction_cache_reader
3074 }
3075
3076 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
3077 &self.execution_cache_trait_pointers.cache_writer
3078 }
3079
3080 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
3081 &self.execution_cache_trait_pointers.backing_store
3082 }
3083
3084 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
3085 &self.execution_cache_trait_pointers.backing_package_store
3086 }
3087
3088 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
3089 &self.execution_cache_trait_pointers.object_store
3090 }
3091
3092 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
3093 &self.execution_cache_trait_pointers.reconfig_api
3094 }
3095
3096 pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
3097 &self.execution_cache_trait_pointers.accumulator_store
3098 }
3099
3100 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
3101 &self.execution_cache_trait_pointers.checkpoint_cache
3102 }
3103
3104 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
3105 &self.execution_cache_trait_pointers.state_sync_store
3106 }
3107
3108 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
3109 &self.execution_cache_trait_pointers.cache_commit
3110 }
3111
3112 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
3113 self.execution_cache_trait_pointers
3114 .testing_api
3115 .database_for_testing()
3116 }
3117
3118 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
3119 &self,
3120 config: NodeConfig,
3121 metrics: Arc<AuthorityStorePruningMetrics>,
3122 ) -> anyhow::Result<()> {
3123 let archive_readers =
3124 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
3125 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
3126 &self.database_for_testing().perpetual_tables,
3127 &self.checkpoint_store,
3128 self.rest_index.as_deref(),
3129 None,
3130 config.authority_store_pruning_config,
3131 metrics,
3132 archive_readers,
3133 EPOCH_DURATION_MS_FOR_TESTING,
3134 )
3135 .await
3136 }
3137
3138 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
3139 &self.transaction_manager
3140 }
3141
3142 pub fn enqueue_transactions_for_execution(
3145 &self,
3146 txns: Vec<VerifiedExecutableTransaction>,
3147 epoch_store: &Arc<AuthorityPerEpochStore>,
3148 ) {
3149 self.transaction_manager.enqueue(txns, epoch_store)
3150 }
3151
3152 pub fn enqueue_certificates_for_execution(
3154 &self,
3155 certs: Vec<VerifiedCertificate>,
3156 epoch_store: &Arc<AuthorityPerEpochStore>,
3157 ) {
3158 self.transaction_manager
3159 .enqueue_certificates(certs, epoch_store)
3160 }
3161
3162 pub fn enqueue_with_expected_effects_digest(
3163 &self,
3164 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
3165 epoch_store: &AuthorityPerEpochStore,
3166 ) {
3167 self.transaction_manager
3168 .enqueue_with_expected_effects_digest(certs, epoch_store)
3169 }
3170
3171 fn create_owner_index_if_empty(
3172 &self,
3173 genesis_objects: &[Object],
3174 epoch_store: &Arc<AuthorityPerEpochStore>,
3175 ) -> IotaResult {
3176 let Some(index_store) = &self.indexes else {
3177 return Ok(());
3178 };
3179 if !index_store.is_empty() {
3180 return Ok(());
3181 }
3182
3183 let mut new_owners = vec![];
3184 let mut new_dynamic_fields = vec![];
3185 let mut layout_resolver = epoch_store
3186 .executor()
3187 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3188 for o in genesis_objects.iter() {
3189 match o.owner {
3190 Owner::AddressOwner(addr) => new_owners.push((
3191 (addr, o.id()),
3192 ObjectInfo::new(&o.compute_object_reference(), o),
3193 )),
3194 Owner::ObjectOwner(object_id) => {
3195 let id = o.id();
3196 let info = match self.try_create_dynamic_field_info(
3197 o,
3198 &BTreeMap::new(),
3199 layout_resolver.as_mut(),
3200 ) {
3201 Ok(Some(info)) => info,
3202 Ok(None) => continue,
3203 Err(IotaError::UserInput {
3204 error:
3205 UserInputError::ObjectNotFound {
3206 object_id: not_found_id,
3207 version,
3208 },
3209 }) => {
3210 warn!(
3211 ?not_found_id,
3212 ?version,
3213 object_owner=?object_id,
3214 field=?id,
3215 "Skipping dynamic field: referenced genesis object not found"
3216 );
3217 continue;
3218 }
3219 Err(e) => return Err(e),
3220 };
3221 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3222 }
3223 _ => {}
3224 }
3225 }
3226
3227 index_store.insert_genesis_objects(ObjectIndexChanges {
3228 deleted_owners: vec![],
3229 deleted_dynamic_fields: vec![],
3230 new_owners,
3231 new_dynamic_fields,
3232 })
3233 }
3234
3235 pub fn execution_lock_for_executable_transaction(
3239 &self,
3240 transaction: &VerifiedExecutableTransaction,
3241 ) -> IotaResult<ExecutionLockReadGuard<'_>> {
3242 let lock = self
3243 .execution_lock
3244 .try_read()
3245 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)?;
3246 if *lock == transaction.auth_sig().epoch() {
3247 Ok(lock)
3248 } else {
3249 Err(IotaError::WrongEpoch {
3250 expected_epoch: *lock,
3251 actual_epoch: transaction.auth_sig().epoch(),
3252 })
3253 }
3254 }
3255
3256 pub fn execution_lock_for_signing(&self) -> IotaResult<ExecutionLockReadGuard<'_>> {
3262 self.execution_lock
3263 .try_read()
3264 .map_err(|_| IotaError::ValidatorHaltedAtEpochEnd)
3265 }
3266
3267 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard<'_> {
3268 self.execution_lock.write().await
3269 }
3270
3271 #[instrument(level = "error", skip_all)]
3272 pub async fn reconfigure(
3273 &self,
3274 cur_epoch_store: &AuthorityPerEpochStore,
3275 supported_protocol_versions: SupportedProtocolVersions,
3276 new_committee: Committee,
3277 epoch_start_configuration: EpochStartConfiguration,
3278 accumulator: Arc<StateAccumulator>,
3279 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3280 epoch_supply_change: i64,
3281 epoch_last_checkpoint: CheckpointSequenceNumber,
3282 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3283 Self::check_protocol_version(
3284 supported_protocol_versions,
3285 epoch_start_configuration
3286 .epoch_start_state()
3287 .protocol_version(),
3288 );
3289 self.metrics.reset_on_reconfigure();
3290 self.committee_store.insert_new_committee(&new_committee)?;
3291
3292 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3294
3295 cur_epoch_store.epoch_terminated().await;
3297
3298 let highest_locally_built_checkpoint_seq = self
3299 .checkpoint_store
3300 .get_latest_locally_computed_checkpoint()?
3301 .map(|c| *c.sequence_number())
3302 .unwrap_or(0);
3303
3304 assert!(
3305 epoch_last_checkpoint >= highest_locally_built_checkpoint_seq,
3306 "expected {epoch_last_checkpoint} >= {highest_locally_built_checkpoint_seq}"
3307 );
3308 if highest_locally_built_checkpoint_seq == epoch_last_checkpoint {
3309 let num_shared_version_assignments = cur_epoch_store.num_shared_version_assignments();
3313 if num_shared_version_assignments > 1 {
3316 debug_fatal!(
3318 "all shared_version_assignments should have been removed \
3319 (num_shared_version_assignments: {num_shared_version_assignments})"
3320 );
3321 }
3322 }
3323
3324 self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3330 .await?;
3331 self.get_reconfig_api()
3332 .clear_state_end_of_epoch(&execution_lock);
3333 self.check_system_consistency(
3334 cur_epoch_store,
3335 accumulator,
3336 expensive_safety_check_config,
3337 epoch_supply_change,
3338 )?;
3339 self.get_reconfig_api()
3340 .try_set_epoch_start_configuration(&epoch_start_configuration)?;
3341 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3342 if self
3343 .db_checkpoint_config
3344 .perform_db_checkpoints_at_epoch_end
3345 {
3346 let checkpoint_indexes = self
3347 .db_checkpoint_config
3348 .perform_index_db_checkpoints_at_epoch_end
3349 .unwrap_or(false);
3350 let current_epoch = cur_epoch_store.epoch();
3351 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3352 self.checkpoint_all_dbs(
3353 &epoch_checkpoint_path,
3354 cur_epoch_store,
3355 checkpoint_indexes,
3356 )?;
3357 }
3358 }
3359
3360 self.get_reconfig_api()
3361 .reconfigure_cache(&epoch_start_configuration)
3362 .await;
3363
3364 let new_epoch = new_committee.epoch;
3365 let new_epoch_store = self
3366 .reopen_epoch_db(
3367 cur_epoch_store,
3368 new_committee,
3369 epoch_start_configuration,
3370 expensive_safety_check_config,
3371 epoch_last_checkpoint,
3372 )
3373 .await?;
3374 assert_eq!(new_epoch_store.epoch(), new_epoch);
3375 self.transaction_manager.reconfigure(new_epoch);
3376 *execution_lock = new_epoch;
3377 Ok(new_epoch_store)
3381 }
3382
3383 pub async fn reconfigure_for_testing(&self) {
3388 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3389 let epoch_store = self.epoch_store_for_testing().clone();
3390 let protocol_config = epoch_store.protocol_config().clone();
3391 let _guard =
3399 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3400 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3401 self.get_backing_package_store().clone(),
3402 self.get_object_store().clone(),
3403 &self.config.expensive_safety_check_config,
3404 self.checkpoint_store
3405 .get_epoch_last_checkpoint(epoch_store.epoch())
3406 .unwrap()
3407 .map(|c| *c.sequence_number())
3408 .unwrap_or_default(),
3409 );
3410 let new_epoch = new_epoch_store.epoch();
3411 self.transaction_manager.reconfigure(new_epoch);
3412 self.epoch_store.store(new_epoch_store);
3413 epoch_store.epoch_terminated().await;
3414 *execution_lock = new_epoch;
3415 }
3416
3417 #[instrument(level = "error", skip_all)]
3418 fn check_system_consistency(
3419 &self,
3420 cur_epoch_store: &AuthorityPerEpochStore,
3421 accumulator: Arc<StateAccumulator>,
3422 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3423 epoch_supply_change: i64,
3424 ) -> IotaResult<()> {
3425 info!(
3426 "Performing iota conservation consistency check for epoch {}",
3427 cur_epoch_store.epoch()
3428 );
3429
3430 if cfg!(debug_assertions) {
3431 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3432 }
3433
3434 self.get_reconfig_api()
3435 .try_expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3436
3437 if expensive_safety_check_config.enable_state_consistency_check() {
3439 info!(
3440 "Performing state consistency check for epoch {}",
3441 cur_epoch_store.epoch()
3442 );
3443 self.expensive_check_is_consistent_state(
3444 accumulator,
3445 cur_epoch_store,
3446 cfg!(debug_assertions), );
3448 }
3449
3450 if expensive_safety_check_config.enable_secondary_index_checks() {
3451 if let Some(indexes) = self.indexes.clone() {
3452 verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3453 .expect("secondary indexes are inconsistent");
3454 }
3455 }
3456
3457 Ok(())
3458 }
3459
3460 fn expensive_check_is_consistent_state(
3461 &self,
3462 accumulator: Arc<StateAccumulator>,
3463 cur_epoch_store: &AuthorityPerEpochStore,
3464 panic: bool,
3465 ) {
3466 let live_object_set_hash = accumulator.digest_live_object_set();
3467
3468 let root_state_hash: ECMHLiveObjectSetDigest = self
3469 .get_accumulator_store()
3470 .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3471 .expect("Retrieving root state hash cannot fail")
3472 .expect("Root state hash for epoch must exist")
3473 .1
3474 .digest()
3475 .into();
3476
3477 let is_inconsistent = root_state_hash != live_object_set_hash;
3478 if is_inconsistent {
3479 if panic {
3480 panic!(
3481 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3482 );
3483 } else {
3484 error!(
3485 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3486 root_state_hash, live_object_set_hash
3487 );
3488 }
3489 } else {
3490 info!("State consistency check passed");
3491 }
3492
3493 if !panic {
3494 accumulator.set_inconsistent_state(is_inconsistent);
3495 }
3496 }
3497
3498 pub fn current_epoch_for_testing(&self) -> EpochId {
3499 self.epoch_store_for_testing().epoch()
3500 }
3501
3502 #[instrument(level = "error", skip_all)]
3503 pub fn checkpoint_all_dbs(
3504 &self,
3505 checkpoint_path: &Path,
3506 cur_epoch_store: &AuthorityPerEpochStore,
3507 checkpoint_indexes: bool,
3508 ) -> IotaResult {
3509 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3510 let current_epoch = cur_epoch_store.epoch();
3511
3512 if checkpoint_path.exists() {
3513 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3514 return Ok(());
3515 }
3516
3517 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3518 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3519
3520 if checkpoint_path_tmp.exists() {
3521 fs::remove_dir_all(&checkpoint_path_tmp)
3522 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3523 }
3524
3525 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3526 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3527
3528 self.checkpoint_store
3531 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3532
3533 self.get_reconfig_api()
3534 .try_checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3535
3536 self.committee_store
3537 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3538
3539 if checkpoint_indexes {
3540 if let Some(indexes) = self.indexes.as_ref() {
3541 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3542 }
3543 if let Some(rest_index) = self.rest_index.as_ref() {
3544 rest_index.checkpoint_db(&checkpoint_path_tmp.join("grpc_indexes"))?;
3545 }
3546 }
3547
3548 fs::rename(checkpoint_path_tmp, checkpoint_path)
3549 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3550 Ok(())
3551 }
3552
3553 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3560 self.epoch_store.load()
3561 }
3562
3563 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3565 self.load_epoch_store_one_call_per_task()
3566 }
3567
3568 pub fn clone_committee_for_testing(&self) -> Committee {
3569 Committee::clone(self.epoch_store_for_testing().committee())
3570 }
3571
3572 #[instrument(level = "trace", skip_all)]
3573 pub async fn try_get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3574 self.get_object_store()
3575 .try_get_object(object_id)
3576 .map_err(Into::into)
3577 }
3578
3579 pub async fn get_object(&self, object_id: &ObjectID) -> Option<Object> {
3581 self.try_get_object(object_id)
3582 .await
3583 .expect("storage access failed")
3584 }
3585
3586 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3587 Ok(self
3588 .try_get_object(&IOTA_SYSTEM_ADDRESS.into())
3589 .await?
3590 .expect("framework object should always exist")
3591 .compute_object_reference())
3592 }
3593
3594 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3596 self.get_object_cache_reader()
3597 .try_get_iota_system_state_object_unsafe()
3598 }
3599
3600 #[instrument(level = "trace", skip_all)]
3601 pub fn get_checkpoint_by_sequence_number(
3602 &self,
3603 sequence_number: CheckpointSequenceNumber,
3604 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3605 Ok(self
3606 .checkpoint_store
3607 .get_checkpoint_by_sequence_number(sequence_number)?)
3608 }
3609
3610 #[instrument(level = "trace", skip_all)]
3611 pub fn get_transaction_checkpoint_for_tests(
3612 &self,
3613 digest: &TransactionDigest,
3614 epoch_store: &AuthorityPerEpochStore,
3615 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3616 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3617 let Some(checkpoint) = checkpoint else {
3618 return Ok(None);
3619 };
3620 let checkpoint = self
3621 .checkpoint_store
3622 .get_checkpoint_by_sequence_number(checkpoint)?;
3623 Ok(checkpoint)
3624 }
3625
3626 #[instrument(level = "trace", skip_all)]
3627 pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3628 Ok(
3629 match self
3630 .get_object_cache_reader()
3631 .try_get_latest_object_or_tombstone(*object_id)?
3632 {
3633 Some((_, ObjectOrTombstone::Object(object))) => {
3634 let layout = self.get_object_layout(&object)?;
3635 ObjectRead::Exists(object.compute_object_reference(), object, layout)
3636 }
3637 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3638 None => ObjectRead::NotExists(*object_id),
3639 },
3640 )
3641 }
3642
3643 pub fn get_chain_identifier(&self) -> ChainIdentifier {
3645 self.chain_identifier
3646 }
3647
3648 #[instrument(level = "trace", skip_all)]
3649 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3650 where
3651 T: DeserializeOwned,
3652 {
3653 let o = self.get_object_read(object_id)?.into_object()?;
3654 if let Some(move_object) = o.data.try_as_move() {
3655 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3656 IotaError::ObjectDeserialization {
3657 error: format!("{e}"),
3658 }
3659 })?)
3660 } else {
3661 Err(IotaError::ObjectDeserialization {
3662 error: format!("Provided object : [{object_id}] is not a Move object."),
3663 })
3664 }
3665 }
3666
3667 #[instrument(level = "trace", skip_all)]
3673 pub fn get_past_object_read(
3674 &self,
3675 object_id: &ObjectID,
3676 version: SequenceNumber,
3677 ) -> IotaResult<PastObjectRead> {
3678 let Some(obj_ref) = self
3680 .get_object_cache_reader()
3681 .try_get_latest_object_ref_or_tombstone(*object_id)?
3682 else {
3683 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3684 };
3685
3686 if version > obj_ref.1 {
3687 return Ok(PastObjectRead::VersionTooHigh {
3688 object_id: *object_id,
3689 asked_version: version,
3690 latest_version: obj_ref.1,
3691 });
3692 }
3693
3694 if version < obj_ref.1 {
3695 return Ok(match self.read_object_at_version(object_id, version)? {
3697 Some((object, layout)) => {
3698 let obj_ref = object.compute_object_reference();
3699 PastObjectRead::VersionFound(obj_ref, object, layout)
3700 }
3701
3702 None => PastObjectRead::VersionNotFound(*object_id, version),
3703 });
3704 }
3705
3706 if !obj_ref.2.is_alive() {
3707 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3708 }
3709
3710 match self.read_object_at_version(object_id, obj_ref.1)? {
3711 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3712 None => {
3713 error!(
3714 "Object with in parent_entry is missing from object store, datastore is \
3715 inconsistent",
3716 );
3717 Err(UserInputError::ObjectNotFound {
3718 object_id: *object_id,
3719 version: Some(obj_ref.1),
3720 }
3721 .into())
3722 }
3723 }
3724 }
3725
3726 #[instrument(level = "trace", skip_all)]
3727 fn read_object_at_version(
3728 &self,
3729 object_id: &ObjectID,
3730 version: SequenceNumber,
3731 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3732 let Some(object) = self
3733 .get_object_cache_reader()
3734 .try_get_object_by_key(object_id, version)?
3735 else {
3736 return Ok(None);
3737 };
3738
3739 let layout = self.get_object_layout(&object)?;
3740 Ok(Some((object, layout)))
3741 }
3742
3743 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3744 let layout = object
3745 .data
3746 .try_as_move()
3747 .map(|object| {
3748 into_struct_layout(
3749 self.load_epoch_store_one_call_per_task()
3750 .executor()
3751 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3753 .get_annotated_layout(&object.type_().clone().into())?,
3754 )
3755 })
3756 .transpose()?;
3757 Ok(layout)
3758 }
3759
3760 fn get_owner_at_version(
3761 &self,
3762 object_id: &ObjectID,
3763 version: SequenceNumber,
3764 ) -> IotaResult<Owner> {
3765 self.get_object_store()
3766 .try_get_object_by_key(object_id, version)?
3767 .ok_or_else(|| {
3768 IotaError::from(UserInputError::ObjectNotFound {
3769 object_id: *object_id,
3770 version: Some(version),
3771 })
3772 })
3773 .map(|o| o.owner)
3774 }
3775
3776 #[instrument(level = "trace", skip_all)]
3777 pub fn get_owner_objects(
3778 &self,
3779 owner: IotaAddress,
3780 cursor: Option<ObjectID>,
3782 limit: usize,
3783 filter: Option<IotaObjectDataFilter>,
3784 ) -> IotaResult<Vec<ObjectInfo>> {
3785 if let Some(indexes) = &self.indexes {
3786 indexes.get_owner_objects(owner, cursor, limit, filter)
3787 } else {
3788 Err(IotaError::IndexStoreNotAvailable)
3789 }
3790 }
3791
3792 #[instrument(level = "trace", skip_all)]
3793 pub fn get_owned_coins_iterator_with_cursor(
3794 &self,
3795 owner: IotaAddress,
3796 cursor: (String, ObjectID),
3798 limit: usize,
3799 one_coin_type_only: bool,
3800 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3801 if let Some(indexes) = &self.indexes {
3802 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3803 } else {
3804 Err(IotaError::IndexStoreNotAvailable)
3805 }
3806 }
3807
3808 #[instrument(level = "trace", skip_all)]
3809 pub fn get_owner_objects_iterator(
3810 &self,
3811 owner: IotaAddress,
3812 cursor: Option<ObjectID>,
3814 filter: Option<IotaObjectDataFilter>,
3815 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3816 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3817 if let Some(indexes) = &self.indexes {
3818 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3819 } else {
3820 Err(IotaError::IndexStoreNotAvailable)
3821 }
3822 }
3823
3824 #[instrument(level = "trace", skip_all)]
3825 pub async fn get_move_objects<T>(
3826 &self,
3827 owner: IotaAddress,
3828 type_: MoveObjectType,
3829 ) -> IotaResult<Vec<T>>
3830 where
3831 T: DeserializeOwned,
3832 {
3833 let object_ids = self
3834 .get_owner_objects_iterator(owner, None, None)?
3835 .filter(|o| match &o.type_ {
3836 ObjectType::Struct(s) => &type_ == s,
3837 ObjectType::Package => false,
3838 })
3839 .map(|info| ObjectKey(info.object_id, info.version))
3840 .collect::<Vec<_>>();
3841 let mut move_objects = vec![];
3842
3843 let objects = self
3844 .get_object_store()
3845 .try_multi_get_objects_by_key(&object_ids)?;
3846
3847 for (o, id) in objects.into_iter().zip(object_ids) {
3848 let object = o.ok_or_else(|| {
3849 IotaError::from(UserInputError::ObjectNotFound {
3850 object_id: id.0,
3851 version: Some(id.1),
3852 })
3853 })?;
3854 let move_object = object.data.try_as_move().ok_or_else(|| {
3855 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3856 })?;
3857 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3858 IotaError::ObjectDeserialization {
3859 error: format!("{e}"),
3860 }
3861 })?);
3862 }
3863 Ok(move_objects)
3864 }
3865
3866 #[instrument(level = "trace", skip_all)]
3867 pub fn get_dynamic_fields(
3868 &self,
3869 owner: ObjectID,
3870 cursor: Option<ObjectID>,
3872 limit: usize,
3873 ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3874 Ok(self
3875 .get_dynamic_fields_iterator(owner, cursor)?
3876 .take(limit)
3877 .collect::<Result<Vec<_>, _>>()?)
3878 }
3879
3880 fn get_dynamic_fields_iterator(
3881 &self,
3882 owner: ObjectID,
3883 cursor: Option<ObjectID>,
3885 ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3886 {
3887 if let Some(indexes) = &self.indexes {
3888 indexes.get_dynamic_fields_iterator(owner, cursor)
3889 } else {
3890 Err(IotaError::IndexStoreNotAvailable)
3891 }
3892 }
3893
3894 #[instrument(level = "trace", skip_all)]
3895 pub fn get_dynamic_field_object_id(
3896 &self,
3897 owner: ObjectID,
3898 name_type: TypeTag,
3899 name_bcs_bytes: &[u8],
3900 ) -> IotaResult<Option<ObjectID>> {
3901 if let Some(indexes) = &self.indexes {
3902 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3903 } else {
3904 Err(IotaError::IndexStoreNotAvailable)
3905 }
3906 }
3907
3908 #[instrument(level = "trace", skip_all)]
3909 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3910 Ok(self.get_indexes()?.next_sequence_number())
3911 }
3912
3913 #[instrument(level = "trace", skip_all)]
3914 pub async fn get_executed_transaction_and_effects(
3915 &self,
3916 digest: TransactionDigest,
3917 kv_store: Arc<TransactionKeyValueStore>,
3918 ) -> IotaResult<(Transaction, TransactionEffects)> {
3919 let transaction = kv_store.get_tx(digest).await?;
3920 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3921 Ok((transaction, effects))
3922 }
3923
3924 #[instrument(level = "trace", skip_all)]
3925 pub fn multi_get_checkpoint_by_sequence_number(
3926 &self,
3927 sequence_numbers: &[CheckpointSequenceNumber],
3928 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3929 Ok(self
3930 .checkpoint_store
3931 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3932 }
3933
3934 #[instrument(level = "trace", skip_all)]
3935 pub fn get_transaction_events(
3936 &self,
3937 digest: &TransactionEventsDigest,
3938 ) -> IotaResult<TransactionEvents> {
3939 self.get_transaction_cache_reader()
3940 .try_get_events(digest)?
3941 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3942 }
3943
3944 pub fn get_transaction_input_objects(
3945 &self,
3946 effects: &TransactionEffects,
3947 ) -> anyhow::Result<Vec<Object>> {
3948 iota_types::storage::get_transaction_input_objects(self.get_object_store(), effects)
3949 .map_err(Into::into)
3950 }
3951
3952 pub fn get_transaction_output_objects(
3953 &self,
3954 effects: &TransactionEffects,
3955 ) -> anyhow::Result<Vec<Object>> {
3956 iota_types::storage::get_transaction_output_objects(self.get_object_store(), effects)
3957 .map_err(Into::into)
3958 }
3959
3960 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3961 match &self.indexes {
3962 Some(i) => Ok(i.clone()),
3963 None => Err(IotaError::UnsupportedFeature {
3964 error: "extended object indexing is not enabled on this server".into(),
3965 }),
3966 }
3967 }
3968
3969 pub async fn get_transactions_for_tests(
3970 self: &Arc<Self>,
3971 filter: Option<TransactionFilter>,
3972 cursor: Option<TransactionDigest>,
3973 limit: Option<usize>,
3974 reverse: bool,
3975 ) -> IotaResult<Vec<TransactionDigest>> {
3976 let metrics = KeyValueStoreMetrics::new_for_tests();
3977 let kv_store = Arc::new(TransactionKeyValueStore::new(
3978 "rocksdb",
3979 metrics,
3980 self.clone(),
3981 ));
3982 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3983 .await
3984 }
3985
3986 #[instrument(level = "trace", skip_all)]
3987 pub async fn get_transactions(
3988 &self,
3989 kv_store: &Arc<TransactionKeyValueStore>,
3990 filter: Option<TransactionFilter>,
3991 cursor: Option<TransactionDigest>,
3993 limit: Option<usize>,
3994 reverse: bool,
3995 ) -> IotaResult<Vec<TransactionDigest>> {
3996 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3997 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3998 let iter = checkpoint_contents.iter().map(|c| c.transaction);
3999 if reverse {
4000 let iter = iter
4001 .rev()
4002 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4003 .skip(usize::from(cursor.is_some()));
4004 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4005 } else {
4006 let iter = iter
4007 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
4008 .skip(usize::from(cursor.is_some()));
4009 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
4010 }
4011 }
4012 self.get_indexes()?
4013 .get_transactions(filter, cursor, limit, reverse)
4014 }
4015
4016 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
4017 &self.checkpoint_store
4018 }
4019
4020 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
4021 self.get_checkpoint_store()
4022 .get_highest_executed_checkpoint_seq_number()?
4023 .ok_or(IotaError::UserInput {
4024 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
4025 })
4026 }
4027
4028 #[cfg(msim)]
4029 pub fn get_highest_pruned_checkpoint_for_testing(
4030 &self,
4031 ) -> IotaResult<CheckpointSequenceNumber> {
4032 self.database_for_testing()
4033 .perpetual_tables
4034 .get_highest_pruned_checkpoint()
4035 .map(|c| c.unwrap_or(0))
4036 .map_err(Into::into)
4037 }
4038
4039 #[instrument(level = "trace", skip_all)]
4040 pub fn get_checkpoint_summary_by_sequence_number(
4041 &self,
4042 sequence_number: CheckpointSequenceNumber,
4043 ) -> IotaResult<CheckpointSummary> {
4044 let verified_checkpoint = self
4045 .get_checkpoint_store()
4046 .get_checkpoint_by_sequence_number(sequence_number)?;
4047 match verified_checkpoint {
4048 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4049 None => Err(IotaError::UserInput {
4050 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4051 }),
4052 }
4053 }
4054
4055 #[instrument(level = "trace", skip_all)]
4056 pub fn get_checkpoint_summary_by_digest(
4057 &self,
4058 digest: CheckpointDigest,
4059 ) -> IotaResult<CheckpointSummary> {
4060 let verified_checkpoint = self
4061 .get_checkpoint_store()
4062 .get_checkpoint_by_digest(&digest)?;
4063 match verified_checkpoint {
4064 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
4065 None => Err(IotaError::UserInput {
4066 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4067 }),
4068 }
4069 }
4070
4071 #[instrument(level = "trace", skip_all)]
4072 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
4073 if is_system_package(package_id) {
4074 return self.find_genesis_txn_digest();
4075 }
4076 Ok(self
4077 .get_object_read(&package_id)?
4078 .into_object()?
4079 .previous_transaction)
4080 }
4081
4082 #[instrument(level = "trace", skip_all)]
4083 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
4084 let summary = self
4085 .get_verified_checkpoint_by_sequence_number(0)?
4086 .into_message();
4087 let content = self.get_checkpoint_contents(summary.content_digest)?;
4088 let genesis_transaction = content.enumerate_transactions(&summary).next();
4089 Ok(genesis_transaction
4090 .ok_or(IotaError::UserInput {
4091 error: UserInputError::GenesisTransactionNotFound,
4092 })?
4093 .1
4094 .transaction)
4095 }
4096
4097 #[instrument(level = "trace", skip_all)]
4098 pub fn get_verified_checkpoint_by_sequence_number(
4099 &self,
4100 sequence_number: CheckpointSequenceNumber,
4101 ) -> IotaResult<VerifiedCheckpoint> {
4102 let verified_checkpoint = self
4103 .get_checkpoint_store()
4104 .get_checkpoint_by_sequence_number(sequence_number)?;
4105 match verified_checkpoint {
4106 Some(verified_checkpoint) => Ok(verified_checkpoint),
4107 None => Err(IotaError::UserInput {
4108 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4109 }),
4110 }
4111 }
4112
4113 #[instrument(level = "trace", skip_all)]
4114 pub fn get_verified_checkpoint_summary_by_digest(
4115 &self,
4116 digest: CheckpointDigest,
4117 ) -> IotaResult<VerifiedCheckpoint> {
4118 let verified_checkpoint = self
4119 .get_checkpoint_store()
4120 .get_checkpoint_by_digest(&digest)?;
4121 match verified_checkpoint {
4122 Some(verified_checkpoint) => Ok(verified_checkpoint),
4123 None => Err(IotaError::UserInput {
4124 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
4125 }),
4126 }
4127 }
4128
4129 #[instrument(level = "trace", skip_all)]
4130 pub fn get_checkpoint_contents(
4131 &self,
4132 digest: CheckpointContentsDigest,
4133 ) -> IotaResult<CheckpointContents> {
4134 self.get_checkpoint_store()
4135 .get_checkpoint_contents(&digest)?
4136 .ok_or(IotaError::UserInput {
4137 error: UserInputError::CheckpointContentsNotFound(digest),
4138 })
4139 }
4140
4141 #[instrument(level = "trace", skip_all)]
4142 pub fn get_checkpoint_contents_by_sequence_number(
4143 &self,
4144 sequence_number: CheckpointSequenceNumber,
4145 ) -> IotaResult<CheckpointContents> {
4146 let verified_checkpoint = self
4147 .get_checkpoint_store()
4148 .get_checkpoint_by_sequence_number(sequence_number)?;
4149 match verified_checkpoint {
4150 Some(verified_checkpoint) => {
4151 let content_digest = verified_checkpoint.into_inner().content_digest;
4152 self.get_checkpoint_contents(content_digest)
4153 }
4154 None => Err(IotaError::UserInput {
4155 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
4156 }),
4157 }
4158 }
4159
4160 #[instrument(level = "trace", skip_all)]
4161 pub async fn query_events(
4162 &self,
4163 kv_store: &Arc<TransactionKeyValueStore>,
4164 query: EventFilter,
4165 cursor: Option<EventID>,
4167 limit: usize,
4168 descending: bool,
4169 ) -> IotaResult<Vec<IotaEvent>> {
4170 let index_store = self.get_indexes()?;
4171
4172 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
4174 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
4175 IotaError::TransactionNotFound {
4176 digest: cursor.tx_digest,
4177 },
4178 )?;
4179 (tx_seq, cursor.event_seq as usize)
4180 } else if descending {
4181 (u64::MAX, usize::MAX)
4182 } else {
4183 (0, 0)
4184 };
4185
4186 let limit = limit + 1;
4187 let mut event_keys = match query {
4188 EventFilter::All(filters) => {
4189 if filters.is_empty() {
4190 index_store.all_events(tx_num, event_num, limit, descending)?
4191 } else {
4192 return Err(IotaError::UserInput {
4193 error: UserInputError::Unsupported(
4194 "This query type does not currently support filter combinations"
4195 .to_string(),
4196 ),
4197 });
4198 }
4199 }
4200 EventFilter::Transaction(digest) => {
4201 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4202 }
4203 EventFilter::MoveModule { package, module } => {
4204 let module_id = ModuleId::new(package.into(), module);
4205 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4206 }
4207 EventFilter::MoveEventType(struct_name) => index_store
4208 .events_by_move_event_struct_name(
4209 &struct_name,
4210 tx_num,
4211 event_num,
4212 limit,
4213 descending,
4214 )?,
4215 EventFilter::Sender(sender) => {
4216 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4217 }
4218 EventFilter::TimeRange {
4219 start_time,
4220 end_time,
4221 } => index_store
4222 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4223 EventFilter::MoveEventModule { package, module } => index_store
4224 .events_by_move_event_module(
4225 &ModuleId::new(package.into(), module),
4226 tx_num,
4227 event_num,
4228 limit,
4229 descending,
4230 )?,
4231 EventFilter::Package(_)
4233 | EventFilter::MoveEventField { .. }
4234 | EventFilter::Any(_)
4235 | EventFilter::And(_, _)
4236 | EventFilter::Or(_, _) => {
4237 return Err(IotaError::UserInput {
4238 error: UserInputError::Unsupported(
4239 "This query type is not supported by the full node.".to_string(),
4240 ),
4241 });
4242 }
4243 };
4244
4245 if cursor.is_some() {
4248 if !event_keys.is_empty() {
4249 event_keys.remove(0);
4250 }
4251 } else {
4252 event_keys.truncate(limit - 1);
4253 }
4254
4255 let transaction_digests = event_keys
4257 .iter()
4258 .map(|(_, digest, _, _)| *digest)
4259 .collect::<HashSet<_>>()
4260 .into_iter()
4261 .collect::<Vec<_>>();
4262
4263 let events = kv_store
4264 .multi_get_events_by_tx_digests(&transaction_digests)
4265 .await?;
4266
4267 let events_map: HashMap<_, _> =
4268 transaction_digests.iter().zip(events.into_iter()).collect();
4269
4270 let stored_events = event_keys
4271 .into_iter()
4272 .map(|k| {
4273 (
4274 k,
4275 events_map
4276 .get(&k.1)
4277 .expect("fetched digest is missing")
4278 .clone()
4279 .and_then(|e| e.data.get(k.2).cloned()),
4280 )
4281 })
4282 .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4283 event
4284 .map(|e| (e, tx_digest, event_seq, timestamp))
4285 .ok_or(IotaError::TransactionEventsNotFound { digest })
4286 })
4287 .collect::<Result<Vec<_>, _>>()?;
4288
4289 let epoch_store = self.load_epoch_store_one_call_per_task();
4290 let backing_store = self.get_backing_package_store().as_ref();
4291 let mut layout_resolver = epoch_store
4292 .executor()
4293 .type_layout_resolver(Box::new(backing_store));
4294 let mut events = vec![];
4295 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4296 events.push(IotaEvent::try_from(
4297 e.clone(),
4298 tx_digest,
4299 event_seq as u64,
4300 Some(timestamp),
4301 layout_resolver.get_annotated_layout(&e.type_)?,
4302 )?)
4303 }
4304 Ok(events)
4305 }
4306
4307 pub async fn insert_genesis_object(&self, object: Object) {
4308 self.get_reconfig_api()
4309 .try_insert_genesis_object(object)
4310 .expect("Cannot insert genesis object")
4311 }
4312
4313 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4314 futures::future::join_all(
4315 objects
4316 .iter()
4317 .map(|o| self.insert_genesis_object(o.clone())),
4318 )
4319 .await;
4320 }
4321
4322 #[instrument(level = "trace", skip_all)]
4324 pub fn get_transaction_status(
4325 &self,
4326 transaction_digest: &TransactionDigest,
4327 epoch_store: &Arc<AuthorityPerEpochStore>,
4328 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4329 if let Some(effects) =
4331 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4332 {
4333 if let Some(transaction) = self
4334 .get_transaction_cache_reader()
4335 .try_get_transaction_block(transaction_digest)?
4336 {
4337 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4338 let events = if let Some(digest) = effects.events_digest() {
4339 self.get_transaction_events(digest)?
4340 } else {
4341 TransactionEvents::default()
4342 };
4343 return Ok(Some((
4344 (*transaction).clone().into_message(),
4345 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4346 )));
4347 } else {
4348 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4353 }
4354 }
4355 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4356 self.metrics.tx_already_processed.inc();
4357 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4358 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4359 } else {
4360 Ok(None)
4361 }
4362 }
4363
4364 #[instrument(level = "trace", skip_all)]
4368 pub fn get_signed_effects_and_maybe_resign(
4369 &self,
4370 transaction_digest: &TransactionDigest,
4371 epoch_store: &Arc<AuthorityPerEpochStore>,
4372 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4373 let effects = self
4374 .get_transaction_cache_reader()
4375 .try_get_executed_effects(transaction_digest)?;
4376 match effects {
4377 Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4378 None => Ok(None),
4379 }
4380 }
4381
4382 #[instrument(level = "trace", skip_all)]
4383 pub(crate) fn sign_effects(
4384 &self,
4385 effects: TransactionEffects,
4386 epoch_store: &Arc<AuthorityPerEpochStore>,
4387 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4388 let tx_digest = *effects.transaction_digest();
4389 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4390 Some(sig) if sig.epoch == epoch_store.epoch() => {
4391 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4392 }
4393 _ => {
4394 debug!(
4417 ?tx_digest,
4418 epoch=?epoch_store.epoch(),
4419 "Re-signing the effects with the current epoch"
4420 );
4421
4422 let sig = AuthoritySignInfo::new(
4423 epoch_store.epoch(),
4424 &effects,
4425 Intent::iota_app(IntentScope::TransactionEffects),
4426 self.name,
4427 &*self.secret,
4428 );
4429
4430 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4431
4432 epoch_store.insert_effects_digest_and_signature(
4433 &tx_digest,
4434 effects.digest(),
4435 &sig,
4436 )?;
4437
4438 effects
4439 }
4440 };
4441
4442 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4443 signed_effects,
4444 ))
4445 }
4446
4447 #[instrument(level = "trace", skip_all)]
4449 fn fullnode_only_get_tx_coins_for_indexing(
4450 &self,
4451 inner_temporary_store: &InnerTemporaryStore,
4452 epoch_store: &Arc<AuthorityPerEpochStore>,
4453 ) -> Option<TxCoins> {
4454 if self.indexes.is_none() || self.is_committee_validator(epoch_store) {
4455 return None;
4456 }
4457 let written_coin_objects = inner_temporary_store
4458 .written
4459 .iter()
4460 .filter_map(|(k, v)| {
4461 if v.is_coin() {
4462 Some((*k, v.clone()))
4463 } else {
4464 None
4465 }
4466 })
4467 .collect();
4468 let input_coin_objects = inner_temporary_store
4469 .input_objects
4470 .iter()
4471 .filter_map(|(k, v)| {
4472 if v.is_coin() {
4473 Some((*k, v.clone()))
4474 } else {
4475 None
4476 }
4477 })
4478 .collect::<ObjectMap>();
4479 Some((input_coin_objects, written_coin_objects))
4480 }
4481
4482 #[instrument(level = "trace", skip_all)]
4494 pub async fn get_transaction_lock(
4495 &self,
4496 object_ref: &ObjectRef,
4497 epoch_store: &AuthorityPerEpochStore,
4498 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4499 let lock_info = self
4500 .get_object_cache_reader()
4501 .try_get_lock(*object_ref, epoch_store)?;
4502 let lock_info = match lock_info {
4503 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4504 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4505 provided_obj_ref: *object_ref,
4506 current_version: locked_ref.1,
4507 }
4508 .into());
4509 }
4510 ObjectLockStatus::Initialized => {
4511 return Ok(None);
4512 }
4513 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4514 };
4515
4516 epoch_store.get_signed_transaction(&lock_info)
4517 }
4518
4519 pub async fn try_get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4520 self.get_object_cache_reader().try_get_objects(objects)
4521 }
4522
4523 pub async fn get_objects(&self, objects: &[ObjectID]) -> Vec<Option<Object>> {
4525 self.try_get_objects(objects)
4526 .await
4527 .expect("storage access failed")
4528 }
4529
4530 pub async fn try_get_object_or_tombstone(
4531 &self,
4532 object_id: ObjectID,
4533 ) -> IotaResult<Option<ObjectRef>> {
4534 self.get_object_cache_reader()
4535 .try_get_latest_object_ref_or_tombstone(object_id)
4536 }
4537
4538 pub async fn get_object_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
4540 self.try_get_object_or_tombstone(object_id)
4541 .await
4542 .expect("storage access failed")
4543 }
4544
4545 pub fn set_override_protocol_upgrade_buffer_stake(
4555 &self,
4556 expected_epoch: EpochId,
4557 buffer_stake_bps: u64,
4558 ) -> IotaResult {
4559 let epoch_store = self.load_epoch_store_one_call_per_task();
4560 let actual_epoch = epoch_store.epoch();
4561 if actual_epoch != expected_epoch {
4562 return Err(IotaError::WrongEpoch {
4563 expected_epoch,
4564 actual_epoch,
4565 });
4566 }
4567
4568 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4569 }
4570
4571 pub fn clear_override_protocol_upgrade_buffer_stake(
4572 &self,
4573 expected_epoch: EpochId,
4574 ) -> IotaResult {
4575 let epoch_store = self.load_epoch_store_one_call_per_task();
4576 let actual_epoch = epoch_store.epoch();
4577 if actual_epoch != expected_epoch {
4578 return Err(IotaError::WrongEpoch {
4579 expected_epoch,
4580 actual_epoch,
4581 });
4582 }
4583
4584 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4585 }
4586
4587 pub async fn get_available_system_packages(
4591 &self,
4592 binary_config: &BinaryConfig,
4593 ) -> Vec<ObjectRef> {
4594 let mut results = vec![];
4595
4596 let system_packages = BuiltInFramework::iter_system_packages();
4597
4598 #[cfg(msim)]
4600 let extra_packages = framework_injection::get_extra_packages(self.name);
4601 #[cfg(msim)]
4602 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4603
4604 for system_package in system_packages {
4605 let modules = system_package.modules().to_vec();
4606 #[cfg(msim)]
4608 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4609 .unwrap_or(modules);
4610
4611 let Some(obj_ref) = iota_framework::compare_system_package(
4612 &self.get_object_store(),
4613 &system_package.id,
4614 &modules,
4615 system_package.dependencies.to_vec(),
4616 binary_config,
4617 )
4618 .await
4619 else {
4620 return vec![];
4621 };
4622 results.push(obj_ref);
4623 }
4624
4625 results
4626 }
4627
4628 async fn get_system_package_bytes(
4645 &self,
4646 system_packages: Vec<ObjectRef>,
4647 binary_config: &BinaryConfig,
4648 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4649 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4650 let objects = self.get_objects(&ids).await;
4651
4652 let mut res = Vec::with_capacity(system_packages.len());
4653 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4654 let prev_transaction = match object {
4655 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4656 info!("Framework {} does not need updating", system_package_ref.0);
4658 continue;
4659 }
4660
4661 Some(cur_object) => cur_object.previous_transaction,
4662 None => TransactionDigest::genesis_marker(),
4663 };
4664
4665 #[cfg(msim)]
4666 let SystemPackage {
4667 id: _,
4668 bytes,
4669 dependencies,
4670 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4671 .unwrap_or_else(|| {
4672 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4673 });
4674
4675 #[cfg(not(msim))]
4676 let SystemPackage {
4677 id: _,
4678 bytes,
4679 dependencies,
4680 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4681
4682 let modules: Vec<_> = bytes
4683 .iter()
4684 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4685 .collect();
4686
4687 let new_object = Object::new_system_package(
4688 &modules,
4689 system_package_ref.1,
4690 dependencies.clone(),
4691 prev_transaction,
4692 );
4693
4694 let new_ref = new_object.compute_object_reference();
4695 if new_ref != system_package_ref {
4696 error!(
4697 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
4698 );
4699 return None;
4700 }
4701
4702 res.push((system_package_ref.1, bytes, dependencies));
4703 }
4704
4705 Some(res)
4706 }
4707
4708 fn is_protocol_version_supported_v1(
4712 proposed_protocol_version: ProtocolVersion,
4713 committee: &Committee,
4714 capabilities: Vec<AuthorityCapabilitiesV1>,
4715 mut buffer_stake_bps: u64,
4716 ) -> Option<(ProtocolVersion, Digest, Vec<ObjectRef>)> {
4717 if buffer_stake_bps > 10000 {
4718 warn!("clamping buffer_stake_bps to 10000");
4719 buffer_stake_bps = 10000;
4720 }
4721
4722 let mut desired_upgrades: Vec<_> = capabilities
4725 .into_iter()
4726 .filter_map(|mut cap| {
4727 if cap.available_system_packages.is_empty() {
4729 return None;
4730 }
4731
4732 cap.available_system_packages.sort();
4733
4734 info!(
4735 "validator {:?} supports {:?} with system packages: {:?}",
4736 cap.authority.concise(),
4737 cap.supported_protocol_versions,
4738 cap.available_system_packages,
4739 );
4740
4741 cap.supported_protocol_versions
4745 .get_version_digest(proposed_protocol_version)
4746 .map(|digest| (digest, cap.available_system_packages, cap.authority))
4747 })
4748 .collect();
4749
4750 desired_upgrades.sort();
4753 desired_upgrades
4754 .into_iter()
4755 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4756 .into_iter()
4757 .find_map(|((digest, packages), group)| {
4758 assert!(!packages.is_empty());
4760
4761 let mut stake_aggregator: StakeAggregator<(), true> =
4762 StakeAggregator::new(Arc::new(committee.clone()));
4763
4764 for (_, _, authority) in group {
4765 stake_aggregator.insert_generic(authority, ());
4766 }
4767
4768 let total_votes = stake_aggregator.total_votes();
4769 let quorum_threshold = committee.quorum_threshold();
4770 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
4771
4772 info!(
4773 protocol_config_digest = ?digest,
4774 ?total_votes,
4775 ?quorum_threshold,
4776 ?buffer_stake_bps,
4777 ?effective_threshold,
4778 ?proposed_protocol_version,
4779 ?packages,
4780 "support for upgrade"
4781 );
4782
4783 let has_support = total_votes >= effective_threshold;
4784 has_support.then_some((proposed_protocol_version, digest, packages))
4785 })
4786 }
4787
4788 fn choose_protocol_version_and_system_packages_v1(
4792 current_protocol_version: ProtocolVersion,
4793 current_protocol_digest: Digest,
4794 committee: &Committee,
4795 capabilities: Vec<AuthorityCapabilitiesV1>,
4796 buffer_stake_bps: u64,
4797 ) -> (ProtocolVersion, Digest, Vec<ObjectRef>) {
4798 let mut next_protocol_version = current_protocol_version;
4799 let mut system_packages = vec![];
4800 let mut protocol_version_digest = current_protocol_digest;
4801
4802 while let Some((version, digest, packages)) = Self::is_protocol_version_supported_v1(
4806 next_protocol_version + 1,
4807 committee,
4808 capabilities.clone(),
4809 buffer_stake_bps,
4810 ) {
4811 next_protocol_version = version;
4812 protocol_version_digest = digest;
4813 system_packages = packages;
4814 }
4815
4816 (
4817 next_protocol_version,
4818 protocol_version_digest,
4819 system_packages,
4820 )
4821 }
4822
4823 fn get_validators_supporting_protocol_version(
4828 target_protocol_version: ProtocolVersion,
4829 target_digest: Digest,
4830 active_validators: &[AuthorityPublicKey],
4831 capabilities: &[AuthorityCapabilitiesV1],
4832 ) -> Vec<u64> {
4833 let mut eligible_validators = Vec::new();
4834
4835 for capability in capabilities {
4836 if let Some(digest) = capability
4838 .supported_protocol_versions
4839 .get_version_digest(target_protocol_version)
4840 {
4841 if digest == target_digest {
4842 if let Some(index) = active_validators
4844 .iter()
4845 .position(|name| AuthorityName::from(name) == capability.authority)
4846 {
4847 eligible_validators.push(index as u64);
4848 }
4849 }
4850 }
4851 }
4852
4853 eligible_validators.sort();
4855 eligible_validators
4856 }
4857
4858 fn calculate_eligible_validators_weight(
4863 eligible_validator_indices: &[u64],
4864 active_validators: &[AuthorityPublicKey],
4865 committee: &Committee,
4866 ) -> u64 {
4867 let mut total_weight = 0u64;
4868
4869 for &index in eligible_validator_indices {
4870 let authority_pubkey = &active_validators[index as usize];
4871 if let Some((_, weight)) = committee
4873 .members()
4874 .find(|(name, _)| *name == AuthorityName::from(authority_pubkey))
4875 {
4876 total_weight += weight;
4877 }
4878 }
4879
4880 total_weight
4881 }
4882
4883 #[instrument(level = "debug", skip_all)]
4884 fn create_authenticator_state_tx(
4885 &self,
4886 epoch_store: &Arc<AuthorityPerEpochStore>,
4887 ) -> Option<EndOfEpochTransactionKind> {
4888 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4889 info!("authenticator state transactions not enabled");
4890 return None;
4891 }
4892
4893 let authenticator_state_exists = epoch_store.authenticator_state_exists();
4894 let tx = if authenticator_state_exists {
4895 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4896 let min_epoch =
4897 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4898 let authenticator_obj_initial_shared_version = epoch_store
4899 .epoch_start_config()
4900 .authenticator_obj_initial_shared_version()
4901 .expect("initial version must exist");
4902
4903 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4904 min_epoch,
4905 authenticator_obj_initial_shared_version,
4906 );
4907
4908 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4909
4910 tx
4911 } else {
4912 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4913 info!("Creating AuthenticatorStateCreate tx");
4914 tx
4915 };
4916 Some(tx)
4917 }
4918
4919 #[instrument(level = "error", skip_all)]
4932 pub async fn create_and_execute_advance_epoch_tx(
4933 &self,
4934 epoch_store: &Arc<AuthorityPerEpochStore>,
4935 gas_cost_summary: &GasCostSummary,
4936 checkpoint: CheckpointSequenceNumber,
4937 epoch_start_timestamp_ms: CheckpointTimestamp,
4938 scores: Vec<u64>,
4939 ) -> anyhow::Result<(
4940 IotaSystemState,
4941 Option<SystemEpochInfoEvent>,
4942 TransactionEffects,
4943 )> {
4944 let mut txns = Vec::new();
4945
4946 if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
4947 txns.push(tx);
4948 }
4949
4950 let next_epoch = epoch_store.epoch() + 1;
4951
4952 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
4953 let authority_capabilities = epoch_store
4954 .get_capabilities_v1()
4955 .expect("read capabilities from db cannot fail");
4956 let (next_epoch_protocol_version, next_epoch_protocol_digest, next_epoch_system_packages) =
4957 Self::choose_protocol_version_and_system_packages_v1(
4958 epoch_store.protocol_version(),
4959 SupportedProtocolVersionsWithHashes::protocol_config_digest(
4960 epoch_store.protocol_config(),
4961 ),
4962 epoch_store.committee(),
4963 authority_capabilities.clone(),
4964 buffer_stake_bps,
4965 );
4966
4967 let config = epoch_store.protocol_config();
4971 let binary_config = to_binary_config(config);
4972 let Some(next_epoch_system_package_bytes) = self
4973 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
4974 .await
4975 else {
4976 error!(
4977 "upgraded system packages {:?} are not locally available, cannot create \
4978 ChangeEpochTx. validator binary must be upgraded to the correct version!",
4979 next_epoch_system_packages
4980 );
4981 bail!("missing system packages: cannot form ChangeEpochTx");
4991 };
4992
4993 if config.select_committee_from_eligible_validators() {
4996 let active_validators = epoch_store.epoch_start_state().get_active_validators();
4998
4999 let mut eligible_active_validators = (0..active_validators.len() as u64).collect();
5000
5001 if config.select_committee_supporting_next_epoch_version() {
5005 eligible_active_validators = Self::get_validators_supporting_protocol_version(
5006 next_epoch_protocol_version,
5007 next_epoch_protocol_digest,
5008 &active_validators,
5009 &authority_capabilities,
5010 );
5011
5012 let eligible_validators_weight = Self::calculate_eligible_validators_weight(
5014 &eligible_active_validators,
5015 &active_validators,
5016 epoch_store.committee(),
5017 );
5018
5019 let committee = epoch_store.committee();
5023 let effective_threshold = committee.effective_threshold(buffer_stake_bps);
5024
5025 if eligible_validators_weight < effective_threshold {
5026 error!(
5027 "Eligible validators weight {eligible_validators_weight} is less than effective threshold {effective_threshold}. \
5028 This could indicate a bug in validator selection logic or inconsistency with protocol version decision.",
5029 );
5030 eligible_active_validators = (0..active_validators.len() as u64).collect();
5033 }
5034 }
5035
5036 if config.pass_validator_scores_to_advance_epoch() {
5039 txns.push(EndOfEpochTransactionKind::new_change_epoch_v4(
5040 next_epoch,
5041 next_epoch_protocol_version,
5042 gas_cost_summary.storage_cost,
5043 gas_cost_summary.computation_cost,
5044 gas_cost_summary.computation_cost_burned,
5045 gas_cost_summary.storage_rebate,
5046 gas_cost_summary.non_refundable_storage_fee,
5047 epoch_start_timestamp_ms,
5048 next_epoch_system_package_bytes,
5049 eligible_active_validators,
5050 scores,
5051 config.adjust_rewards_by_score(),
5052 ));
5053 } else {
5054 txns.push(EndOfEpochTransactionKind::new_change_epoch_v3(
5055 next_epoch,
5056 next_epoch_protocol_version,
5057 gas_cost_summary.storage_cost,
5058 gas_cost_summary.computation_cost,
5059 gas_cost_summary.computation_cost_burned,
5060 gas_cost_summary.storage_rebate,
5061 gas_cost_summary.non_refundable_storage_fee,
5062 epoch_start_timestamp_ms,
5063 next_epoch_system_package_bytes,
5064 eligible_active_validators,
5065 ));
5066 }
5067 } else if config.protocol_defined_base_fee()
5068 && config.max_committee_members_count_as_option().is_some()
5069 {
5070 txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
5071 next_epoch,
5072 next_epoch_protocol_version,
5073 gas_cost_summary.storage_cost,
5074 gas_cost_summary.computation_cost,
5075 gas_cost_summary.computation_cost_burned,
5076 gas_cost_summary.storage_rebate,
5077 gas_cost_summary.non_refundable_storage_fee,
5078 epoch_start_timestamp_ms,
5079 next_epoch_system_package_bytes,
5080 ));
5081 } else {
5082 txns.push(EndOfEpochTransactionKind::new_change_epoch(
5083 next_epoch,
5084 next_epoch_protocol_version,
5085 gas_cost_summary.storage_cost,
5086 gas_cost_summary.computation_cost,
5087 gas_cost_summary.storage_rebate,
5088 gas_cost_summary.non_refundable_storage_fee,
5089 epoch_start_timestamp_ms,
5090 next_epoch_system_package_bytes,
5091 ));
5092 }
5093
5094 let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
5095
5096 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
5097 tx.clone(),
5098 epoch_store.epoch(),
5099 checkpoint,
5100 );
5101
5102 let tx_digest = executable_tx.digest();
5103
5104 info!(
5105 ?next_epoch,
5106 ?next_epoch_protocol_version,
5107 ?next_epoch_system_packages,
5108 computation_cost=?gas_cost_summary.computation_cost,
5109 computation_cost_burned=?gas_cost_summary.computation_cost_burned,
5110 storage_cost=?gas_cost_summary.storage_cost,
5111 storage_rebate=?gas_cost_summary.storage_rebate,
5112 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
5113 ?tx_digest,
5114 "Creating advance epoch transaction"
5115 );
5116
5117 fail_point_async!("change_epoch_tx_delay");
5118 let tx_lock = epoch_store.acquire_tx_lock(tx_digest);
5119
5120 if self
5124 .get_transaction_cache_reader()
5125 .try_is_tx_already_executed(tx_digest)?
5126 {
5127 warn!("change epoch tx has already been executed via state sync");
5128 bail!("change epoch tx has already been executed via state sync",);
5129 }
5130
5131 let execution_guard = self.execution_lock_for_executable_transaction(&executable_tx)?;
5132
5133 epoch_store.assign_shared_object_versions_idempotent(
5137 self.get_object_cache_reader().as_ref(),
5138 std::slice::from_ref(&executable_tx),
5139 )?;
5140
5141 let (input_objects, _, _) =
5142 self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
5143
5144 let (temporary_store, effects, _execution_error_opt) = self.prepare_certificate(
5145 &execution_guard,
5146 &executable_tx,
5147 input_objects,
5148 None,
5149 None,
5150 epoch_store,
5151 )?;
5152 let system_obj = get_iota_system_state(&temporary_store.written)
5153 .expect("change epoch tx must write to system object");
5154 let system_epoch_info_event = temporary_store
5156 .events
5157 .data
5158 .into_iter()
5159 .find(|event| event.is_system_epoch_info_event())
5160 .map(SystemEpochInfoEvent::from);
5161 assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
5164
5165 self.get_state_sync_store()
5169 .try_insert_transaction_and_effects(&tx, &effects)
5170 .map_err(|err| {
5171 let err: anyhow::Error = err.into();
5172 err
5173 })?;
5174
5175 info!(
5176 "Effects summary of the change epoch transaction: {:?}",
5177 effects.summary_for_debug()
5178 );
5179 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
5180 assert!(effects.status().is_ok());
5182 Ok((system_obj, system_epoch_info_event, effects))
5183 }
5184
5185 #[instrument(level = "error", skip_all)]
5189 async fn revert_uncommitted_epoch_transactions(
5190 &self,
5191 epoch_store: &AuthorityPerEpochStore,
5192 ) -> IotaResult {
5193 {
5194 let state = epoch_store.get_reconfig_state_write_lock_guard();
5195 if state.should_accept_user_certs() {
5196 epoch_store.close_user_certs(state);
5205 }
5206 }
5208 let pending_certificates = epoch_store.pending_consensus_certificates();
5209 info!(
5210 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
5211 pending_certificates.len(),
5212 pending_certificates,
5213 );
5214 for digest in pending_certificates {
5215 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
5216 info!(
5217 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
5218 digest
5219 );
5220 continue;
5221 }
5222 info!("Reverting {:?} at the end of epoch", digest);
5223 epoch_store.revert_executed_transaction(&digest)?;
5224 self.get_reconfig_api().try_revert_state_update(&digest)?;
5225 }
5226 info!("All uncommitted local transactions reverted");
5227 Ok(())
5228 }
5229
5230 #[instrument(level = "error", skip_all)]
5231 async fn reopen_epoch_db(
5232 &self,
5233 cur_epoch_store: &AuthorityPerEpochStore,
5234 new_committee: Committee,
5235 epoch_start_configuration: EpochStartConfiguration,
5236 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
5237 epoch_last_checkpoint: CheckpointSequenceNumber,
5238 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
5239 let new_epoch = new_committee.epoch;
5240 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
5241 assert_eq!(
5242 epoch_start_configuration.epoch_start_state().epoch(),
5243 new_committee.epoch
5244 );
5245 fail_point!("before-open-new-epoch-store");
5246 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
5247 self.name,
5248 new_committee,
5249 epoch_start_configuration,
5250 self.get_backing_package_store().clone(),
5251 self.get_object_store().clone(),
5252 expensive_safety_check_config,
5253 epoch_last_checkpoint,
5254 )?;
5255 self.epoch_store.store(new_epoch_store.clone());
5256 Ok(new_epoch_store)
5257 }
5258
5259 fn check_move_account(
5262 &self,
5263 auth_account_object_id: ObjectID,
5264 auth_account_object_seq_number: Option<SequenceNumber>,
5265 auth_account_object_digest: Option<ObjectDigest>,
5266 account_object: ObjectReadResult,
5267 signer: &IotaAddress,
5268 ) -> IotaResult<AuthenticatorFunctionRefForExecution> {
5269 let account_object = match account_object.object {
5270 ObjectReadResultKind::Object(object) => Ok(object),
5271 ObjectReadResultKind::DeletedSharedObject(version, digest) => {
5272 Err(UserInputError::AccountObjectDeleted {
5273 account_id: account_object.id(),
5274 account_version: version,
5275 transaction_digest: digest,
5276 })
5277 }
5278 ObjectReadResultKind::CancelledTransactionSharedObject(version) => {
5281 Err(UserInputError::AccountObjectInCanceledTransaction {
5282 account_id: account_object.id(),
5283 account_version: version,
5284 })
5285 }
5286 }?;
5287
5288 let account_object_addr = IotaAddress::from(auth_account_object_id);
5289
5290 fp_ensure!(
5291 signer == &account_object_addr,
5292 UserInputError::IncorrectUserSignature {
5293 error: format!("Move authenticator is trying to unlock {account_object_addr:?}, but given signer address is {signer:?}")
5294 }
5295 .into()
5296 );
5297
5298 fp_ensure!(
5299 account_object.is_shared() || account_object.is_immutable(),
5300 UserInputError::AccountObjectNotSupported {
5301 object_id: auth_account_object_id
5302 }
5303 .into()
5304 );
5305
5306 let auth_account_object_seq_number =
5307 if let Some(auth_account_object_seq_number) = auth_account_object_seq_number {
5308 let account_object_version = account_object.version();
5309
5310 fp_ensure!(
5311 account_object_version == auth_account_object_seq_number,
5312 UserInputError::AccountObjectVersionMismatch {
5313 object_id: auth_account_object_id,
5314 expected_version: auth_account_object_seq_number,
5315 actual_version: account_object_version,
5316 }
5317 .into()
5318 );
5319
5320 auth_account_object_seq_number
5321 } else {
5322 account_object.version()
5323 };
5324
5325 if let Some(auth_account_object_digest) = auth_account_object_digest {
5326 let expected_digest = account_object.digest();
5327 fp_ensure!(
5328 expected_digest == auth_account_object_digest,
5329 UserInputError::InvalidAccountObjectDigest {
5330 object_id: auth_account_object_id,
5331 expected_digest,
5332 actual_digest: auth_account_object_digest,
5333 }
5334 .into()
5335 );
5336 }
5337
5338 let authenticator_function_ref_field_id = dynamic_field::derive_dynamic_field_id(
5339 auth_account_object_id,
5340 &AuthenticatorFunctionRefV1Key::tag().into(),
5341 &AuthenticatorFunctionRefV1Key::default().to_bcs_bytes(),
5342 )
5343 .map_err(|_| UserInputError::UnableToGetMoveAuthenticatorId {
5344 account_object_id: auth_account_object_id,
5345 })?;
5346
5347 let authenticator_function_ref_field = self
5348 .get_object_cache_reader()
5349 .try_find_object_lt_or_eq_version(
5350 authenticator_function_ref_field_id,
5351 auth_account_object_seq_number,
5352 )?;
5353
5354 if let Some(authenticator_function_ref_field_obj) = authenticator_function_ref_field {
5355 let field_move_object = authenticator_function_ref_field_obj
5356 .data
5357 .try_as_move()
5358 .expect("dynamic field should never be a package object");
5359
5360 let field: Field<AuthenticatorFunctionRefV1Key, AuthenticatorFunctionRefV1> =
5361 field_move_object.to_rust().ok_or(
5362 UserInputError::InvalidAuthenticatorFunctionRefField {
5363 account_object_id: auth_account_object_id,
5364 },
5365 )?;
5366
5367 Ok(AuthenticatorFunctionRefForExecution::new_v1(
5368 field.value,
5369 authenticator_function_ref_field_obj.compute_object_reference(),
5370 authenticator_function_ref_field_obj.owner,
5371 authenticator_function_ref_field_obj.storage_rebate,
5372 authenticator_function_ref_field_obj.previous_transaction,
5373 ))
5374 } else {
5375 Err(UserInputError::MoveAuthenticatorNotFound {
5376 authenticator_function_ref_id: authenticator_function_ref_field_id,
5377 account_object_id: auth_account_object_id,
5378 account_object_version: auth_account_object_seq_number,
5379 }
5380 .into())
5381 }
5382 }
5383
5384 fn read_objects_for_signing(
5385 &self,
5386 transaction: &VerifiedTransaction,
5387 epoch: u64,
5388 ) -> IotaResult<(
5389 InputObjects,
5390 ReceivingObjects,
5391 Option<InputObjects>,
5392 Option<ObjectReadResult>,
5393 )> {
5394 let (input_objects, tx_receiving_objects) = self.input_loader.read_objects_for_signing(
5395 Some(transaction.digest()),
5396 &transaction.collect_all_input_object_kind_for_reading()?,
5397 &transaction.data().transaction_data().receiving_objects(),
5398 epoch,
5399 )?;
5400
5401 transaction
5402 .split_input_objects_into_groups_for_reading(input_objects)
5403 .map(|(tx_input_objects, auth_input_objects, account_object)| {
5404 (
5405 tx_input_objects,
5406 tx_receiving_objects,
5407 auth_input_objects,
5408 account_object,
5409 )
5410 })
5411 }
5412
5413 fn check_transaction_inputs_for_signing(
5414 &self,
5415 protocol_config: &ProtocolConfig,
5416 reference_gas_price: u64,
5417 tx_data: &TransactionData,
5418 tx_input_objects: InputObjects,
5419 tx_receiving_objects: &ReceivingObjects,
5420 move_authenticator: Option<&MoveAuthenticator>,
5421 auth_input_objects: Option<InputObjects>,
5422 account_object: Option<ObjectReadResult>,
5423 ) -> IotaResult<(
5424 IotaGasStatus,
5425 CheckedInputObjects,
5426 Option<CheckedInputObjects>,
5427 Option<AuthenticatorFunctionRef>,
5428 )> {
5429 let (
5430 auth_checked_input_objects_union,
5431 authenticator_function_ref,
5432 authenticator_gas_budget,
5433 ) = if let Some(move_authenticator) = move_authenticator {
5434 let auth_input_objects =
5435 auth_input_objects.expect("MoveAuthenticator input objects must be provided");
5436 let account_object = account_object.expect("Move account object must be provided");
5437
5438 let (
5440 auth_account_object_id,
5441 auth_account_object_seq_number,
5442 auth_account_object_digest,
5443 ) = move_authenticator.object_to_authenticate_components()?;
5444
5445 let AuthenticatorFunctionRefForExecution {
5447 authenticator_function_ref,
5448 ..
5449 } = self.check_move_account(
5450 auth_account_object_id,
5451 auth_account_object_seq_number,
5452 auth_account_object_digest,
5453 account_object,
5454 &tx_data.sender(),
5455 )?;
5456
5457 let auth_checked_input_objects =
5459 iota_transaction_checks::check_move_authenticator_input_for_signing(
5460 auth_input_objects,
5461 )?;
5462
5463 let authenticator_gas_budget = protocol_config.max_auth_gas();
5466
5467 (
5468 Some(auth_checked_input_objects),
5469 Some(authenticator_function_ref),
5470 authenticator_gas_budget,
5471 )
5472 } else {
5473 (None, None, 0)
5474 };
5475
5476 let (gas_status, tx_checked_input_objects) =
5478 iota_transaction_checks::check_transaction_input(
5479 protocol_config,
5480 reference_gas_price,
5481 tx_data,
5482 tx_input_objects,
5483 tx_receiving_objects,
5484 &self.metrics.bytecode_verifier_metrics,
5485 &self.config.verifier_signing_config,
5486 authenticator_gas_budget,
5487 )?;
5488
5489 Ok((
5490 gas_status,
5491 tx_checked_input_objects,
5492 auth_checked_input_objects_union,
5493 authenticator_function_ref,
5494 ))
5495 }
5496
5497 #[cfg(test)]
5498 pub(crate) fn iter_live_object_set_for_testing(
5499 &self,
5500 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
5501 self.get_accumulator_store()
5502 .iter_cached_live_object_set_for_testing()
5503 }
5504
5505 #[cfg(test)]
5506 pub(crate) fn shutdown_execution_for_test(&self) {
5507 self.tx_execution_shutdown
5508 .lock()
5509 .take()
5510 .unwrap()
5511 .send(())
5512 .unwrap();
5513 }
5514
5515 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) {
5518 self.get_reconfig_api().bulk_insert_genesis_objects(objects);
5519 self.get_object_cache_reader()
5520 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
5521 self.get_reconfig_api()
5522 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
5523 }
5524}
5525
5526pub struct RandomnessRoundReceiver {
5527 authority_state: Arc<AuthorityState>,
5528 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5529}
5530
5531impl RandomnessRoundReceiver {
5532 pub fn spawn(
5533 authority_state: Arc<AuthorityState>,
5534 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
5535 ) -> JoinHandle<()> {
5536 let rrr = RandomnessRoundReceiver {
5537 authority_state,
5538 randomness_rx,
5539 };
5540 spawn_monitored_task!(rrr.run())
5541 }
5542
5543 async fn run(mut self) {
5544 info!("RandomnessRoundReceiver event loop started");
5545
5546 loop {
5547 tokio::select! {
5548 maybe_recv = self.randomness_rx.recv() => {
5549 if let Some((epoch, round, bytes)) = maybe_recv {
5550 self.handle_new_randomness(epoch, round, bytes);
5551 } else {
5552 break;
5553 }
5554 },
5555 }
5556 }
5557
5558 info!("RandomnessRoundReceiver event loop ended");
5559 }
5560
5561 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
5562 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
5563 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
5564 if epoch_store.epoch() != epoch {
5565 warn!(
5566 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
5567 epoch_store.epoch()
5568 );
5569 return;
5570 }
5571 let transaction = VerifiedTransaction::new_randomness_state_update(
5572 epoch,
5573 round,
5574 bytes,
5575 epoch_store
5576 .epoch_start_config()
5577 .randomness_obj_initial_shared_version(),
5578 );
5579 debug!(
5580 "created randomness state update transaction with digest: {:?}",
5581 transaction.digest()
5582 );
5583 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
5584 let digest = *transaction.digest();
5585
5586 self.authority_state
5591 .get_cache_commit()
5592 .persist_transaction(&transaction);
5593
5594 self.authority_state
5596 .transaction_manager()
5597 .enqueue(vec![transaction], &epoch_store);
5598
5599 let authority_state = self.authority_state.clone();
5600 spawn_monitored_task!(async move {
5601 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5610 let result = tokio::time::timeout(
5611 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5612 authority_state
5613 .get_transaction_cache_reader()
5614 .try_notify_read_executed_effects(&[digest]),
5615 )
5616 .await;
5617 let result = match result {
5618 Ok(result) => result,
5619 Err(_) => {
5620 if cfg!(debug_assertions) {
5621 panic!(
5623 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5624 );
5625 }
5626 warn!(
5627 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5628 );
5629 authority_state
5631 .get_transaction_cache_reader()
5632 .try_notify_read_executed_effects(&[digest])
5633 .await
5634 }
5635 };
5636
5637 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5638 let effects = effects.pop().expect("should return effects");
5639 if *effects.status() != ExecutionStatus::Success {
5640 fatal!(
5641 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5642 );
5643 }
5644 debug!(
5645 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5646 );
5647 });
5648 }
5649}
5650
5651#[async_trait]
5652impl TransactionKeyValueStoreTrait for AuthorityState {
5653 async fn multi_get(
5654 &self,
5655 transaction_keys: &[TransactionDigest],
5656 effects_keys: &[TransactionDigest],
5657 ) -> IotaResult<KVStoreTransactionData> {
5658 let txns = if !transaction_keys.is_empty() {
5659 self.get_transaction_cache_reader()
5660 .try_multi_get_transaction_blocks(transaction_keys)?
5661 .into_iter()
5662 .map(|t| t.map(|t| (*t).clone().into_inner()))
5663 .collect()
5664 } else {
5665 vec![]
5666 };
5667
5668 let fx = if !effects_keys.is_empty() {
5669 self.get_transaction_cache_reader()
5670 .try_multi_get_executed_effects(effects_keys)?
5671 } else {
5672 vec![]
5673 };
5674
5675 Ok((txns, fx))
5676 }
5677
5678 async fn multi_get_checkpoints(
5679 &self,
5680 checkpoint_summaries: &[CheckpointSequenceNumber],
5681 checkpoint_contents: &[CheckpointSequenceNumber],
5682 checkpoint_summaries_by_digest: &[CheckpointDigest],
5683 ) -> IotaResult<(
5684 Vec<Option<CertifiedCheckpointSummary>>,
5685 Vec<Option<CheckpointContents>>,
5686 Vec<Option<CertifiedCheckpointSummary>>,
5687 )> {
5688 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5690 let store = self.get_checkpoint_store();
5691 for seq in checkpoint_summaries {
5692 let checkpoint = store
5693 .get_checkpoint_by_sequence_number(*seq)?
5694 .map(|c| c.into_inner());
5695
5696 summaries.push(checkpoint);
5697 }
5698
5699 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5700 for seq in checkpoint_contents {
5701 let checkpoint = store
5702 .get_checkpoint_by_sequence_number(*seq)?
5703 .and_then(|summary| {
5704 store
5705 .get_checkpoint_contents(&summary.content_digest)
5706 .expect("db read cannot fail")
5707 });
5708 contents.push(checkpoint);
5709 }
5710
5711 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5712 for digest in checkpoint_summaries_by_digest {
5713 let checkpoint = store
5714 .get_checkpoint_by_digest(digest)?
5715 .map(|c| c.into_inner());
5716 summaries_by_digest.push(checkpoint);
5717 }
5718
5719 Ok((summaries, contents, summaries_by_digest))
5720 }
5721
5722 async fn get_transaction_perpetual_checkpoint(
5723 &self,
5724 digest: TransactionDigest,
5725 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5726 self.get_checkpoint_cache()
5727 .try_get_transaction_perpetual_checkpoint(&digest)
5728 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5729 }
5730
5731 async fn get_object(
5732 &self,
5733 object_id: ObjectID,
5734 version: VersionNumber,
5735 ) -> IotaResult<Option<Object>> {
5736 self.get_object_cache_reader()
5737 .try_get_object_by_key(&object_id, version)
5738 }
5739
5740 async fn multi_get_transactions_perpetual_checkpoints(
5741 &self,
5742 digests: &[TransactionDigest],
5743 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5744 let res = self
5745 .get_checkpoint_cache()
5746 .try_multi_get_transactions_perpetual_checkpoints(digests)?;
5747
5748 Ok(res
5749 .into_iter()
5750 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5751 .collect())
5752 }
5753
5754 #[instrument(skip(self))]
5755 async fn multi_get_events_by_tx_digests(
5756 &self,
5757 digests: &[TransactionDigest],
5758 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5759 if digests.is_empty() {
5760 return Ok(vec![]);
5761 }
5762 let events_digests: Vec<_> = self
5763 .get_transaction_cache_reader()
5764 .try_multi_get_executed_effects(digests)?
5765 .into_iter()
5766 .map(|t| t.and_then(|t| t.events_digest().cloned()))
5767 .collect();
5768 let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5769 let mut events = self
5770 .get_transaction_cache_reader()
5771 .try_multi_get_events(&non_empty_events)?
5772 .into_iter();
5773 Ok(events_digests
5774 .into_iter()
5775 .map(|ev| ev.and_then(|_| events.next()?))
5776 .collect())
5777 }
5778}
5779
5780#[cfg(msim)]
5781pub mod framework_injection {
5782 use std::{
5783 cell::RefCell,
5784 collections::{BTreeMap, BTreeSet},
5785 };
5786
5787 use iota_framework::{BuiltInFramework, SystemPackage};
5788 use iota_types::{
5789 base_types::{AuthorityName, ObjectID},
5790 is_system_package,
5791 };
5792 use move_binary_format::CompiledModule;
5793
5794 type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
5795
5796 thread_local! {
5798 static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
5799 }
5800
5801 type Framework = Vec<CompiledModule>;
5802
5803 pub type PackageUpgradeCallback =
5804 Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
5805
5806 enum PackageOverrideConfig {
5807 Global(Framework),
5808 PerValidator(PackageUpgradeCallback),
5809 }
5810
5811 fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
5812 modules
5813 .iter()
5814 .map(|m| {
5815 let mut buf = Vec::new();
5816 m.serialize_with_version(m.version, &mut buf).unwrap();
5817 buf
5818 })
5819 .collect()
5820 }
5821
5822 pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
5823 OVERRIDE.with(|bs| {
5824 bs.borrow_mut()
5825 .insert(package_id, PackageOverrideConfig::Global(modules))
5826 });
5827 }
5828
5829 pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
5830 OVERRIDE.with(|bs| {
5831 bs.borrow_mut()
5832 .insert(package_id, PackageOverrideConfig::PerValidator(func))
5833 });
5834 }
5835
5836 pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
5837 OVERRIDE.with(|cfg| {
5838 cfg.borrow().get(package_id).and_then(|entry| match entry {
5839 PackageOverrideConfig::Global(framework) => {
5840 Some(compiled_modules_to_bytes(framework))
5841 }
5842 PackageOverrideConfig::PerValidator(func) => {
5843 func(name).map(|fw| compiled_modules_to_bytes(&fw))
5844 }
5845 })
5846 })
5847 }
5848
5849 pub fn get_override_modules(
5850 package_id: &ObjectID,
5851 name: AuthorityName,
5852 ) -> Option<Vec<CompiledModule>> {
5853 OVERRIDE.with(|cfg| {
5854 cfg.borrow().get(package_id).and_then(|entry| match entry {
5855 PackageOverrideConfig::Global(framework) => Some(framework.clone()),
5856 PackageOverrideConfig::PerValidator(func) => func(name),
5857 })
5858 })
5859 }
5860
5861 pub fn get_override_system_package(
5862 package_id: &ObjectID,
5863 name: AuthorityName,
5864 ) -> Option<SystemPackage> {
5865 let bytes = get_override_bytes(package_id, name)?;
5866 let dependencies = if is_system_package(*package_id) {
5867 BuiltInFramework::get_package_by_id(package_id)
5868 .dependencies
5869 .to_vec()
5870 } else {
5871 BuiltInFramework::all_package_ids()
5874 };
5875 Some(SystemPackage {
5876 id: *package_id,
5877 bytes,
5878 dependencies,
5879 })
5880 }
5881
5882 pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
5883 let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
5884 let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
5885 cfg.borrow()
5886 .keys()
5887 .filter_map(|package| (!built_in.contains(package)).then_some(*package))
5888 .collect()
5889 });
5890
5891 extra
5892 .into_iter()
5893 .map(|package| SystemPackage {
5894 id: package,
5895 bytes: get_override_bytes(&package, name).unwrap(),
5896 dependencies: BuiltInFramework::all_package_ids(),
5897 })
5898 .collect()
5899 }
5900}
5901
5902#[derive(Debug, Serialize, Deserialize, Clone)]
5903pub struct ObjDumpFormat {
5904 pub id: ObjectID,
5905 pub version: VersionNumber,
5906 pub digest: ObjectDigest,
5907 pub object: Object,
5908}
5909
5910impl ObjDumpFormat {
5911 fn new(object: Object) -> Self {
5912 let oref = object.compute_object_reference();
5913 Self {
5914 id: oref.0,
5915 version: oref.1,
5916 digest: oref.2,
5917 object,
5918 }
5919 }
5920}
5921
5922#[derive(Debug, Serialize, Deserialize, Clone)]
5923pub struct NodeStateDump {
5924 pub tx_digest: TransactionDigest,
5925 pub sender_signed_data: SenderSignedData,
5926 pub executed_epoch: u64,
5927 pub reference_gas_price: u64,
5928 pub protocol_version: u64,
5929 pub epoch_start_timestamp_ms: u64,
5930 pub computed_effects: TransactionEffects,
5931 pub expected_effects_digest: TransactionEffectsDigest,
5932 pub relevant_system_packages: Vec<ObjDumpFormat>,
5933 pub shared_objects: Vec<ObjDumpFormat>,
5934 pub loaded_child_objects: Vec<ObjDumpFormat>,
5935 pub modified_at_versions: Vec<ObjDumpFormat>,
5936 pub runtime_reads: Vec<ObjDumpFormat>,
5937 pub input_objects: Vec<ObjDumpFormat>,
5938}
5939
5940impl NodeStateDump {
5941 pub fn new(
5942 tx_digest: &TransactionDigest,
5943 effects: &TransactionEffects,
5944 expected_effects_digest: TransactionEffectsDigest,
5945 object_store: &dyn ObjectStore,
5946 epoch_store: &Arc<AuthorityPerEpochStore>,
5947 inner_temporary_store: &InnerTemporaryStore,
5948 certificate: &VerifiedExecutableTransaction,
5949 ) -> IotaResult<Self> {
5950 let executed_epoch = epoch_store.epoch();
5952 let reference_gas_price = epoch_store.reference_gas_price();
5953 let epoch_start_config = epoch_store.epoch_start_config();
5954 let protocol_version = epoch_store.protocol_version().as_u64();
5955 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5956
5957 let mut relevant_system_packages = Vec::new();
5959 for sys_package_id in BuiltInFramework::all_package_ids() {
5960 if let Some(w) = object_store.try_get_object(&sys_package_id)? {
5961 relevant_system_packages.push(ObjDumpFormat::new(w))
5962 }
5963 }
5964
5965 let mut shared_objects = Vec::new();
5967 for kind in effects.input_shared_objects() {
5968 match kind {
5969 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5970 if let Some(w) = object_store.try_get_object_by_key(&obj_ref.0, obj_ref.1)? {
5971 shared_objects.push(ObjDumpFormat::new(w))
5972 }
5973 }
5974 InputSharedObject::ReadDeleted(..)
5975 | InputSharedObject::MutateDeleted(..)
5976 | InputSharedObject::Cancelled(..) => (), }
5979 }
5980
5981 let mut loaded_child_objects = Vec::new();
5984 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5985 if let Some(w) = object_store.try_get_object_by_key(id, meta.version)? {
5986 loaded_child_objects.push(ObjDumpFormat::new(w))
5987 }
5988 }
5989
5990 let mut modified_at_versions = Vec::new();
5992 for (id, ver) in effects.modified_at_versions() {
5993 if let Some(w) = object_store.try_get_object_by_key(&id, ver)? {
5994 modified_at_versions.push(ObjDumpFormat::new(w))
5995 }
5996 }
5997
5998 let mut runtime_reads = Vec::new();
6002 for obj in inner_temporary_store
6003 .runtime_packages_loaded_from_db
6004 .values()
6005 {
6006 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
6007 }
6008
6009 Ok(Self {
6012 tx_digest: *tx_digest,
6013 executed_epoch,
6014 reference_gas_price,
6015 epoch_start_timestamp_ms,
6016 protocol_version,
6017 relevant_system_packages,
6018 shared_objects,
6019 loaded_child_objects,
6020 modified_at_versions,
6021 runtime_reads,
6022 sender_signed_data: certificate.clone().into_message(),
6023 input_objects: inner_temporary_store
6024 .input_objects
6025 .values()
6026 .map(|o| ObjDumpFormat::new(o.clone()))
6027 .collect(),
6028 computed_effects: effects.clone(),
6029 expected_effects_digest,
6030 })
6031 }
6032
6033 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
6034 let mut objects = Vec::new();
6035 objects.extend(self.relevant_system_packages.clone());
6036 objects.extend(self.shared_objects.clone());
6037 objects.extend(self.loaded_child_objects.clone());
6038 objects.extend(self.modified_at_versions.clone());
6039 objects.extend(self.runtime_reads.clone());
6040 objects.extend(self.input_objects.clone());
6041 objects
6042 }
6043
6044 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
6045 let file_name = format!(
6046 "{}_{}_NODE_DUMP.json",
6047 self.tx_digest,
6048 AuthorityState::unixtime_now_ms()
6049 );
6050 let mut path = path.to_path_buf();
6051 path.push(&file_name);
6052 let mut file = File::create(path.clone())?;
6053 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
6054 Ok(path)
6055 }
6056
6057 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
6058 let file = File::open(path)?;
6059 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
6060 }
6061}