1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::Duration,
15 vec,
16};
17
18use anyhow::bail;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use chrono::prelude::*;
24use fastcrypto::{
25 encoding::{Base58, Encoding},
26 hash::MultisetHash,
27};
28use iota_archival::reader::ArchiveReaderBalancer;
29use iota_common::debug_fatal;
30use iota_config::{
31 NodeConfig,
32 genesis::Genesis,
33 node::{
34 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
35 StateDebugDumpConfig,
36 },
37};
38use iota_framework::{BuiltInFramework, SystemPackage};
39use iota_json_rpc_types::{
40 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
41 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
42 IotaTransactionBlockEvents, TransactionFilter,
43};
44use iota_macros::{fail_point, fail_point_async, fail_point_if};
45use iota_metrics::{
46 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
47};
48use iota_storage::{
49 key_value_store::{
50 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
51 },
52 key_value_store_metrics::KeyValueStoreMetrics,
53};
54#[cfg(msim)]
55use iota_types::committee::CommitteeTrait;
56use iota_types::{
57 IOTA_SYSTEM_ADDRESS, TypeTag,
58 authenticator_state::get_authenticator_state,
59 base_types::*,
60 committee::{Committee, EpochId, ProtocolVersion},
61 crypto::{AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer, default_hash},
62 deny_list_v1::check_coin_deny_list_v1_during_signing,
63 digests::{ChainIdentifier, TransactionEventsDigest},
64 dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
65 effects::{
66 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
67 TransactionEvents, VerifiedCertifiedTransactionEffects, VerifiedSignedTransactionEffects,
68 },
69 error::{ExecutionError, IotaError, IotaResult, UserInputError},
70 event::{Event, EventID, SystemEpochInfoEvent},
71 executable_transaction::VerifiedExecutableTransaction,
72 execution_config_utils::to_binary_config,
73 execution_status::ExecutionStatus,
74 fp_ensure,
75 gas::{GasCostSummary, IotaGasStatus},
76 inner_temporary_store::{
77 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
78 WrittenObjects,
79 },
80 iota_system_state::{
81 IotaSystemState, IotaSystemStateTrait,
82 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
83 },
84 is_system_package,
85 layout_resolver::{LayoutResolver, into_struct_layout},
86 message_envelope::Message,
87 messages_checkpoint::{
88 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
89 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
90 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
91 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
92 },
93 messages_consensus::AuthorityCapabilitiesV1,
94 messages_grpc::{
95 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
96 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
97 TransactionStatus,
98 },
99 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
100 object::{
101 MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
102 bounded_visitor::BoundedVisitor,
103 },
104 storage::{
105 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
106 },
107 supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions},
108 transaction::*,
109 transaction_executor::SimulateTransactionResult,
110};
111use itertools::Itertools;
112use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
113use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
114use once_cell::sync::OnceCell;
115use parking_lot::Mutex;
116use prometheus::{
117 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
118 register_histogram_vec_with_registry, register_histogram_with_registry,
119 register_int_counter_vec_with_registry, register_int_counter_with_registry,
120 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
121};
122use serde::{Deserialize, Serialize, de::DeserializeOwned};
123use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
124use tap::{TapFallible, TapOptional};
125use tokio::{
126 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
127 task::JoinHandle,
128};
129use tracing::{Instrument, debug, error, info, instrument, trace, warn};
130use typed_store::TypedStoreError;
131
132use self::{
133 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
134};
135#[cfg(msim)]
136pub use crate::checkpoints::checkpoint_executor::{
137 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
138};
139use crate::{
140 authority::{
141 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
142 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
143 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
144 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
145 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
146 },
147 authority_client::NetworkAuthorityClient,
148 checkpoints::CheckpointStore,
149 consensus_adapter::ConsensusAdapter,
150 epoch::committee_store::CommitteeStore,
151 execution_cache::{
152 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
153 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
154 TransactionCacheRead,
155 },
156 execution_driver::execution_process,
157 jsonrpc_index::{CoinInfo, IndexStore, ObjectIndexChanges},
158 metrics::{LatencyObserver, RateTracker},
159 module_cache_metrics::ResolverMetrics,
160 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
161 rest_index::RestIndexStore,
162 stake_aggregator::StakeAggregator,
163 state_accumulator::{AccumulatorStore, StateAccumulator},
164 subscription_handler::SubscriptionHandler,
165 transaction_input_loader::TransactionInputLoader,
166 transaction_manager::TransactionManager,
167 transaction_outputs::TransactionOutputs,
168 validator_tx_finalizer::ValidatorTxFinalizer,
169 verify_indexes::verify_indexes,
170};
171
172#[cfg(test)]
173#[path = "unit_tests/authority_tests.rs"]
174pub mod authority_tests;
175
176#[cfg(test)]
177#[path = "unit_tests/transaction_tests.rs"]
178pub mod transaction_tests;
179
180#[cfg(test)]
181#[path = "unit_tests/batch_transaction_tests.rs"]
182mod batch_transaction_tests;
183
184#[cfg(test)]
185#[path = "unit_tests/move_integration_tests.rs"]
186pub mod move_integration_tests;
187
188#[cfg(test)]
189#[path = "unit_tests/gas_tests.rs"]
190mod gas_tests;
191
192#[cfg(test)]
193#[path = "unit_tests/batch_verification_tests.rs"]
194mod batch_verification_tests;
195
196#[cfg(test)]
197#[path = "unit_tests/coin_deny_list_tests.rs"]
198mod coin_deny_list_tests;
199
200#[cfg(any(test, feature = "test-utils"))]
201pub mod authority_test_utils;
202
203pub mod authority_per_epoch_store;
204pub mod authority_per_epoch_store_pruner;
205
206pub mod authority_store_pruner;
207pub mod authority_store_tables;
208pub mod authority_store_types;
209pub mod epoch_start_configuration;
210pub mod shared_object_congestion_tracker;
211pub mod shared_object_version_manager;
212#[cfg(any(test, feature = "test-utils"))]
213pub mod test_authority_builder;
214pub mod transaction_deferral;
215
216pub(crate) mod authority_store;
217
218pub static CHAIN_IDENTIFIER: OnceCell<ChainIdentifier> = OnceCell::new();
219
220pub struct AuthorityMetrics {
222 tx_orders: IntCounter,
223 total_certs: IntCounter,
224 total_cert_attempts: IntCounter,
225 total_effects: IntCounter,
226 pub shared_obj_tx: IntCounter,
227 sponsored_tx: IntCounter,
228 tx_already_processed: IntCounter,
229 num_input_objs: Histogram,
230 num_shared_objects: Histogram,
231 batch_size: Histogram,
232
233 authority_state_handle_transaction_latency: Histogram,
234
235 execute_certificate_latency_single_writer: Histogram,
236 execute_certificate_latency_shared_object: Histogram,
237
238 execute_certificate_with_effects_latency: Histogram,
239 internal_execution_latency: Histogram,
240 execution_load_input_objects_latency: Histogram,
241 prepare_certificate_latency: Histogram,
242 commit_certificate_latency: Histogram,
243 db_checkpoint_latency: Histogram,
244
245 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
246 pub(crate) transaction_manager_num_missing_objects: IntGauge,
247 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
248 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
249 pub(crate) transaction_manager_num_ready: IntGauge,
250 pub(crate) transaction_manager_object_cache_size: IntGauge,
251 pub(crate) transaction_manager_object_cache_hits: IntCounter,
252 pub(crate) transaction_manager_object_cache_misses: IntCounter,
253 pub(crate) transaction_manager_object_cache_evictions: IntCounter,
254 pub(crate) transaction_manager_package_cache_size: IntGauge,
255 pub(crate) transaction_manager_package_cache_hits: IntCounter,
256 pub(crate) transaction_manager_package_cache_misses: IntCounter,
257 pub(crate) transaction_manager_package_cache_evictions: IntCounter,
258 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
259
260 pub(crate) execution_driver_executed_transactions: IntCounter,
261 pub(crate) execution_driver_dispatch_queue: IntGauge,
262 pub(crate) execution_queueing_delay_s: Histogram,
263 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
264 pub(crate) execution_gas_latency_ratio: Histogram,
265
266 pub(crate) skipped_consensus_txns: IntCounter,
267 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
268
269 pub(crate) authority_overload_status: IntGauge,
270 pub(crate) authority_load_shedding_percentage: IntGauge,
271
272 pub(crate) transaction_overload_sources: IntCounterVec,
273
274 post_processing_total_events_emitted: IntCounter,
276 post_processing_total_tx_indexed: IntCounter,
277 post_processing_total_tx_had_event_processed: IntCounter,
278 post_processing_total_failures: IntCounter,
279
280 pub consensus_handler_processed: IntCounterVec,
282 pub consensus_handler_transaction_sizes: HistogramVec,
283 pub consensus_handler_num_low_scoring_authorities: IntGauge,
284 pub consensus_handler_scores: IntGaugeVec,
285 pub consensus_handler_deferred_transactions: IntCounter,
286 pub consensus_handler_congested_transactions: IntCounter,
287 pub consensus_handler_cancelled_transactions: IntCounter,
288 pub consensus_handler_max_object_costs: IntGaugeVec,
289 pub consensus_committed_subdags: IntCounterVec,
290 pub consensus_committed_messages: IntGaugeVec,
291 pub consensus_committed_user_transactions: IntGaugeVec,
292 pub consensus_calculated_throughput: IntGauge,
293 pub consensus_calculated_throughput_profile: IntGauge,
294
295 pub limits_metrics: Arc<LimitsMetrics>,
296
297 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
299
300 pub zklogin_sig_count: IntCounter,
302 pub multisig_sig_count: IntCounter,
304
305 pub execution_queueing_latency: LatencyObserver,
308
309 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
316
317 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
320}
321
322const POSITIVE_INT_BUCKETS: &[f64] = &[
324 1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
325 20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
326 7000000., 10000000.,
327];
328
329const LATENCY_SEC_BUCKETS: &[f64] = &[
330 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
331 10., 20., 30., 60., 90.,
332];
333
334const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
336 0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
337 0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
338];
339
340const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
341 10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
342 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
343];
344
345pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000;
346
347impl AuthorityMetrics {
348 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
349 let execute_certificate_latency = register_histogram_vec_with_registry!(
350 "authority_state_execute_certificate_latency",
351 "Latency of executing certificates, including waiting for inputs",
352 &["tx_type"],
353 LATENCY_SEC_BUCKETS.to_vec(),
354 registry,
355 )
356 .unwrap();
357
358 let execute_certificate_latency_single_writer =
359 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
360 let execute_certificate_latency_shared_object =
361 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
362
363 Self {
364 tx_orders: register_int_counter_with_registry!(
365 "total_transaction_orders",
366 "Total number of transaction orders",
367 registry,
368 )
369 .unwrap(),
370 total_certs: register_int_counter_with_registry!(
371 "total_transaction_certificates",
372 "Total number of transaction certificates handled",
373 registry,
374 )
375 .unwrap(),
376 total_cert_attempts: register_int_counter_with_registry!(
377 "total_handle_certificate_attempts",
378 "Number of calls to handle_certificate",
379 registry,
380 )
381 .unwrap(),
382 total_effects: register_int_counter_with_registry!(
384 "total_transaction_effects",
385 "Total number of transaction effects produced",
386 registry,
387 )
388 .unwrap(),
389
390 shared_obj_tx: register_int_counter_with_registry!(
391 "num_shared_obj_tx",
392 "Number of transactions involving shared objects",
393 registry,
394 )
395 .unwrap(),
396
397 sponsored_tx: register_int_counter_with_registry!(
398 "num_sponsored_tx",
399 "Number of sponsored transactions",
400 registry,
401 )
402 .unwrap(),
403
404 tx_already_processed: register_int_counter_with_registry!(
405 "num_tx_already_processed",
406 "Number of transaction orders already processed previously",
407 registry,
408 )
409 .unwrap(),
410 num_input_objs: register_histogram_with_registry!(
411 "num_input_objects",
412 "Distribution of number of input TX objects per TX",
413 POSITIVE_INT_BUCKETS.to_vec(),
414 registry,
415 )
416 .unwrap(),
417 num_shared_objects: register_histogram_with_registry!(
418 "num_shared_objects",
419 "Number of shared input objects per TX",
420 POSITIVE_INT_BUCKETS.to_vec(),
421 registry,
422 )
423 .unwrap(),
424 batch_size: register_histogram_with_registry!(
425 "batch_size",
426 "Distribution of size of transaction batch",
427 POSITIVE_INT_BUCKETS.to_vec(),
428 registry,
429 )
430 .unwrap(),
431 authority_state_handle_transaction_latency: register_histogram_with_registry!(
432 "authority_state_handle_transaction_latency",
433 "Latency of handling transactions",
434 LATENCY_SEC_BUCKETS.to_vec(),
435 registry,
436 )
437 .unwrap(),
438 execute_certificate_latency_single_writer,
439 execute_certificate_latency_shared_object,
440 execute_certificate_with_effects_latency: register_histogram_with_registry!(
441 "authority_state_execute_certificate_with_effects_latency",
442 "Latency of executing certificates with effects, including waiting for inputs",
443 LATENCY_SEC_BUCKETS.to_vec(),
444 registry,
445 )
446 .unwrap(),
447 internal_execution_latency: register_histogram_with_registry!(
448 "authority_state_internal_execution_latency",
449 "Latency of actual certificate executions",
450 LATENCY_SEC_BUCKETS.to_vec(),
451 registry,
452 )
453 .unwrap(),
454 execution_load_input_objects_latency: register_histogram_with_registry!(
455 "authority_state_execution_load_input_objects_latency",
456 "Latency of loading input objects for execution",
457 LOW_LATENCY_SEC_BUCKETS.to_vec(),
458 registry,
459 )
460 .unwrap(),
461 prepare_certificate_latency: register_histogram_with_registry!(
462 "authority_state_prepare_certificate_latency",
463 "Latency of executing certificates, before committing the results",
464 LATENCY_SEC_BUCKETS.to_vec(),
465 registry,
466 )
467 .unwrap(),
468 commit_certificate_latency: register_histogram_with_registry!(
469 "authority_state_commit_certificate_latency",
470 "Latency of committing certificate execution results",
471 LATENCY_SEC_BUCKETS.to_vec(),
472 registry,
473 )
474 .unwrap(),
475 db_checkpoint_latency: register_histogram_with_registry!(
476 "db_checkpoint_latency",
477 "Latency of checkpointing dbs",
478 LATENCY_SEC_BUCKETS.to_vec(),
479 registry,
480 ).unwrap(),
481 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
482 "transaction_manager_num_enqueued_certificates",
483 "Current number of certificates enqueued to TransactionManager",
484 &["result"],
485 registry,
486 )
487 .unwrap(),
488 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
489 "transaction_manager_num_missing_objects",
490 "Current number of missing objects in TransactionManager",
491 registry,
492 )
493 .unwrap(),
494 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
495 "transaction_manager_num_pending_certificates",
496 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
497 registry,
498 )
499 .unwrap(),
500 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
501 "transaction_manager_num_executing_certificates",
502 "Number of executing certificates, including queued and actually running certificates",
503 registry,
504 )
505 .unwrap(),
506 transaction_manager_num_ready: register_int_gauge_with_registry!(
507 "transaction_manager_num_ready",
508 "Number of ready transactions in TransactionManager",
509 registry,
510 )
511 .unwrap(),
512 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
513 "transaction_manager_object_cache_size",
514 "Current size of object-availability cache in TransactionManager",
515 registry,
516 )
517 .unwrap(),
518 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
519 "transaction_manager_object_cache_hits",
520 "Number of object-availability cache hits in TransactionManager",
521 registry,
522 )
523 .unwrap(),
524 authority_overload_status: register_int_gauge_with_registry!(
525 "authority_overload_status",
526 "Whether authority is current experiencing overload and enters load shedding mode.",
527 registry)
528 .unwrap(),
529 authority_load_shedding_percentage: register_int_gauge_with_registry!(
530 "authority_load_shedding_percentage",
531 "The percentage of transactions is shed when the authority is in load shedding mode.",
532 registry)
533 .unwrap(),
534 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
535 "transaction_manager_object_cache_misses",
536 "Number of object-availability cache misses in TransactionManager",
537 registry,
538 )
539 .unwrap(),
540 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
541 "transaction_manager_object_cache_evictions",
542 "Number of object-availability cache evictions in TransactionManager",
543 registry,
544 )
545 .unwrap(),
546 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
547 "transaction_manager_package_cache_size",
548 "Current size of package-availability cache in TransactionManager",
549 registry,
550 )
551 .unwrap(),
552 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
553 "transaction_manager_package_cache_hits",
554 "Number of package-availability cache hits in TransactionManager",
555 registry,
556 )
557 .unwrap(),
558 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
559 "transaction_manager_package_cache_misses",
560 "Number of package-availability cache misses in TransactionManager",
561 registry,
562 )
563 .unwrap(),
564 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
565 "transaction_manager_package_cache_evictions",
566 "Number of package-availability cache evictions in TransactionManager",
567 registry,
568 )
569 .unwrap(),
570 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
571 "transaction_manager_transaction_queue_age_s",
572 "Time spent in waiting for transaction in the queue",
573 LATENCY_SEC_BUCKETS.to_vec(),
574 registry,
575 )
576 .unwrap(),
577 transaction_overload_sources: register_int_counter_vec_with_registry!(
578 "transaction_overload_sources",
579 "Number of times each source indicates transaction overload.",
580 &["source"],
581 registry)
582 .unwrap(),
583 execution_driver_executed_transactions: register_int_counter_with_registry!(
584 "execution_driver_executed_transactions",
585 "Cumulative number of transaction executed by execution driver",
586 registry,
587 )
588 .unwrap(),
589 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
590 "execution_driver_dispatch_queue",
591 "Number of transaction pending in execution driver dispatch queue",
592 registry,
593 )
594 .unwrap(),
595 execution_queueing_delay_s: register_histogram_with_registry!(
596 "execution_queueing_delay_s",
597 "Queueing delay between a transaction is ready for execution until it starts executing.",
598 LATENCY_SEC_BUCKETS.to_vec(),
599 registry
600 )
601 .unwrap(),
602 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
603 "prepare_cert_gas_latency_ratio",
604 "The ratio of computation gas divided by VM execution latency.",
605 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
606 registry
607 )
608 .unwrap(),
609 execution_gas_latency_ratio: register_histogram_with_registry!(
610 "execution_gas_latency_ratio",
611 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
612 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
613 registry
614 )
615 .unwrap(),
616 skipped_consensus_txns: register_int_counter_with_registry!(
617 "skipped_consensus_txns",
618 "Total number of consensus transactions skipped",
619 registry,
620 )
621 .unwrap(),
622 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
623 "skipped_consensus_txns_cache_hit",
624 "Total number of consensus transactions skipped because of local cache hit",
625 registry,
626 )
627 .unwrap(),
628 post_processing_total_events_emitted: register_int_counter_with_registry!(
629 "post_processing_total_events_emitted",
630 "Total number of events emitted in post processing",
631 registry,
632 )
633 .unwrap(),
634 post_processing_total_tx_indexed: register_int_counter_with_registry!(
635 "post_processing_total_tx_indexed",
636 "Total number of txes indexed in post processing",
637 registry,
638 )
639 .unwrap(),
640 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
641 "post_processing_total_tx_had_event_processed",
642 "Total number of txes finished event processing in post processing",
643 registry,
644 )
645 .unwrap(),
646 post_processing_total_failures: register_int_counter_with_registry!(
647 "post_processing_total_failures",
648 "Total number of failure in post processing",
649 registry,
650 )
651 .unwrap(),
652 consensus_handler_processed: register_int_counter_vec_with_registry!(
653 "consensus_handler_processed",
654 "Number of transactions processed by consensus handler",
655 &["class"],
656 registry
657 ).unwrap(),
658 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
659 "consensus_handler_transaction_sizes",
660 "Sizes of each type of transactions processed by consensus handler",
661 &["class"],
662 POSITIVE_INT_BUCKETS.to_vec(),
663 registry
664 ).unwrap(),
665 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
666 "consensus_handler_num_low_scoring_authorities",
667 "Number of low scoring authorities based on reputation scores from consensus",
668 registry
669 ).unwrap(),
670 consensus_handler_scores: register_int_gauge_vec_with_registry!(
671 "consensus_handler_scores",
672 "scores from consensus for each authority",
673 &["authority"],
674 registry,
675 ).unwrap(),
676 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
677 "consensus_handler_deferred_transactions",
678 "Number of transactions deferred by consensus handler",
679 registry,
680 ).unwrap(),
681 consensus_handler_congested_transactions: register_int_counter_with_registry!(
682 "consensus_handler_congested_transactions",
683 "Number of transactions deferred by consensus handler due to congestion",
684 registry,
685 ).unwrap(),
686 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
687 "consensus_handler_cancelled_transactions",
688 "Number of transactions cancelled by consensus handler",
689 registry,
690 ).unwrap(),
691 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
692 "consensus_handler_max_congestion_control_object_costs",
693 "Max object costs for congestion control in the current consensus commit",
694 &["commit_type"],
695 registry,
696 ).unwrap(),
697 consensus_committed_subdags: register_int_counter_vec_with_registry!(
698 "consensus_committed_subdags",
699 "Number of committed subdags, sliced by author",
700 &["authority"],
701 registry,
702 ).unwrap(),
703 consensus_committed_messages: register_int_gauge_vec_with_registry!(
704 "consensus_committed_messages",
705 "Total number of committed consensus messages, sliced by author",
706 &["authority"],
707 registry,
708 ).unwrap(),
709 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
710 "consensus_committed_user_transactions",
711 "Number of committed user transactions, sliced by submitter",
712 &["authority"],
713 registry,
714 ).unwrap(),
715 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
716 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
717 zklogin_sig_count: register_int_counter_with_registry!(
718 "zklogin_sig_count",
719 "Count of zkLogin signatures",
720 registry,
721 )
722 .unwrap(),
723 multisig_sig_count: register_int_counter_with_registry!(
724 "multisig_sig_count",
725 "Count of zkLogin signatures",
726 registry,
727 )
728 .unwrap(),
729 consensus_calculated_throughput: register_int_gauge_with_registry!(
730 "consensus_calculated_throughput",
731 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
732 registry,
733 ).unwrap(),
734 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
735 "consensus_calculated_throughput_profile",
736 "The current active calculated throughput profile",
737 registry
738 ).unwrap(),
739 execution_queueing_latency: LatencyObserver::new(),
740 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
741 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
742 }
743 }
744
745 pub fn reset_on_reconfigure(&self) {
749 self.consensus_committed_messages.reset();
750 self.consensus_handler_scores.reset();
751 self.consensus_committed_user_transactions.reset();
752 }
753}
754
755pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
762
763pub struct AuthorityState {
764 pub name: AuthorityName,
767 pub secret: StableSyncAuthoritySigner,
769
770 input_loader: TransactionInputLoader,
772 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
773
774 epoch_store: ArcSwap<AuthorityPerEpochStore>,
775
776 execution_lock: RwLock<EpochId>,
782
783 pub indexes: Option<Arc<IndexStore>>,
784 pub rest_index: Option<Arc<RestIndexStore>>,
785
786 pub subscription_handler: Arc<SubscriptionHandler>,
787 checkpoint_store: Arc<CheckpointStore>,
788
789 committee_store: Arc<CommitteeStore>,
790
791 transaction_manager: Arc<TransactionManager>,
793
794 #[cfg_attr(not(test), expect(unused))]
796 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
797
798 pub metrics: Arc<AuthorityMetrics>,
799 _pruner: AuthorityStorePruner,
800 _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
801
802 db_checkpoint_config: DBCheckpointConfig,
804
805 pub config: NodeConfig,
806
807 pub overload_info: AuthorityOverloadInfo,
809
810 pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
811}
812
813impl AuthorityState {
822 pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
823 epoch_store.committee().authority_exists(&self.name)
824 }
825
826 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
827 !self.is_validator(epoch_store)
828 }
829
830 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
831 &self.committee_store
832 }
833
834 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
835 self.committee_store.clone()
836 }
837
838 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
839 &self.config.authority_overload_config
840 }
841
842 pub fn get_epoch_state_commitments(
843 &self,
844 epoch: EpochId,
845 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
846 self.checkpoint_store.get_epoch_state_commitments(epoch)
847 }
848
849 #[instrument(level = "trace", skip_all)]
853 async fn handle_transaction_impl(
854 &self,
855 transaction: VerifiedTransaction,
856 epoch_store: &Arc<AuthorityPerEpochStore>,
857 ) -> IotaResult<VerifiedSignedTransaction> {
858 let _execution_lock = self.execution_lock_for_signing().await;
860
861 let tx_digest = transaction.digest();
862 let tx_data = transaction.data().transaction_data();
863
864 let input_object_kinds = tx_data.input_objects()?;
865 let receiving_objects_refs = tx_data.receiving_objects();
866
867 iota_transaction_checks::deny::check_transaction_for_signing(
871 tx_data,
872 transaction.tx_signatures(),
873 &input_object_kinds,
874 &receiving_objects_refs,
875 &self.config.transaction_deny_config,
876 self.get_backing_package_store().as_ref(),
877 )?;
878
879 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
880 Some(tx_digest),
881 &input_object_kinds,
882 &receiving_objects_refs,
883 epoch_store.epoch(),
884 )?;
885
886 let (_gas_status, checked_input_objects) =
887 iota_transaction_checks::check_transaction_input(
888 epoch_store.protocol_config(),
889 epoch_store.reference_gas_price(),
890 tx_data,
891 input_objects,
892 &receiving_objects,
893 &self.metrics.bytecode_verifier_metrics,
894 &self.config.verifier_signing_config,
895 )?;
896
897 check_coin_deny_list_v1_during_signing(
898 tx_data.sender(),
899 &checked_input_objects,
900 &receiving_objects,
901 &self.get_object_store(),
902 )?;
903
904 let owned_objects = checked_input_objects.inner().filter_owned_objects();
905
906 let signed_transaction = VerifiedSignedTransaction::new(
907 epoch_store.epoch(),
908 transaction,
909 self.name,
910 &*self.secret,
911 );
912
913 self.get_cache_writer()
918 .acquire_transaction_locks(epoch_store, &owned_objects, signed_transaction.clone())
919 .await?;
920
921 Ok(signed_transaction)
922 }
923
924 #[instrument(level = "trace", skip_all)]
926 pub async fn handle_transaction(
927 &self,
928 epoch_store: &Arc<AuthorityPerEpochStore>,
929 transaction: VerifiedTransaction,
930 ) -> IotaResult<HandleTransactionResponse> {
931 let tx_digest = *transaction.digest();
932 debug!("handle_transaction");
933
934 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
936 return Ok(HandleTransactionResponse { status });
937 }
938
939 let _metrics_guard = self
940 .metrics
941 .authority_state_handle_transaction_latency
942 .start_timer();
943 self.metrics.tx_orders.inc();
944
945 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
946 match signed {
947 Ok(s) => {
948 if self.is_validator(epoch_store) {
949 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
950 let tx = s.clone();
951 let validator_tx_finalizer = validator_tx_finalizer.clone();
952 let cache_reader = self.get_transaction_cache_reader().clone();
953 let epoch_store = epoch_store.clone();
954 spawn_monitored_task!(epoch_store.within_alive_epoch(
955 validator_tx_finalizer.track_signed_tx(cache_reader, &epoch_store, tx)
956 ));
957 }
958 }
959 Ok(HandleTransactionResponse {
960 status: TransactionStatus::Signed(s.into_inner().into_sig()),
961 })
962 }
963 Err(err) => Ok(HandleTransactionResponse {
967 status: self
968 .get_transaction_status(&tx_digest, epoch_store)?
969 .ok_or(err)?
970 .1,
971 }),
972 }
973 }
974
975 pub fn check_system_overload_at_signing(&self) -> bool {
976 self.config
977 .authority_overload_config
978 .check_system_overload_at_signing
979 }
980
981 pub fn check_system_overload_at_execution(&self) -> bool {
982 self.config
983 .authority_overload_config
984 .check_system_overload_at_execution
985 }
986
987 pub(crate) fn check_system_overload(
988 &self,
989 consensus_adapter: &Arc<ConsensusAdapter>,
990 tx_data: &SenderSignedData,
991 do_authority_overload_check: bool,
992 ) -> IotaResult {
993 if do_authority_overload_check {
994 self.check_authority_overload(tx_data).tap_err(|_| {
995 self.update_overload_metrics("execution_queue");
996 })?;
997 }
998 self.transaction_manager
999 .check_execution_overload(self.overload_config(), tx_data)
1000 .tap_err(|_| {
1001 self.update_overload_metrics("execution_pending");
1002 })?;
1003 consensus_adapter.check_consensus_overload().tap_err(|_| {
1004 self.update_overload_metrics("consensus");
1005 })?;
1006 Ok(())
1007 }
1008
1009 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1010 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1011 return Ok(());
1012 }
1013
1014 let load_shedding_percentage = self
1015 .overload_info
1016 .load_shedding_percentage
1017 .load(Ordering::Relaxed);
1018 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1019 }
1020
1021 fn update_overload_metrics(&self, source: &str) {
1022 self.metrics
1023 .transaction_overload_sources
1024 .with_label_values(&[source])
1025 .inc();
1026 }
1027
1028 #[instrument(level = "trace", skip_all)]
1034 pub async fn fullnode_execute_certificate_with_effects(
1035 &self,
1036 transaction: &VerifiedExecutableTransaction,
1037 effects: &VerifiedCertifiedTransactionEffects,
1044 epoch_store: &Arc<AuthorityPerEpochStore>,
1045 ) -> IotaResult {
1046 assert!(self.is_fullnode(epoch_store));
1047 if self.epoch_store.load().epoch() != epoch_store.epoch() {
1052 return Err(IotaError::EpochEnded(epoch_store.epoch()));
1053 }
1054 let _metrics_guard = self
1055 .metrics
1056 .execute_certificate_with_effects_latency
1057 .start_timer();
1058 let digest = *transaction.digest();
1059 debug!("execute_certificate_with_effects");
1060 fp_ensure!(
1061 *effects.data().transaction_digest() == digest,
1062 IotaError::ErrorWhileProcessingCertificate {
1063 err: "effects/tx digest mismatch".to_string()
1064 }
1065 );
1066
1067 if transaction.contains_shared_object() {
1068 epoch_store
1069 .acquire_shared_version_assignments_from_effects(
1070 transaction,
1071 effects.data(),
1072 self.get_object_cache_reader().as_ref(),
1073 )
1074 .await?;
1075 }
1076
1077 let expected_effects_digest = effects.digest();
1078
1079 self.transaction_manager
1080 .enqueue(vec![transaction.clone()], epoch_store);
1081
1082 let observed_effects = self
1083 .get_transaction_cache_reader()
1084 .notify_read_executed_effects(&[digest])
1085 .instrument(tracing::debug_span!(
1086 "notify_read_effects_in_execute_certificate_with_effects"
1087 ))
1088 .await?
1089 .pop()
1090 .expect("notify_read_effects should return exactly 1 element");
1091
1092 let observed_effects_digest = observed_effects.digest();
1093 if &observed_effects_digest != expected_effects_digest {
1094 panic!(
1095 "Locally executed effects do not match canonical effects! expected_effects_digest={:?} observed_effects_digest={:?} expected_effects={:?} observed_effects={:?} input_objects={:?}",
1096 expected_effects_digest,
1097 observed_effects_digest,
1098 effects.data(),
1099 observed_effects,
1100 transaction.data().transaction_data().input_objects()
1101 );
1102 }
1103 Ok(())
1104 }
1105
1106 #[instrument(level = "trace", skip_all)]
1108 pub async fn execute_certificate(
1109 &self,
1110 certificate: &VerifiedCertificate,
1111 epoch_store: &Arc<AuthorityPerEpochStore>,
1112 ) -> IotaResult<TransactionEffects> {
1113 let _metrics_guard = if certificate.contains_shared_object() {
1114 self.metrics
1115 .execute_certificate_latency_shared_object
1116 .start_timer()
1117 } else {
1118 self.metrics
1119 .execute_certificate_latency_single_writer
1120 .start_timer()
1121 };
1122 trace!("execute_certificate");
1123
1124 self.metrics.total_cert_attempts.inc();
1125
1126 if !certificate.contains_shared_object() {
1127 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1132 }
1133
1134 epoch_store
1137 .within_alive_epoch(self.notify_read_effects(certificate))
1138 .await
1139 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1140 .and_then(|r| r)
1141 }
1142
1143 #[instrument(level = "trace", skip_all)]
1158 pub async fn try_execute_immediately(
1159 &self,
1160 certificate: &VerifiedExecutableTransaction,
1161 mut expected_effects_digest: Option<TransactionEffectsDigest>,
1162 epoch_store: &Arc<AuthorityPerEpochStore>,
1163 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1164 let _scope = monitored_scope("Execution::try_execute_immediately");
1165 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1166
1167 let tx_digest = certificate.digest();
1168
1169 let tx_guard = epoch_store.acquire_tx_guard(certificate).await?;
1171
1172 if let Some(effects) = self
1175 .get_transaction_cache_reader()
1176 .get_executed_effects(tx_digest)?
1177 {
1178 tx_guard.release();
1179 return Ok((effects, None));
1180 }
1181 let input_objects =
1182 self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1183
1184 if expected_effects_digest.is_none() {
1185 expected_effects_digest = epoch_store.get_signed_effects_digest(tx_digest)?;
1190 }
1191
1192 self.process_certificate(
1193 tx_guard,
1194 certificate,
1195 input_objects,
1196 expected_effects_digest,
1197 epoch_store,
1198 )
1199 .await
1200 .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1201 .tap_ok(
1202 |(fx, _)| debug!(?tx_digest, fx_digest=?fx.digest(), "process_certificate succeeded"),
1203 )
1204 }
1205
1206 pub fn read_objects_for_execution(
1207 &self,
1208 tx_lock: &CertLockGuard,
1209 certificate: &VerifiedExecutableTransaction,
1210 epoch_store: &Arc<AuthorityPerEpochStore>,
1211 ) -> IotaResult<InputObjects> {
1212 let _scope = monitored_scope("Execution::load_input_objects");
1213 let _metrics_guard = self
1214 .metrics
1215 .execution_load_input_objects_latency
1216 .start_timer();
1217 let input_objects = &certificate.data().transaction_data().input_objects()?;
1218 self.input_loader.read_objects_for_execution(
1219 epoch_store,
1220 &certificate.key(),
1221 tx_lock,
1222 input_objects,
1223 epoch_store.epoch(),
1224 )
1225 }
1226
1227 pub async fn try_execute_for_test(
1231 &self,
1232 certificate: &VerifiedCertificate,
1233 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1234 let epoch_store = self.epoch_store_for_testing();
1235 let (effects, execution_error_opt) = self
1236 .try_execute_immediately(
1237 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1238 None,
1239 &epoch_store,
1240 )
1241 .await?;
1242 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1243 Ok((signed_effects, execution_error_opt))
1244 }
1245
1246 pub async fn notify_read_effects(
1247 &self,
1248 certificate: &VerifiedCertificate,
1249 ) -> IotaResult<TransactionEffects> {
1250 self.get_transaction_cache_reader()
1251 .notify_read_executed_effects(&[*certificate.digest()])
1252 .await
1253 .map(|mut r| r.pop().expect("must return correct number of effects"))
1254 }
1255
1256 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1257 self.get_object_cache_reader()
1258 .check_owned_objects_are_live(owned_object_refs)
1259 }
1260
1261 pub(crate) fn debug_dump_transaction_state(
1266 &self,
1267 tx_digest: &TransactionDigest,
1268 effects: &TransactionEffects,
1269 expected_effects_digest: TransactionEffectsDigest,
1270 inner_temporary_store: &InnerTemporaryStore,
1271 certificate: &VerifiedExecutableTransaction,
1272 debug_dump_config: &StateDebugDumpConfig,
1273 ) -> IotaResult<PathBuf> {
1274 let dump_dir = debug_dump_config
1275 .dump_file_directory
1276 .as_ref()
1277 .cloned()
1278 .unwrap_or(std::env::temp_dir());
1279 let epoch_store = self.load_epoch_store_one_call_per_task();
1280
1281 NodeStateDump::new(
1282 tx_digest,
1283 effects,
1284 expected_effects_digest,
1285 self.get_object_store().as_ref(),
1286 &epoch_store,
1287 inner_temporary_store,
1288 certificate,
1289 )?
1290 .write_to_file(&dump_dir)
1291 .map_err(|e| IotaError::FileIO(e.to_string()))
1292 }
1293
1294 #[instrument(level = "trace", skip_all)]
1295 pub(crate) async fn process_certificate(
1296 &self,
1297 tx_guard: CertTxGuard,
1298 certificate: &VerifiedExecutableTransaction,
1299 input_objects: InputObjects,
1300 expected_effects_digest: Option<TransactionEffectsDigest>,
1301 epoch_store: &Arc<AuthorityPerEpochStore>,
1302 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1303 let process_certificate_start_time = tokio::time::Instant::now();
1304 let digest = *certificate.digest();
1305
1306 fail_point_if!("correlated-crash-process-certificate", || {
1307 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1308 iota_simulator::task::kill_current_node(None);
1309 }
1310 });
1311
1312 let execution_guard = self
1313 .execution_lock_for_executable_transaction(certificate)
1314 .await;
1315 let execution_guard = match execution_guard {
1321 Ok(execution_guard) => execution_guard,
1322 Err(err) => {
1323 tx_guard.release();
1324 return Err(err);
1325 }
1326 };
1327 if *execution_guard != epoch_store.epoch() {
1331 tx_guard.release();
1332 info!("The epoch of the execution_guard doesn't match the epoch store");
1333 return Err(IotaError::WrongEpoch {
1334 expected_epoch: epoch_store.epoch(),
1335 actual_epoch: *execution_guard,
1336 });
1337 }
1338
1339 let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1345 &execution_guard,
1346 certificate,
1347 input_objects,
1348 epoch_store,
1349 ) {
1350 Err(e) => {
1351 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1352 tx_guard.release();
1353 return Err(e);
1354 }
1355 Ok(res) => res,
1356 };
1357
1358 if let Some(expected_effects_digest) = expected_effects_digest {
1359 if effects.digest() != expected_effects_digest {
1360 match self.debug_dump_transaction_state(
1362 &digest,
1363 &effects,
1364 expected_effects_digest,
1365 &inner_temporary_store,
1366 certificate,
1367 &self.config.state_debug_dump_config,
1368 ) {
1369 Ok(out_path) => {
1370 info!(
1371 "Dumped node state for transaction {} to {}",
1372 digest,
1373 out_path.as_path().display().to_string()
1374 );
1375 }
1376 Err(e) => {
1377 error!("Error dumping state for transaction {}: {e}", digest);
1378 }
1379 }
1380 error!(
1381 tx_digest = ?digest,
1382 ?expected_effects_digest,
1383 actual_effects = ?effects,
1384 "fork detected!"
1385 );
1386 panic!(
1387 "Transaction {} is expected to have effects digest {}, but got {}!",
1388 digest,
1389 expected_effects_digest,
1390 effects.digest(),
1391 );
1392 }
1393 }
1394
1395 fail_point_async!("crash");
1396
1397 self.commit_certificate(
1398 certificate,
1399 inner_temporary_store,
1400 &effects,
1401 tx_guard,
1402 execution_guard,
1403 epoch_store,
1404 )
1405 .await?;
1406
1407 if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1408 certificate.data().transaction_data().kind()
1409 {
1410 if let Some(err) = &execution_error_opt {
1411 debug_fatal!("Authenticator state update failed: {:?}", err);
1412 }
1413 epoch_store.update_authenticator_state(auth_state);
1414
1415 if cfg!(debug_assertions) {
1418 let authenticator_state = get_authenticator_state(self.get_object_store())
1419 .expect("Read cannot fail")
1420 .expect("Authenticator state must exist");
1421
1422 let mut sys_jwks: Vec<_> = authenticator_state
1423 .active_jwks
1424 .into_iter()
1425 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1426 .collect();
1427 let mut active_jwks: Vec<_> = epoch_store
1428 .signature_verifier
1429 .get_jwks()
1430 .into_iter()
1431 .collect();
1432 sys_jwks.sort();
1433 active_jwks.sort();
1434
1435 assert_eq!(sys_jwks, active_jwks);
1436 }
1437 }
1438
1439 let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1440 if elapsed > 0.0 {
1441 self.metrics
1442 .execution_gas_latency_ratio
1443 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1444 };
1445 Ok((effects, execution_error_opt))
1446 }
1447
1448 #[instrument(level = "trace", skip_all)]
1449 async fn commit_certificate(
1450 &self,
1451 certificate: &VerifiedExecutableTransaction,
1452 inner_temporary_store: InnerTemporaryStore,
1453 effects: &TransactionEffects,
1454 tx_guard: CertTxGuard,
1455 _execution_guard: ExecutionLockReadGuard<'_>,
1456 epoch_store: &Arc<AuthorityPerEpochStore>,
1457 ) -> IotaResult {
1458 let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1459 monitored_scope("Execution::commit_certificate");
1460 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1461
1462 let tx_key = certificate.key();
1463 let tx_digest = certificate.digest();
1464 let input_object_count = inner_temporary_store.input_objects.len();
1465 let shared_object_count = effects.input_shared_objects().len();
1466
1467 let output_keys = inner_temporary_store.get_output_keys(effects);
1468
1469 let _ = self
1471 .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1472 .await
1473 .tap_err(|e| {
1474 self.metrics.post_processing_total_failures.inc();
1475 error!(?tx_digest, "tx post processing failed: {e}");
1476 });
1477
1478 epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1482
1483 fail_point_async!("crash");
1485
1486 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1487 certificate.clone().into_unsigned(),
1488 effects.clone(),
1489 inner_temporary_store,
1490 );
1491 self.get_cache_writer()
1492 .write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())
1493 .await?;
1494
1495 if certificate.transaction_data().is_end_of_epoch_tx() {
1496 self.get_object_cache_reader()
1499 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1500 }
1501
1502 tx_guard.commit_tx();
1504
1505 self.transaction_manager
1509 .notify_commit(tx_digest, output_keys, epoch_store);
1510
1511 self.update_metrics(certificate, input_object_count, shared_object_count);
1512
1513 Ok(())
1514 }
1515
1516 fn update_metrics(
1517 &self,
1518 certificate: &VerifiedExecutableTransaction,
1519 input_object_count: usize,
1520 shared_object_count: usize,
1521 ) {
1522 if certificate.has_zklogin_sig() {
1524 self.metrics.zklogin_sig_count.inc();
1525 } else if certificate.has_upgraded_multisig() {
1526 self.metrics.multisig_sig_count.inc();
1527 }
1528
1529 self.metrics.total_effects.inc();
1530 self.metrics.total_certs.inc();
1531
1532 if shared_object_count > 0 {
1533 self.metrics.shared_obj_tx.inc();
1534 }
1535
1536 if certificate.is_sponsored_tx() {
1537 self.metrics.sponsored_tx.inc();
1538 }
1539
1540 self.metrics
1541 .num_input_objs
1542 .observe(input_object_count as f64);
1543 self.metrics
1544 .num_shared_objects
1545 .observe(shared_object_count as f64);
1546 self.metrics.batch_size.observe(
1547 certificate
1548 .data()
1549 .intent_message()
1550 .value
1551 .kind()
1552 .num_commands() as f64,
1553 );
1554 }
1555
1556 #[instrument(level = "trace", skip_all)]
1568 fn prepare_certificate(
1569 &self,
1570 _execution_guard: &ExecutionLockReadGuard<'_>,
1571 certificate: &VerifiedExecutableTransaction,
1572 input_objects: InputObjects,
1573 epoch_store: &Arc<AuthorityPerEpochStore>,
1574 ) -> IotaResult<(
1575 InnerTemporaryStore,
1576 TransactionEffects,
1577 Option<ExecutionError>,
1578 )> {
1579 let _scope = monitored_scope("Execution::prepare_certificate");
1580 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1581 let prepare_certificate_start_time = tokio::time::Instant::now();
1582
1583 let tx_data = certificate.data().transaction_data();
1586 tx_data.validity_check(epoch_store.protocol_config())?;
1587
1588 let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
1591 certificate,
1592 input_objects,
1593 epoch_store.protocol_config(),
1594 epoch_store.reference_gas_price(),
1595 )?;
1596
1597 let owned_object_refs = input_objects.inner().filter_owned_objects();
1598 self.check_owned_locks(&owned_object_refs)?;
1599 let tx_digest = *certificate.digest();
1600 let protocol_config = epoch_store.protocol_config();
1601 let transaction_data = &certificate.data().intent_message().value;
1602 let (kind, signer, gas) = transaction_data.execution_parts();
1603
1604 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1605 let (inner_temp_store, _, mut effects, execution_error_opt) =
1606 epoch_store.executor().execute_transaction_to_effects(
1607 self.get_backing_store().as_ref(),
1608 protocol_config,
1609 self.metrics.limits_metrics.clone(),
1610 self.config
1613 .expensive_safety_check_config
1614 .enable_deep_per_tx_iota_conservation_check(),
1615 self.config.certificate_deny_config.certificate_deny_set(),
1616 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1617 epoch_store
1618 .epoch_start_config()
1619 .epoch_data()
1620 .epoch_start_timestamp(),
1621 input_objects,
1622 gas,
1623 gas_status,
1624 kind,
1625 signer,
1626 tx_digest,
1627 &mut None,
1628 );
1629
1630 fail_point_if!("cp_execution_nondeterminism", || {
1631 #[cfg(msim)]
1632 self.create_fail_state(certificate, epoch_store, &mut effects);
1633 });
1634
1635 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1636 if elapsed > 0.0 {
1637 self.metrics
1638 .prepare_cert_gas_latency_ratio
1639 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1640 }
1641
1642 Ok((inner_temp_store, effects, execution_error_opt.err()))
1643 }
1644
1645 pub fn prepare_certificate_for_benchmark(
1646 &self,
1647 certificate: &VerifiedExecutableTransaction,
1648 input_objects: InputObjects,
1649 epoch_store: &Arc<AuthorityPerEpochStore>,
1650 ) -> IotaResult<(
1651 InnerTemporaryStore,
1652 TransactionEffects,
1653 Option<ExecutionError>,
1654 )> {
1655 let lock: RwLock<EpochId> = RwLock::new(epoch_store.epoch());
1656 let execution_guard = lock.try_read().unwrap();
1657
1658 self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
1659 }
1660
1661 pub async fn dry_exec_transaction(
1662 &self,
1663 transaction: TransactionData,
1664 transaction_digest: TransactionDigest,
1665 ) -> IotaResult<(
1666 DryRunTransactionBlockResponse,
1667 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1668 TransactionEffects,
1669 Option<ObjectID>,
1670 )> {
1671 let epoch_store = self.load_epoch_store_one_call_per_task();
1672 if !self.is_fullnode(&epoch_store) {
1673 return Err(IotaError::UnsupportedFeature {
1674 error: "dry-exec is only supported on fullnodes".to_string(),
1675 });
1676 }
1677
1678 if transaction.kind().is_system_tx() {
1679 return Err(IotaError::UnsupportedFeature {
1680 error: "dry-exec does not support system transactions".to_string(),
1681 });
1682 }
1683
1684 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1685 .await
1686 }
1687
1688 pub async fn dry_exec_transaction_for_benchmark(
1689 &self,
1690 transaction: TransactionData,
1691 transaction_digest: TransactionDigest,
1692 ) -> IotaResult<(
1693 DryRunTransactionBlockResponse,
1694 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1695 TransactionEffects,
1696 Option<ObjectID>,
1697 )> {
1698 let epoch_store = self.load_epoch_store_one_call_per_task();
1699 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1700 .await
1701 }
1702
1703 async fn dry_exec_transaction_impl(
1704 &self,
1705 epoch_store: &AuthorityPerEpochStore,
1706 transaction: TransactionData,
1707 transaction_digest: TransactionDigest,
1708 ) -> IotaResult<(
1709 DryRunTransactionBlockResponse,
1710 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1711 TransactionEffects,
1712 Option<ObjectID>,
1713 )> {
1714 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1716
1717 let input_object_kinds = transaction.input_objects()?;
1718 let receiving_object_refs = transaction.receiving_objects();
1719
1720 iota_transaction_checks::deny::check_transaction_for_signing(
1721 &transaction,
1722 &[],
1723 &input_object_kinds,
1724 &receiving_object_refs,
1725 &self.config.transaction_deny_config,
1726 self.get_backing_package_store().as_ref(),
1727 )?;
1728
1729 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1730 None,
1732 &input_object_kinds,
1733 &receiving_object_refs,
1734 epoch_store.epoch(),
1735 )?;
1736
1737 let mut gas_object_refs = transaction.gas().to_vec();
1739 let reference_gas_price = epoch_store.reference_gas_price();
1740 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1741 let sender = transaction.sender();
1742 const NANOS_TO_IOTA: u64 = 1_000_000_000;
1744 const DRY_RUN_IOTA: u64 = 1_000_000_000;
1745 let max_coin_value = NANOS_TO_IOTA * DRY_RUN_IOTA;
1746 let gas_object_id = ObjectID::random();
1747 let gas_object = Object::new_move(
1748 MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
1749 Owner::AddressOwner(sender),
1750 TransactionDigest::genesis_marker(),
1751 );
1752 let gas_object_ref = gas_object.compute_object_reference();
1753 gas_object_refs = vec![gas_object_ref];
1754 (
1755 iota_transaction_checks::check_transaction_input_with_given_gas(
1756 epoch_store.protocol_config(),
1757 reference_gas_price,
1758 &transaction,
1759 input_objects,
1760 receiving_objects,
1761 gas_object,
1762 &self.metrics.bytecode_verifier_metrics,
1763 &self.config.verifier_signing_config,
1764 )?,
1765 Some(gas_object_id),
1766 )
1767 } else {
1768 (
1769 iota_transaction_checks::check_transaction_input(
1770 epoch_store.protocol_config(),
1771 reference_gas_price,
1772 &transaction,
1773 input_objects,
1774 &receiving_objects,
1775 &self.metrics.bytecode_verifier_metrics,
1776 &self.config.verifier_signing_config,
1777 )?,
1778 None,
1779 )
1780 };
1781
1782 let protocol_config = epoch_store.protocol_config();
1783 let (kind, signer, _) = transaction.execution_parts();
1784
1785 let silent = true;
1786 let executor = iota_execution::executor(protocol_config, silent, None)
1787 .expect("Creating an executor should not fail here");
1788
1789 let expensive_checks = false;
1790 let (inner_temp_store, _, effects, _execution_error) = executor
1791 .execute_transaction_to_effects(
1792 self.get_backing_store().as_ref(),
1793 protocol_config,
1794 self.metrics.limits_metrics.clone(),
1795 expensive_checks,
1796 self.config.certificate_deny_config.certificate_deny_set(),
1797 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1798 epoch_store
1799 .epoch_start_config()
1800 .epoch_data()
1801 .epoch_start_timestamp(),
1802 checked_input_objects,
1803 gas_object_refs,
1804 gas_status,
1805 kind,
1806 signer,
1807 transaction_digest,
1808 &mut None,
1809 );
1810 let tx_digest = *effects.transaction_digest();
1811
1812 let module_cache =
1813 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1814
1815 let mut layout_resolver =
1816 epoch_store
1817 .executor()
1818 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1819 &inner_temp_store,
1820 self.get_backing_package_store(),
1821 )));
1822 let object_changes = Vec::new();
1824
1825 let balance_changes = Vec::new();
1827
1828 let written_with_kind = effects
1829 .created()
1830 .into_iter()
1831 .map(|(oref, _)| (oref, WriteKind::Create))
1832 .chain(
1833 effects
1834 .unwrapped()
1835 .into_iter()
1836 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1837 )
1838 .chain(
1839 effects
1840 .mutated()
1841 .into_iter()
1842 .map(|(oref, _)| (oref, WriteKind::Mutate)),
1843 )
1844 .map(|(oref, kind)| {
1845 let obj = inner_temp_store.written.get(&oref.0).unwrap();
1846 (oref.0, (oref, obj.clone(), kind))
1848 })
1849 .collect();
1850
1851 Ok((
1852 DryRunTransactionBlockResponse {
1853 input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
1854 .map_err(|e| IotaError::TransactionSerialization {
1855 error: format!(
1856 "Failed to convert transaction to IotaTransactionBlockData: {e}",
1857 ),
1858 })?, effects: effects.clone().try_into()?,
1860 events: IotaTransactionBlockEvents::try_from(
1861 inner_temp_store.events.clone(),
1862 tx_digest,
1863 None,
1864 layout_resolver.as_mut(),
1865 )?,
1866 object_changes,
1867 balance_changes,
1868 },
1869 written_with_kind,
1870 effects,
1871 mock_gas,
1872 ))
1873 }
1874
1875 pub fn simulate_transaction(
1876 &self,
1877 transaction: TransactionData,
1878 ) -> IotaResult<SimulateTransactionResult> {
1879 if transaction.kind().is_system_tx() {
1880 return Err(IotaError::UnsupportedFeature {
1881 error: "simulate does not support system transactions".to_string(),
1882 });
1883 }
1884
1885 let epoch_store = self.load_epoch_store_one_call_per_task();
1886 if !self.is_fullnode(&epoch_store) {
1887 return Err(IotaError::UnsupportedFeature {
1888 error: "simulate is only supported on fullnodes".to_string(),
1889 });
1890 }
1891
1892 self.simulate_transaction_impl(&epoch_store, transaction)
1893 }
1894
1895 fn simulate_transaction_impl(
1896 &self,
1897 epoch_store: &AuthorityPerEpochStore,
1898 transaction: TransactionData,
1899 ) -> IotaResult<SimulateTransactionResult> {
1900 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1902
1903 let input_object_kinds = transaction.input_objects()?;
1904 let receiving_object_refs = transaction.receiving_objects();
1905
1906 iota_transaction_checks::deny::check_transaction_for_signing(
1907 &transaction,
1908 &[],
1909 &input_object_kinds,
1910 &receiving_object_refs,
1911 &self.config.transaction_deny_config,
1912 self.get_backing_package_store().as_ref(),
1913 )?;
1914
1915 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1916 None,
1918 &input_object_kinds,
1919 &receiving_object_refs,
1920 epoch_store.epoch(),
1921 )?;
1922
1923 let mut gas_object_refs = transaction.gas().to_vec();
1925 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1926 let sender = transaction.sender();
1927 const NANOS_TO_IOTA: u64 = 1_000_000_000;
1929 const DRY_RUN_IOTA: u64 = 1_000_000_000;
1930 let max_coin_value = NANOS_TO_IOTA * DRY_RUN_IOTA;
1931 let gas_object_id = ObjectID::MAX;
1932 let gas_object = Object::new_move(
1933 MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
1934 Owner::AddressOwner(sender),
1935 TransactionDigest::genesis_marker(),
1936 );
1937 let gas_object_ref = gas_object.compute_object_reference();
1938 gas_object_refs = vec![gas_object_ref];
1939 (
1940 iota_transaction_checks::check_transaction_input_with_given_gas(
1941 epoch_store.protocol_config(),
1942 epoch_store.reference_gas_price(),
1943 &transaction,
1944 input_objects,
1945 receiving_objects,
1946 gas_object,
1947 &self.metrics.bytecode_verifier_metrics,
1948 &self.config.verifier_signing_config,
1949 )?,
1950 Some(gas_object_id),
1951 )
1952 } else {
1953 (
1954 iota_transaction_checks::check_transaction_input(
1955 epoch_store.protocol_config(),
1956 epoch_store.reference_gas_price(),
1957 &transaction,
1958 input_objects,
1959 &receiving_objects,
1960 &self.metrics.bytecode_verifier_metrics,
1961 &self.config.verifier_signing_config,
1962 )?,
1963 None,
1964 )
1965 };
1966
1967 let protocol_config = epoch_store.protocol_config();
1968 let (kind, signer, _) = transaction.execution_parts();
1969
1970 let silent = true;
1971 let executor = iota_execution::executor(protocol_config, silent, None)
1972 .expect("Creating an executor should not fail here");
1973
1974 let expensive_checks = false;
1975 let (inner_temp_store, _, effects, _execution_error) = executor
1976 .execute_transaction_to_effects(
1977 self.get_backing_store().as_ref(),
1978 protocol_config,
1979 self.metrics.limits_metrics.clone(),
1980 expensive_checks,
1981 self.config.certificate_deny_config.certificate_deny_set(),
1982 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1983 epoch_store
1984 .epoch_start_config()
1985 .epoch_data()
1986 .epoch_start_timestamp(),
1987 checked_input_objects,
1988 gas_object_refs,
1989 gas_status,
1990 kind,
1991 signer,
1992 transaction.digest(),
1993 &mut None,
1994 );
1995
1996 Ok(SimulateTransactionResult {
1997 input_objects: inner_temp_store.input_objects,
1998 output_objects: inner_temp_store.written,
1999 events: effects.events_digest().map(|_| inner_temp_store.events),
2000 effects,
2001 mock_gas_id: mock_gas,
2002 })
2003 }
2004
2005 pub async fn dev_inspect_transaction_block(
2007 &self,
2008 sender: IotaAddress,
2009 transaction_kind: TransactionKind,
2010 gas_price: Option<u64>,
2011 gas_budget: Option<u64>,
2012 gas_sponsor: Option<IotaAddress>,
2013 gas_objects: Option<Vec<ObjectRef>>,
2014 show_raw_txn_data_and_effects: Option<bool>,
2015 skip_checks: Option<bool>,
2016 ) -> IotaResult<DevInspectResults> {
2017 let epoch_store = self.load_epoch_store_one_call_per_task();
2018
2019 if !self.is_fullnode(&epoch_store) {
2020 return Err(IotaError::UnsupportedFeature {
2021 error: "dev-inspect is only supported on fullnodes".to_string(),
2022 });
2023 }
2024
2025 if transaction_kind.is_system_tx() {
2026 return Err(IotaError::UnsupportedFeature {
2027 error: "system transactions are not supported".to_string(),
2028 });
2029 }
2030
2031 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
2032 let skip_checks = skip_checks.unwrap_or(true);
2033 let reference_gas_price = epoch_store.reference_gas_price();
2034 let protocol_config = epoch_store.protocol_config();
2035 let max_tx_gas = protocol_config.max_tx_gas();
2036
2037 let price = gas_price.unwrap_or(reference_gas_price);
2038 let budget = gas_budget.unwrap_or(max_tx_gas);
2039 let owner = gas_sponsor.unwrap_or(sender);
2040 let payment = gas_objects.unwrap_or_default();
2043 let transaction = TransactionData::V1(TransactionDataV1 {
2044 kind: transaction_kind.clone(),
2045 sender,
2046 gas_data: GasData {
2047 payment,
2048 owner,
2049 price,
2050 budget,
2051 },
2052 expiration: TransactionExpiration::None,
2053 });
2054
2055 let raw_txn_data = if show_raw_txn_data_and_effects {
2056 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
2057 error: "Failed to serialize transaction during dev inspect".to_string(),
2058 })?
2059 } else {
2060 vec![]
2061 };
2062
2063 transaction.validity_check_no_gas_check(protocol_config)?;
2064
2065 let input_object_kinds = transaction.input_objects()?;
2066 let receiving_object_refs = transaction.receiving_objects();
2067
2068 iota_transaction_checks::deny::check_transaction_for_signing(
2069 &transaction,
2070 &[],
2071 &input_object_kinds,
2072 &receiving_object_refs,
2073 &self.config.transaction_deny_config,
2074 self.get_backing_package_store().as_ref(),
2075 )?;
2076
2077 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
2078 None,
2080 &input_object_kinds,
2081 &receiving_object_refs,
2082 epoch_store.epoch(),
2083 )?;
2084
2085 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
2087 DEV_INSPECT_GAS_COIN_VALUE,
2088 transaction.gas_owner(),
2089 );
2090
2091 let gas_objects = if transaction.gas().is_empty() {
2092 let gas_object_ref = dummy_gas_object.compute_object_reference();
2093 vec![gas_object_ref]
2094 } else {
2095 transaction.gas().to_vec()
2096 };
2097
2098 let (gas_status, checked_input_objects) = if skip_checks {
2099 if transaction.gas().is_empty() {
2104 input_objects.push(ObjectReadResult::new(
2105 InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
2106 dummy_gas_object.into(),
2107 ));
2108 }
2109 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
2110 protocol_config,
2111 &transaction_kind,
2112 input_objects,
2113 receiving_objects,
2114 )?;
2115 let gas_status = IotaGasStatus::new(
2116 max_tx_gas,
2117 transaction.gas_price(),
2118 reference_gas_price,
2119 protocol_config,
2120 )?;
2121
2122 (gas_status, checked_input_objects)
2123 } else {
2124 if transaction.gas().is_empty() {
2128 iota_transaction_checks::check_transaction_input_with_given_gas(
2129 epoch_store.protocol_config(),
2130 reference_gas_price,
2131 &transaction,
2132 input_objects,
2133 receiving_objects,
2134 dummy_gas_object,
2135 &self.metrics.bytecode_verifier_metrics,
2136 &self.config.verifier_signing_config,
2137 )?
2138 } else {
2139 iota_transaction_checks::check_transaction_input(
2140 epoch_store.protocol_config(),
2141 reference_gas_price,
2142 &transaction,
2143 input_objects,
2144 &receiving_objects,
2145 &self.metrics.bytecode_verifier_metrics,
2146 &self.config.verifier_signing_config,
2147 )?
2148 }
2149 };
2150
2151 let executor = iota_execution::executor(protocol_config, true, None)
2152 .expect("Creating an executor should not fail here");
2153 let intent_msg = IntentMessage::new(
2154 Intent {
2155 version: IntentVersion::V0,
2156 scope: IntentScope::TransactionData,
2157 app_id: AppId::Iota,
2158 },
2159 transaction,
2160 );
2161 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2162 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2163 self.get_backing_store().as_ref(),
2164 protocol_config,
2165 self.metrics.limits_metrics.clone(),
2166 false,
2168 self.config.certificate_deny_config.certificate_deny_set(),
2169 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2170 epoch_store
2171 .epoch_start_config()
2172 .epoch_data()
2173 .epoch_start_timestamp(),
2174 checked_input_objects,
2175 gas_objects,
2176 gas_status,
2177 transaction_kind,
2178 sender,
2179 transaction_digest,
2180 skip_checks,
2181 );
2182
2183 let raw_effects = if show_raw_txn_data_and_effects {
2184 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2185 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2186 })?
2187 } else {
2188 vec![]
2189 };
2190
2191 let mut layout_resolver =
2192 epoch_store
2193 .executor()
2194 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2195 &inner_temp_store,
2196 self.get_backing_package_store(),
2197 )));
2198
2199 DevInspectResults::new(
2200 effects,
2201 inner_temp_store.events.clone(),
2202 execution_result,
2203 raw_txn_data,
2204 raw_effects,
2205 layout_resolver.as_mut(),
2206 )
2207 }
2208
2209 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2211 let epoch_store = self.epoch_store_for_testing();
2212 Ok(epoch_store.reference_gas_price())
2213 }
2214
2215 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2216 self.get_transaction_cache_reader()
2217 .is_tx_already_executed(digest)
2218 }
2219
2220 #[instrument(level = "debug", skip_all, err)]
2222 async fn index_tx(
2223 &self,
2224 indexes: &IndexStore,
2225 digest: &TransactionDigest,
2226 cert: &VerifiedExecutableTransaction,
2228 effects: &TransactionEffects,
2229 events: &TransactionEvents,
2230 timestamp_ms: u64,
2231 tx_coins: Option<TxCoins>,
2232 written: &WrittenObjects,
2233 inner_temporary_store: &InnerTemporaryStore,
2234 ) -> IotaResult<u64> {
2235 let changes = self
2236 .process_object_index(effects, written, inner_temporary_store)
2237 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2238
2239 indexes
2240 .index_tx(
2241 cert.data().intent_message().value.sender(),
2242 cert.data()
2243 .intent_message()
2244 .value
2245 .input_objects()?
2246 .iter()
2247 .map(|o| o.object_id()),
2248 effects
2249 .all_changed_objects()
2250 .into_iter()
2251 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2252 cert.data()
2253 .intent_message()
2254 .value
2255 .move_calls()
2256 .into_iter()
2257 .map(|(package, module, function)| {
2258 (*package, module.to_owned(), function.to_owned())
2259 }),
2260 events,
2261 changes,
2262 digest,
2263 timestamp_ms,
2264 tx_coins,
2265 )
2266 .await
2267 }
2268
2269 #[cfg(msim)]
2270 fn create_fail_state(
2271 &self,
2272 certificate: &VerifiedExecutableTransaction,
2273 epoch_store: &Arc<AuthorityPerEpochStore>,
2274 effects: &mut TransactionEffects,
2275 ) {
2276 use std::cell::RefCell;
2277 thread_local! {
2278 static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2279 }
2280 if !certificate.data().intent_message().value.is_system_tx() {
2281 let committee = epoch_store.committee();
2282 let cur_stake = (**committee).weight(&self.name);
2283 if cur_stake > 0 {
2284 FAIL_STATE.with_borrow_mut(|fail_state| {
2285 if fail_state.0 < committee.validity_threshold() {
2287 fail_state.0 += cur_stake;
2288 fail_state.1.insert(self.name);
2289 }
2290
2291 if fail_state.1.contains(&self.name) {
2292 info!("cp_exec failing tx");
2293 effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2294 }
2295 });
2296 }
2297 }
2298 }
2299
2300 fn process_object_index(
2301 &self,
2302 effects: &TransactionEffects,
2303 written: &WrittenObjects,
2304 inner_temporary_store: &InnerTemporaryStore,
2305 ) -> IotaResult<ObjectIndexChanges> {
2306 let epoch_store = self.load_epoch_store_one_call_per_task();
2307 let mut layout_resolver =
2308 epoch_store
2309 .executor()
2310 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2311 inner_temporary_store,
2312 self.get_backing_package_store(),
2313 )));
2314
2315 let modified_at_version = effects
2316 .modified_at_versions()
2317 .into_iter()
2318 .collect::<HashMap<_, _>>();
2319
2320 let tx_digest = effects.transaction_digest();
2321 let mut deleted_owners = vec![];
2322 let mut deleted_dynamic_fields = vec![];
2323 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2324 let old_version = modified_at_version.get(&id).unwrap();
2325 match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2328 |e| panic!("tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}. Err: {e:?}"),
2329 ) {
2330 Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2331 Owner::ObjectOwner(object_id) => {
2332 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2333 }
2334 _ => {}
2335 }
2336 }
2337
2338 let mut new_owners = vec![];
2339 let mut new_dynamic_fields = vec![];
2340
2341 for (oref, owner, kind) in effects.all_changed_objects() {
2342 let id = &oref.0;
2343 if let WriteKind::Mutate = kind {
2346 let Some(old_version) = modified_at_version.get(id) else {
2347 panic!(
2348 "tx_digest={tx_digest:?}, error processing object owner index, cannot find modified at version for mutated object [{id}]."
2349 );
2350 };
2351 let Some(old_object) = self
2354 .get_object_store()
2355 .get_object_by_key(id, *old_version)?
2356 else {
2357 panic!(
2358 "tx_digest={tx_digest:?}, error processing object owner index, cannot find owner for object {id:?} at version {old_version:?}"
2359 );
2360 };
2361 if old_object.owner != owner {
2362 match old_object.owner {
2363 Owner::AddressOwner(addr) => {
2364 deleted_owners.push((addr, *id));
2365 }
2366 Owner::ObjectOwner(object_id) => {
2367 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2368 }
2369 _ => {}
2370 }
2371 }
2372 }
2373
2374 match owner {
2375 Owner::AddressOwner(addr) => {
2376 let new_object = written.get(id).unwrap_or_else(
2379 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2380 );
2381 assert_eq!(
2382 new_object.version(),
2383 oref.1,
2384 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2385 tx_digest,
2386 id,
2387 new_object.version(),
2388 oref.1
2389 );
2390
2391 let type_ = new_object
2392 .type_()
2393 .map(|type_| ObjectType::Struct(type_.clone()))
2394 .unwrap_or(ObjectType::Package);
2395
2396 new_owners.push((
2397 (addr, *id),
2398 ObjectInfo {
2399 object_id: *id,
2400 version: oref.1,
2401 digest: oref.2,
2402 type_,
2403 owner,
2404 previous_transaction: *effects.transaction_digest(),
2405 },
2406 ));
2407 }
2408 Owner::ObjectOwner(owner) => {
2409 let new_object = written.get(id).unwrap_or_else(
2410 || panic!("tx_digest={tx_digest:?}, error processing object owner index, written does not contain object {id:?}")
2411 );
2412 assert_eq!(
2413 new_object.version(),
2414 oref.1,
2415 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2416 tx_digest,
2417 id,
2418 new_object.version(),
2419 oref.1
2420 );
2421
2422 let Some(df_info) = self
2423 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut(), false)
2424 .unwrap_or_else(|e| {
2425 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2426 None
2427 }
2428 )
2429 else {
2430 continue;
2432 };
2433 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2434 }
2435 _ => {}
2436 }
2437 }
2438
2439 Ok(ObjectIndexChanges {
2440 deleted_owners,
2441 deleted_dynamic_fields,
2442 new_owners,
2443 new_dynamic_fields,
2444 })
2445 }
2446
2447 fn try_create_dynamic_field_info(
2448 &self,
2449 o: &Object,
2450 written: &WrittenObjects,
2451 resolver: &mut dyn LayoutResolver,
2452 get_latest_object_version: bool,
2453 ) -> IotaResult<Option<DynamicFieldInfo>> {
2454 let Some(move_object) = o.data.try_as_move().cloned() else {
2456 return Ok(None);
2457 };
2458
2459 if !move_object.type_().is_dynamic_field() {
2461 return Ok(None);
2462 }
2463
2464 let layout = resolver
2465 .get_annotated_layout(&move_object.type_().clone().into())?
2466 .into_layout();
2467
2468 let field =
2469 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2470 IotaError::ObjectDeserialization {
2471 error: e.to_string(),
2472 }
2473 })?;
2474
2475 let type_ = field.kind;
2476 let name_type: TypeTag = field.name_layout.into();
2477 let bcs_name = field.name_bytes.to_owned();
2478
2479 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2480 .map_err(|e| {
2481 warn!("{e}");
2482 IotaError::ObjectDeserialization {
2483 error: e.to_string(),
2484 }
2485 })?;
2486
2487 let name = DynamicFieldName {
2488 type_: name_type,
2489 value: IotaMoveValue::from(name_value).to_json_value(),
2490 };
2491
2492 let value_metadata = field.value_metadata().map_err(|e| {
2493 warn!("{e}");
2494 IotaError::ObjectDeserialization {
2495 error: e.to_string(),
2496 }
2497 })?;
2498
2499 Ok(Some(match value_metadata {
2500 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2501 name,
2502 bcs_name,
2503 type_,
2504 object_type: object_type.to_canonical_string(true),
2505 object_id: o.id(),
2506 version: o.version(),
2507 digest: o.digest(),
2508 },
2509
2510 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2511 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2516 (
2517 object.version(),
2518 object.digest(),
2519 object.data.type_().unwrap().clone(),
2520 )
2521 } else {
2522 let object = if get_latest_object_version {
2524 self.get_object_store()
2531 .get_object(&object_id)?
2532 .ok_or_else(|| UserInputError::ObjectNotFound {
2533 object_id,
2534 version: Some(o.version()),
2535 })?
2536 } else {
2537 self.get_object_store()
2539 .get_object_by_key(&object_id, o.version())?
2540 .ok_or_else(|| UserInputError::ObjectNotFound {
2541 object_id,
2542 version: Some(o.version()),
2543 })?
2544 };
2545
2546 (
2547 object.version(),
2548 object.digest(),
2549 object.data.type_().unwrap().clone(),
2550 )
2551 };
2552
2553 DynamicFieldInfo {
2554 name,
2555 bcs_name,
2556 type_,
2557 object_type: object_type.to_string(),
2558 object_id,
2559 version,
2560 digest,
2561 }
2562 }
2563 }))
2564 }
2565
2566 #[instrument(level = "trace", skip_all, err)]
2567 async fn post_process_one_tx(
2568 &self,
2569 certificate: &VerifiedExecutableTransaction,
2570 effects: &TransactionEffects,
2571 inner_temporary_store: &InnerTemporaryStore,
2572 epoch_store: &Arc<AuthorityPerEpochStore>,
2573 ) -> IotaResult {
2574 if self.indexes.is_none() {
2575 return Ok(());
2576 }
2577
2578 let tx_digest = certificate.digest();
2579 let timestamp_ms = Self::unixtime_now_ms();
2580 let events = &inner_temporary_store.events;
2581 let written = &inner_temporary_store.written;
2582 let tx_coins =
2583 self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2584
2585 if let Some(indexes) = &self.indexes {
2587 let _ = self
2588 .index_tx(
2589 indexes.as_ref(),
2590 tx_digest,
2591 certificate,
2592 effects,
2593 events,
2594 timestamp_ms,
2595 tx_coins,
2596 written,
2597 inner_temporary_store,
2598 )
2599 .await
2600 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2601 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2602 .expect("Indexing tx should not fail");
2603
2604 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2605 let events = self.make_transaction_block_events(
2606 events.clone(),
2607 *tx_digest,
2608 timestamp_ms,
2609 epoch_store,
2610 inner_temporary_store,
2611 )?;
2612 self.subscription_handler
2614 .process_tx(certificate.data().transaction_data(), &effects, &events)
2615 .tap_ok(|_| {
2616 self.metrics
2617 .post_processing_total_tx_had_event_processed
2618 .inc()
2619 })
2620 .tap_err(|e| {
2621 warn!(
2622 ?tx_digest,
2623 "Post processing - Couldn't process events for tx: {}", e
2624 )
2625 })?;
2626
2627 self.metrics
2628 .post_processing_total_events_emitted
2629 .inc_by(events.data.len() as u64);
2630 };
2631 Ok(())
2632 }
2633
2634 fn make_transaction_block_events(
2635 &self,
2636 transaction_events: TransactionEvents,
2637 digest: TransactionDigest,
2638 timestamp_ms: u64,
2639 epoch_store: &Arc<AuthorityPerEpochStore>,
2640 inner_temporary_store: &InnerTemporaryStore,
2641 ) -> IotaResult<IotaTransactionBlockEvents> {
2642 let mut layout_resolver =
2643 epoch_store
2644 .executor()
2645 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2646 inner_temporary_store,
2647 self.get_backing_package_store(),
2648 )));
2649 IotaTransactionBlockEvents::try_from(
2650 transaction_events,
2651 digest,
2652 Some(timestamp_ms),
2653 layout_resolver.as_mut(),
2654 )
2655 }
2656
2657 pub fn unixtime_now_ms() -> u64 {
2658 let ts_ms = Utc::now().timestamp_millis();
2659 u64::try_from(ts_ms).expect("Travelling in time machine")
2660 }
2661
2662 #[instrument(level = "trace", skip_all)]
2663 pub async fn handle_transaction_info_request(
2664 &self,
2665 request: TransactionInfoRequest,
2666 ) -> IotaResult<TransactionInfoResponse> {
2667 let epoch_store = self.load_epoch_store_one_call_per_task();
2668 let (transaction, status) = self
2669 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2670 .ok_or(IotaError::TransactionNotFound {
2671 digest: request.transaction_digest,
2672 })?;
2673 Ok(TransactionInfoResponse {
2674 transaction,
2675 status,
2676 })
2677 }
2678
2679 #[instrument(level = "trace", skip_all)]
2680 pub async fn handle_object_info_request(
2681 &self,
2682 request: ObjectInfoRequest,
2683 ) -> IotaResult<ObjectInfoResponse> {
2684 let epoch_store = self.load_epoch_store_one_call_per_task();
2685
2686 let requested_object_seq = match request.request_kind {
2687 ObjectInfoRequestKind::LatestObjectInfo => {
2688 let (_, seq, _) = self
2689 .get_object_or_tombstone(request.object_id)
2690 .await?
2691 .ok_or_else(|| {
2692 IotaError::from(UserInputError::ObjectNotFound {
2693 object_id: request.object_id,
2694 version: None,
2695 })
2696 })?;
2697 seq
2698 }
2699 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2700 };
2701
2702 let object = self
2703 .get_object_store()
2704 .get_object_by_key(&request.object_id, requested_object_seq)?
2705 .ok_or_else(|| {
2706 IotaError::from(UserInputError::ObjectNotFound {
2707 object_id: request.object_id,
2708 version: Some(requested_object_seq),
2709 })
2710 })?;
2711
2712 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2713 (request.generate_layout, object.data.try_as_move())
2714 {
2715 Some(into_struct_layout(
2716 self.load_epoch_store_one_call_per_task()
2717 .executor()
2718 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2719 .get_annotated_layout(&move_obj.type_().clone().into())?,
2720 )?)
2721 } else {
2722 None
2723 };
2724
2725 let lock = if !object.is_address_owned() {
2726 None
2728 } else {
2729 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2730 .await?
2731 .map(|s| s.into_inner())
2732 };
2733
2734 Ok(ObjectInfoResponse {
2735 object,
2736 layout,
2737 lock_for_debugging: lock,
2738 })
2739 }
2740
2741 #[instrument(level = "trace", skip_all)]
2742 pub fn handle_checkpoint_request(
2743 &self,
2744 request: &CheckpointRequest,
2745 ) -> IotaResult<CheckpointResponse> {
2746 let summary = if request.certified {
2747 let summary = match request.sequence_number {
2748 Some(seq) => self
2749 .checkpoint_store
2750 .get_checkpoint_by_sequence_number(seq)?,
2751 None => self.checkpoint_store.get_latest_certified_checkpoint(),
2752 }
2753 .map(|v| v.into_inner());
2754 summary.map(CheckpointSummaryResponse::Certified)
2755 } else {
2756 let summary = match request.sequence_number {
2757 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2758 None => self
2759 .checkpoint_store
2760 .get_latest_locally_computed_checkpoint(),
2761 };
2762 summary.map(CheckpointSummaryResponse::Pending)
2763 };
2764 let contents = match &summary {
2765 Some(s) => self
2766 .checkpoint_store
2767 .get_checkpoint_contents(&s.content_digest())?,
2768 None => None,
2769 };
2770 Ok(CheckpointResponse {
2771 checkpoint: summary,
2772 contents,
2773 })
2774 }
2775
2776 fn check_protocol_version(
2777 supported_protocol_versions: SupportedProtocolVersions,
2778 current_version: ProtocolVersion,
2779 ) {
2780 info!("current protocol version is now {:?}", current_version);
2781 info!("supported versions are: {:?}", supported_protocol_versions);
2782 if !supported_protocol_versions.is_version_supported(current_version) {
2783 let msg = format!(
2784 "Unsupported protocol version. The network is at {current_version:?}, but this IotaNode only supports: {supported_protocol_versions:?}. Shutting down.",
2785 );
2786
2787 error!("{}", msg);
2788 eprintln!("{msg}");
2789
2790 #[cfg(not(msim))]
2791 std::process::exit(1);
2792
2793 #[cfg(msim)]
2794 iota_simulator::task::shutdown_current_node();
2795 }
2796 }
2797
2798 #[expect(clippy::disallowed_methods)] pub async fn new(
2800 name: AuthorityName,
2801 secret: StableSyncAuthoritySigner,
2802 supported_protocol_versions: SupportedProtocolVersions,
2803 store: Arc<AuthorityStore>,
2804 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
2805 epoch_store: Arc<AuthorityPerEpochStore>,
2806 committee_store: Arc<CommitteeStore>,
2807 indexes: Option<Arc<IndexStore>>,
2808 rest_index: Option<Arc<RestIndexStore>>,
2809 checkpoint_store: Arc<CheckpointStore>,
2810 prometheus_registry: &Registry,
2811 genesis_objects: &[Object],
2812 db_checkpoint_config: &DBCheckpointConfig,
2813 config: NodeConfig,
2814 indirect_objects_threshold: usize,
2815 archive_readers: ArchiveReaderBalancer,
2816 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
2817 ) -> Arc<Self> {
2818 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
2819
2820 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
2821 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
2822 let transaction_manager = Arc::new(TransactionManager::new(
2823 execution_cache_trait_pointers.object_cache_reader.clone(),
2824 execution_cache_trait_pointers
2825 .transaction_cache_reader
2826 .clone(),
2827 &epoch_store,
2828 tx_ready_certificates,
2829 metrics.clone(),
2830 ));
2831 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
2832
2833 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
2834 epoch_store.get_parent_path(),
2835 &config.authority_store_pruning_config,
2836 );
2837 let _pruner = AuthorityStorePruner::new(
2838 store.perpetual_tables.clone(),
2839 checkpoint_store.clone(),
2840 rest_index.clone(),
2841 store.objects_lock_table.clone(),
2842 config.authority_store_pruning_config.clone(),
2843 epoch_store.committee().authority_exists(&name),
2844 epoch_store.epoch_start_state().epoch_duration_ms(),
2845 prometheus_registry,
2846 indirect_objects_threshold,
2847 archive_readers,
2848 );
2849 let input_loader =
2850 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
2851 let epoch = epoch_store.epoch();
2852 let state = Arc::new(AuthorityState {
2853 name,
2854 secret,
2855 execution_lock: RwLock::new(epoch),
2856 epoch_store: ArcSwap::new(epoch_store.clone()),
2857 input_loader,
2858 execution_cache_trait_pointers,
2859 indexes,
2860 rest_index,
2861 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
2862 checkpoint_store,
2863 committee_store,
2864 transaction_manager,
2865 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
2866 metrics,
2867 _pruner,
2868 _authority_per_epoch_pruner,
2869 db_checkpoint_config: db_checkpoint_config.clone(),
2870 config,
2871 overload_info: AuthorityOverloadInfo::default(),
2872 validator_tx_finalizer,
2873 });
2874
2875 let authority_state = Arc::downgrade(&state);
2877 spawn_monitored_task!(execution_process(
2878 authority_state,
2879 rx_ready_certificates,
2880 rx_execution_shutdown,
2881 ));
2882
2883 state
2885 .create_owner_index_if_empty(genesis_objects, &epoch_store)
2886 .expect("Error indexing genesis objects.");
2887
2888 state
2889 }
2890
2891 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
2893 &self.execution_cache_trait_pointers.object_cache_reader
2894 }
2895
2896 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
2897 &self.execution_cache_trait_pointers.transaction_cache_reader
2898 }
2899
2900 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
2901 &self.execution_cache_trait_pointers.cache_writer
2902 }
2903
2904 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
2905 &self.execution_cache_trait_pointers.backing_store
2906 }
2907
2908 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
2909 &self.execution_cache_trait_pointers.backing_package_store
2910 }
2911
2912 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
2913 &self.execution_cache_trait_pointers.object_store
2914 }
2915
2916 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
2917 &self.execution_cache_trait_pointers.reconfig_api
2918 }
2919
2920 pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
2921 &self.execution_cache_trait_pointers.accumulator_store
2922 }
2923
2924 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
2925 &self.execution_cache_trait_pointers.checkpoint_cache
2926 }
2927
2928 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
2929 &self.execution_cache_trait_pointers.state_sync_store
2930 }
2931
2932 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
2933 &self.execution_cache_trait_pointers.cache_commit
2934 }
2935
2936 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
2937 self.execution_cache_trait_pointers
2938 .testing_api
2939 .database_for_testing()
2940 }
2941
2942 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
2943 &self,
2944 config: NodeConfig,
2945 metrics: Arc<AuthorityStorePruningMetrics>,
2946 ) -> anyhow::Result<()> {
2947 let archive_readers =
2948 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
2949 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
2950 &self.database_for_testing().perpetual_tables,
2951 &self.checkpoint_store,
2952 self.rest_index.as_deref(),
2953 &self.database_for_testing().objects_lock_table,
2954 config.authority_store_pruning_config,
2955 metrics,
2956 config.indirect_objects_threshold,
2957 archive_readers,
2958 EPOCH_DURATION_MS_FOR_TESTING,
2959 )
2960 .await
2961 }
2962
2963 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
2964 &self.transaction_manager
2965 }
2966
2967 pub fn enqueue_certificates_for_execution(
2969 &self,
2970 certs: Vec<VerifiedCertificate>,
2971 epoch_store: &Arc<AuthorityPerEpochStore>,
2972 ) {
2973 self.transaction_manager
2974 .enqueue_certificates(certs, epoch_store)
2975 }
2976
2977 pub fn enqueue_with_expected_effects_digest(
2978 &self,
2979 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
2980 epoch_store: &AuthorityPerEpochStore,
2981 ) {
2982 self.transaction_manager
2983 .enqueue_with_expected_effects_digest(certs, epoch_store)
2984 }
2985
2986 fn create_owner_index_if_empty(
2987 &self,
2988 genesis_objects: &[Object],
2989 epoch_store: &Arc<AuthorityPerEpochStore>,
2990 ) -> IotaResult {
2991 let Some(index_store) = &self.indexes else {
2992 return Ok(());
2993 };
2994 if !index_store.is_empty() {
2995 return Ok(());
2996 }
2997
2998 let mut new_owners = vec![];
2999 let mut new_dynamic_fields = vec![];
3000 let mut layout_resolver = epoch_store
3001 .executor()
3002 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
3003 for o in genesis_objects.iter() {
3004 match o.owner {
3005 Owner::AddressOwner(addr) => new_owners.push((
3006 (addr, o.id()),
3007 ObjectInfo::new(&o.compute_object_reference(), o),
3008 )),
3009 Owner::ObjectOwner(object_id) => {
3010 let id = o.id();
3011 let Some(info) = self.try_create_dynamic_field_info(
3012 o,
3013 &BTreeMap::new(),
3014 layout_resolver.as_mut(),
3015 true,
3016 )?
3017 else {
3018 continue;
3019 };
3020 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
3021 }
3022 _ => {}
3023 }
3024 }
3025
3026 index_store.insert_genesis_objects(ObjectIndexChanges {
3027 deleted_owners: vec![],
3028 deleted_dynamic_fields: vec![],
3029 new_owners,
3030 new_dynamic_fields,
3031 })
3032 }
3033
3034 pub async fn execution_lock_for_executable_transaction(
3038 &self,
3039 transaction: &VerifiedExecutableTransaction,
3040 ) -> IotaResult<ExecutionLockReadGuard> {
3041 let lock = self.execution_lock.read().await;
3042 if *lock == transaction.auth_sig().epoch() {
3043 Ok(lock)
3044 } else {
3045 Err(IotaError::WrongEpoch {
3046 expected_epoch: *lock,
3047 actual_epoch: transaction.auth_sig().epoch(),
3048 })
3049 }
3050 }
3051
3052 pub async fn execution_lock_for_signing(&self) -> ExecutionLockReadGuard {
3058 self.execution_lock.read().await
3059 }
3060
3061 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard {
3062 self.execution_lock.write().await
3063 }
3064
3065 #[instrument(level = "error", skip_all)]
3066 pub async fn reconfigure(
3067 &self,
3068 cur_epoch_store: &AuthorityPerEpochStore,
3069 supported_protocol_versions: SupportedProtocolVersions,
3070 new_committee: Committee,
3071 epoch_start_configuration: EpochStartConfiguration,
3072 accumulator: Arc<StateAccumulator>,
3073 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3074 epoch_supply_change: i64,
3075 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
3076 Self::check_protocol_version(
3077 supported_protocol_versions,
3078 epoch_start_configuration
3079 .epoch_start_state()
3080 .protocol_version(),
3081 );
3082 self.metrics.reset_on_reconfigure();
3083 self.committee_store.insert_new_committee(&new_committee)?;
3084
3085 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3087
3088 cur_epoch_store.epoch_terminated().await;
3090
3091 self.revert_uncommitted_epoch_transactions(cur_epoch_store)
3097 .await?;
3098 self.get_reconfig_api()
3099 .clear_state_end_of_epoch(&execution_lock);
3100 self.check_system_consistency(
3101 cur_epoch_store,
3102 accumulator,
3103 expensive_safety_check_config,
3104 epoch_supply_change,
3105 )?;
3106 self.get_reconfig_api()
3107 .set_epoch_start_configuration(&epoch_start_configuration)?;
3108 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
3109 if self
3110 .db_checkpoint_config
3111 .perform_db_checkpoints_at_epoch_end
3112 {
3113 let checkpoint_indexes = self
3114 .db_checkpoint_config
3115 .perform_index_db_checkpoints_at_epoch_end
3116 .unwrap_or(false);
3117 let current_epoch = cur_epoch_store.epoch();
3118 let epoch_checkpoint_path = checkpoint_path.join(format!("epoch_{current_epoch}"));
3119 self.checkpoint_all_dbs(
3120 &epoch_checkpoint_path,
3121 cur_epoch_store,
3122 checkpoint_indexes,
3123 )?;
3124 }
3125 }
3126
3127 self.get_reconfig_api()
3128 .reconfigure_cache(&epoch_start_configuration)
3129 .await;
3130
3131 let new_epoch = new_committee.epoch;
3132 let new_epoch_store = self
3133 .reopen_epoch_db(
3134 cur_epoch_store,
3135 new_committee,
3136 epoch_start_configuration,
3137 expensive_safety_check_config,
3138 )
3139 .await?;
3140 assert_eq!(new_epoch_store.epoch(), new_epoch);
3141 self.transaction_manager.reconfigure(new_epoch);
3142 *execution_lock = new_epoch;
3143 Ok(new_epoch_store)
3147 }
3148
3149 pub async fn reconfigure_for_testing(&self) {
3154 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3155 let epoch_store = self.epoch_store_for_testing().clone();
3156 let protocol_config = epoch_store.protocol_config().clone();
3157 let _guard =
3165 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3166 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3167 self.get_backing_package_store().clone(),
3168 self.get_object_store().clone(),
3169 &self.config.expensive_safety_check_config,
3170 );
3171 let new_epoch = new_epoch_store.epoch();
3172 self.transaction_manager.reconfigure(new_epoch);
3173 self.epoch_store.store(new_epoch_store);
3174 epoch_store.epoch_terminated().await;
3175 *execution_lock = new_epoch;
3176 }
3177
3178 #[instrument(level = "error", skip_all)]
3179 fn check_system_consistency(
3180 &self,
3181 cur_epoch_store: &AuthorityPerEpochStore,
3182 accumulator: Arc<StateAccumulator>,
3183 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3184 epoch_supply_change: i64,
3185 ) -> IotaResult<()> {
3186 info!(
3187 "Performing iota conservation consistency check for epoch {}",
3188 cur_epoch_store.epoch()
3189 );
3190
3191 if cfg!(debug_assertions) {
3192 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3193 }
3194
3195 self.get_reconfig_api()
3196 .expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3197
3198 if expensive_safety_check_config.enable_state_consistency_check() {
3200 info!(
3201 "Performing state consistency check for epoch {}",
3202 cur_epoch_store.epoch()
3203 );
3204 self.expensive_check_is_consistent_state(
3205 accumulator,
3206 cur_epoch_store,
3207 cfg!(debug_assertions), );
3209 }
3210
3211 if expensive_safety_check_config.enable_secondary_index_checks() {
3212 if let Some(indexes) = self.indexes.clone() {
3213 verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3214 .expect("secondary indexes are inconsistent");
3215 }
3216 }
3217
3218 Ok(())
3219 }
3220
3221 fn expensive_check_is_consistent_state(
3222 &self,
3223 accumulator: Arc<StateAccumulator>,
3224 cur_epoch_store: &AuthorityPerEpochStore,
3225 panic: bool,
3226 ) {
3227 let live_object_set_hash = accumulator.digest_live_object_set();
3228
3229 let root_state_hash: ECMHLiveObjectSetDigest = self
3230 .get_accumulator_store()
3231 .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3232 .expect("Retrieving root state hash cannot fail")
3233 .expect("Root state hash for epoch must exist")
3234 .1
3235 .digest()
3236 .into();
3237
3238 let is_inconsistent = root_state_hash != live_object_set_hash;
3239 if is_inconsistent {
3240 if panic {
3241 panic!(
3242 "Inconsistent state detected: root state hash: {root_state_hash:?}, live object set hash: {live_object_set_hash:?}"
3243 );
3244 } else {
3245 error!(
3246 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3247 root_state_hash, live_object_set_hash
3248 );
3249 }
3250 } else {
3251 info!("State consistency check passed");
3252 }
3253
3254 if !panic {
3255 accumulator.set_inconsistent_state(is_inconsistent);
3256 }
3257 }
3258
3259 pub fn current_epoch_for_testing(&self) -> EpochId {
3260 self.epoch_store_for_testing().epoch()
3261 }
3262
3263 #[instrument(level = "error", skip_all)]
3264 pub fn checkpoint_all_dbs(
3265 &self,
3266 checkpoint_path: &Path,
3267 cur_epoch_store: &AuthorityPerEpochStore,
3268 checkpoint_indexes: bool,
3269 ) -> IotaResult {
3270 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3271 let current_epoch = cur_epoch_store.epoch();
3272
3273 if checkpoint_path.exists() {
3274 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3275 return Ok(());
3276 }
3277
3278 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3279 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3280
3281 if checkpoint_path_tmp.exists() {
3282 fs::remove_dir_all(&checkpoint_path_tmp)
3283 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3284 }
3285
3286 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3287 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3288
3289 self.checkpoint_store
3292 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3293
3294 self.get_reconfig_api()
3295 .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3296
3297 self.committee_store
3298 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3299
3300 if checkpoint_indexes {
3301 if let Some(indexes) = self.indexes.as_ref() {
3302 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3303 }
3304 }
3305
3306 fs::rename(checkpoint_path_tmp, checkpoint_path)
3307 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3308 Ok(())
3309 }
3310
3311 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3318 self.epoch_store.load()
3319 }
3320
3321 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3323 self.load_epoch_store_one_call_per_task()
3324 }
3325
3326 pub fn clone_committee_for_testing(&self) -> Committee {
3327 Committee::clone(self.epoch_store_for_testing().committee())
3328 }
3329
3330 #[instrument(level = "trace", skip_all)]
3331 pub async fn get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3332 self.get_object_store()
3333 .get_object(object_id)
3334 .map_err(Into::into)
3335 }
3336
3337 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3338 Ok(self
3339 .get_object(&IOTA_SYSTEM_ADDRESS.into())
3340 .await?
3341 .expect("framework object should always exist")
3342 .compute_object_reference())
3343 }
3344
3345 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3347 self.get_object_cache_reader()
3348 .get_iota_system_state_object_unsafe()
3349 }
3350
3351 #[instrument(level = "trace", skip_all)]
3352 pub fn get_checkpoint_by_sequence_number(
3353 &self,
3354 sequence_number: CheckpointSequenceNumber,
3355 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3356 Ok(self
3357 .checkpoint_store
3358 .get_checkpoint_by_sequence_number(sequence_number)?)
3359 }
3360
3361 #[instrument(level = "trace", skip_all)]
3362 pub fn get_transaction_checkpoint_for_tests(
3363 &self,
3364 digest: &TransactionDigest,
3365 epoch_store: &AuthorityPerEpochStore,
3366 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3367 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3368 let Some(checkpoint) = checkpoint else {
3369 return Ok(None);
3370 };
3371 let checkpoint = self
3372 .checkpoint_store
3373 .get_checkpoint_by_sequence_number(checkpoint)?;
3374 Ok(checkpoint)
3375 }
3376
3377 #[instrument(level = "trace", skip_all)]
3378 pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3379 Ok(
3380 match self
3381 .get_object_cache_reader()
3382 .get_latest_object_or_tombstone(*object_id)?
3383 {
3384 Some((_, ObjectOrTombstone::Object(object))) => {
3385 let layout = self.get_object_layout(&object)?;
3386 ObjectRead::Exists(object.compute_object_reference(), object, layout)
3387 }
3388 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3389 None => ObjectRead::NotExists(*object_id),
3390 },
3391 )
3392 }
3393
3394 pub fn get_chain_identifier(&self) -> Option<ChainIdentifier> {
3396 if let Some(digest) = CHAIN_IDENTIFIER.get() {
3397 return Some(*digest);
3398 }
3399
3400 let checkpoint = self
3401 .get_checkpoint_by_sequence_number(0)
3402 .tap_err(|e| error!("Failed to get genesis checkpoint: {:?}", e))
3403 .ok()?
3404 .tap_none(|| error!("Genesis checkpoint is missing from DB"))?;
3405 let _ = CHAIN_IDENTIFIER.set(ChainIdentifier::from(*checkpoint.digest()));
3407 Some(ChainIdentifier::from(*checkpoint.digest()))
3408 }
3409
3410 #[instrument(level = "trace", skip_all)]
3411 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3412 where
3413 T: DeserializeOwned,
3414 {
3415 let o = self.get_object_read(object_id)?.into_object()?;
3416 if let Some(move_object) = o.data.try_as_move() {
3417 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3418 IotaError::ObjectDeserialization {
3419 error: format!("{e}"),
3420 }
3421 })?)
3422 } else {
3423 Err(IotaError::ObjectDeserialization {
3424 error: format!("Provided object : [{object_id}] is not a Move object."),
3425 })
3426 }
3427 }
3428
3429 #[instrument(level = "trace", skip_all)]
3435 pub fn get_past_object_read(
3436 &self,
3437 object_id: &ObjectID,
3438 version: SequenceNumber,
3439 ) -> IotaResult<PastObjectRead> {
3440 let Some(obj_ref) = self
3442 .get_object_cache_reader()
3443 .get_latest_object_ref_or_tombstone(*object_id)?
3444 else {
3445 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3446 };
3447
3448 if version > obj_ref.1 {
3449 return Ok(PastObjectRead::VersionTooHigh {
3450 object_id: *object_id,
3451 asked_version: version,
3452 latest_version: obj_ref.1,
3453 });
3454 }
3455
3456 if version < obj_ref.1 {
3457 return Ok(match self.read_object_at_version(object_id, version)? {
3459 Some((object, layout)) => {
3460 let obj_ref = object.compute_object_reference();
3461 PastObjectRead::VersionFound(obj_ref, object, layout)
3462 }
3463
3464 None => PastObjectRead::VersionNotFound(*object_id, version),
3465 });
3466 }
3467
3468 if !obj_ref.2.is_alive() {
3469 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3470 }
3471
3472 match self.read_object_at_version(object_id, obj_ref.1)? {
3473 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3474 None => {
3475 error!(
3476 "Object with in parent_entry is missing from object store, datastore is \
3477 inconsistent",
3478 );
3479 Err(UserInputError::ObjectNotFound {
3480 object_id: *object_id,
3481 version: Some(obj_ref.1),
3482 }
3483 .into())
3484 }
3485 }
3486 }
3487
3488 #[instrument(level = "trace", skip_all)]
3489 fn read_object_at_version(
3490 &self,
3491 object_id: &ObjectID,
3492 version: SequenceNumber,
3493 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3494 let Some(object) = self
3495 .get_object_cache_reader()
3496 .get_object_by_key(object_id, version)?
3497 else {
3498 return Ok(None);
3499 };
3500
3501 let layout = self.get_object_layout(&object)?;
3502 Ok(Some((object, layout)))
3503 }
3504
3505 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3506 let layout = object
3507 .data
3508 .try_as_move()
3509 .map(|object| {
3510 into_struct_layout(
3511 self.load_epoch_store_one_call_per_task()
3512 .executor()
3513 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3515 .get_annotated_layout(&object.type_().clone().into())?,
3516 )
3517 })
3518 .transpose()?;
3519 Ok(layout)
3520 }
3521
3522 fn get_owner_at_version(
3523 &self,
3524 object_id: &ObjectID,
3525 version: SequenceNumber,
3526 ) -> IotaResult<Owner> {
3527 self.get_object_store()
3528 .get_object_by_key(object_id, version)?
3529 .ok_or_else(|| {
3530 IotaError::from(UserInputError::ObjectNotFound {
3531 object_id: *object_id,
3532 version: Some(version),
3533 })
3534 })
3535 .map(|o| o.owner)
3536 }
3537
3538 #[instrument(level = "trace", skip_all)]
3539 pub fn get_owner_objects(
3540 &self,
3541 owner: IotaAddress,
3542 cursor: Option<ObjectID>,
3544 limit: usize,
3545 filter: Option<IotaObjectDataFilter>,
3546 ) -> IotaResult<Vec<ObjectInfo>> {
3547 if let Some(indexes) = &self.indexes {
3548 indexes.get_owner_objects(owner, cursor, limit, filter)
3549 } else {
3550 Err(IotaError::IndexStoreNotAvailable)
3551 }
3552 }
3553
3554 #[instrument(level = "trace", skip_all)]
3555 pub fn get_owned_coins_iterator_with_cursor(
3556 &self,
3557 owner: IotaAddress,
3558 cursor: (String, ObjectID),
3560 limit: usize,
3561 one_coin_type_only: bool,
3562 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3563 if let Some(indexes) = &self.indexes {
3564 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3565 } else {
3566 Err(IotaError::IndexStoreNotAvailable)
3567 }
3568 }
3569
3570 #[instrument(level = "trace", skip_all)]
3571 pub fn get_owner_objects_iterator(
3572 &self,
3573 owner: IotaAddress,
3574 cursor: Option<ObjectID>,
3576 filter: Option<IotaObjectDataFilter>,
3577 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3578 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3579 if let Some(indexes) = &self.indexes {
3580 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3581 } else {
3582 Err(IotaError::IndexStoreNotAvailable)
3583 }
3584 }
3585
3586 #[instrument(level = "trace", skip_all)]
3587 pub async fn get_move_objects<T>(
3588 &self,
3589 owner: IotaAddress,
3590 type_: MoveObjectType,
3591 ) -> IotaResult<Vec<T>>
3592 where
3593 T: DeserializeOwned,
3594 {
3595 let object_ids = self
3596 .get_owner_objects_iterator(owner, None, None)?
3597 .filter(|o| match &o.type_ {
3598 ObjectType::Struct(s) => &type_ == s,
3599 ObjectType::Package => false,
3600 })
3601 .map(|info| ObjectKey(info.object_id, info.version))
3602 .collect::<Vec<_>>();
3603 let mut move_objects = vec![];
3604
3605 let objects = self
3606 .get_object_store()
3607 .multi_get_objects_by_key(&object_ids)?;
3608
3609 for (o, id) in objects.into_iter().zip(object_ids) {
3610 let object = o.ok_or_else(|| {
3611 IotaError::from(UserInputError::ObjectNotFound {
3612 object_id: id.0,
3613 version: Some(id.1),
3614 })
3615 })?;
3616 let move_object = object.data.try_as_move().ok_or_else(|| {
3617 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3618 })?;
3619 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3620 IotaError::ObjectDeserialization {
3621 error: format!("{e}"),
3622 }
3623 })?);
3624 }
3625 Ok(move_objects)
3626 }
3627
3628 #[instrument(level = "trace", skip_all)]
3629 pub fn get_dynamic_fields(
3630 &self,
3631 owner: ObjectID,
3632 cursor: Option<ObjectID>,
3634 limit: usize,
3635 ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3636 Ok(self
3637 .get_dynamic_fields_iterator(owner, cursor)?
3638 .take(limit)
3639 .collect::<Result<Vec<_>, _>>()?)
3640 }
3641
3642 fn get_dynamic_fields_iterator(
3643 &self,
3644 owner: ObjectID,
3645 cursor: Option<ObjectID>,
3647 ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3648 {
3649 if let Some(indexes) = &self.indexes {
3650 indexes.get_dynamic_fields_iterator(owner, cursor)
3651 } else {
3652 Err(IotaError::IndexStoreNotAvailable)
3653 }
3654 }
3655
3656 #[instrument(level = "trace", skip_all)]
3657 pub fn get_dynamic_field_object_id(
3658 &self,
3659 owner: ObjectID,
3660 name_type: TypeTag,
3661 name_bcs_bytes: &[u8],
3662 ) -> IotaResult<Option<ObjectID>> {
3663 if let Some(indexes) = &self.indexes {
3664 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3665 } else {
3666 Err(IotaError::IndexStoreNotAvailable)
3667 }
3668 }
3669
3670 #[instrument(level = "trace", skip_all)]
3671 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3672 Ok(self.get_indexes()?.next_sequence_number())
3673 }
3674
3675 #[instrument(level = "trace", skip_all)]
3676 pub async fn get_executed_transaction_and_effects(
3677 &self,
3678 digest: TransactionDigest,
3679 kv_store: Arc<TransactionKeyValueStore>,
3680 ) -> IotaResult<(Transaction, TransactionEffects)> {
3681 let transaction = kv_store.get_tx(digest).await?;
3682 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3683 Ok((transaction, effects))
3684 }
3685
3686 #[instrument(level = "trace", skip_all)]
3687 pub fn multi_get_checkpoint_by_sequence_number(
3688 &self,
3689 sequence_numbers: &[CheckpointSequenceNumber],
3690 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3691 Ok(self
3692 .checkpoint_store
3693 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3694 }
3695
3696 #[instrument(level = "trace", skip_all)]
3697 pub fn get_transaction_events(
3698 &self,
3699 digest: &TransactionEventsDigest,
3700 ) -> IotaResult<TransactionEvents> {
3701 self.get_transaction_cache_reader()
3702 .get_events(digest)?
3703 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3704 }
3705
3706 pub fn get_transaction_input_objects(
3707 &self,
3708 effects: &TransactionEffects,
3709 ) -> anyhow::Result<Vec<Object>> {
3710 let input_object_keys = effects
3711 .modified_at_versions()
3712 .into_iter()
3713 .map(|(object_id, version)| ObjectKey(object_id, version))
3714 .collect::<Vec<_>>();
3715
3716 let input_objects = self
3717 .get_object_store()
3718 .multi_get_objects_by_key(&input_object_keys)?
3719 .into_iter()
3720 .enumerate()
3721 .map(|(idx, maybe_object)| {
3722 maybe_object.ok_or_else(|| {
3723 anyhow::anyhow!(
3724 "missing input object key {:?} from tx {}",
3725 input_object_keys[idx],
3726 effects.transaction_digest()
3727 )
3728 })
3729 })
3730 .collect::<anyhow::Result<Vec<_>>>()?;
3731 Ok(input_objects)
3732 }
3733
3734 pub fn get_transaction_output_objects(
3735 &self,
3736 effects: &TransactionEffects,
3737 ) -> anyhow::Result<Vec<Object>> {
3738 let output_object_keys = effects
3739 .all_changed_objects()
3740 .into_iter()
3741 .map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
3742 .collect::<Vec<_>>();
3743
3744 let output_objects = self
3745 .get_object_store()
3746 .multi_get_objects_by_key(&output_object_keys)?
3747 .into_iter()
3748 .enumerate()
3749 .map(|(idx, maybe_object)| {
3750 maybe_object.ok_or_else(|| {
3751 anyhow::anyhow!(
3752 "missing output object key {:?} from tx {}",
3753 output_object_keys[idx],
3754 effects.transaction_digest()
3755 )
3756 })
3757 })
3758 .collect::<anyhow::Result<Vec<_>>>()?;
3759 Ok(output_objects)
3760 }
3761
3762 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3763 match &self.indexes {
3764 Some(i) => Ok(i.clone()),
3765 None => Err(IotaError::UnsupportedFeature {
3766 error: "extended object indexing is not enabled on this server".into(),
3767 }),
3768 }
3769 }
3770
3771 pub async fn get_transactions_for_tests(
3772 self: &Arc<Self>,
3773 filter: Option<TransactionFilter>,
3774 cursor: Option<TransactionDigest>,
3775 limit: Option<usize>,
3776 reverse: bool,
3777 ) -> IotaResult<Vec<TransactionDigest>> {
3778 let metrics = KeyValueStoreMetrics::new_for_tests();
3779 let kv_store = Arc::new(TransactionKeyValueStore::new(
3780 "rocksdb",
3781 metrics,
3782 self.clone(),
3783 ));
3784 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3785 .await
3786 }
3787
3788 #[instrument(level = "trace", skip_all)]
3789 pub async fn get_transactions(
3790 &self,
3791 kv_store: &Arc<TransactionKeyValueStore>,
3792 filter: Option<TransactionFilter>,
3793 cursor: Option<TransactionDigest>,
3795 limit: Option<usize>,
3796 reverse: bool,
3797 ) -> IotaResult<Vec<TransactionDigest>> {
3798 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3799 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3800 let iter = checkpoint_contents.iter().map(|c| c.transaction);
3801 if reverse {
3802 let iter = iter
3803 .rev()
3804 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3805 .skip(usize::from(cursor.is_some()));
3806 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3807 } else {
3808 let iter = iter
3809 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3810 .skip(usize::from(cursor.is_some()));
3811 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3812 }
3813 }
3814 self.get_indexes()?
3815 .get_transactions(filter, cursor, limit, reverse)
3816 }
3817
3818 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
3819 &self.checkpoint_store
3820 }
3821
3822 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
3823 self.get_checkpoint_store()
3824 .get_highest_executed_checkpoint_seq_number()?
3825 .ok_or(IotaError::UserInput {
3826 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
3827 })
3828 }
3829
3830 #[cfg(msim)]
3831 pub fn get_highest_pruned_checkpoint_for_testing(
3832 &self,
3833 ) -> IotaResult<CheckpointSequenceNumber> {
3834 self.database_for_testing()
3835 .perpetual_tables
3836 .get_highest_pruned_checkpoint()
3837 }
3838
3839 #[instrument(level = "trace", skip_all)]
3840 pub fn get_checkpoint_summary_by_sequence_number(
3841 &self,
3842 sequence_number: CheckpointSequenceNumber,
3843 ) -> IotaResult<CheckpointSummary> {
3844 let verified_checkpoint = self
3845 .get_checkpoint_store()
3846 .get_checkpoint_by_sequence_number(sequence_number)?;
3847 match verified_checkpoint {
3848 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3849 None => Err(IotaError::UserInput {
3850 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3851 }),
3852 }
3853 }
3854
3855 #[instrument(level = "trace", skip_all)]
3856 pub fn get_checkpoint_summary_by_digest(
3857 &self,
3858 digest: CheckpointDigest,
3859 ) -> IotaResult<CheckpointSummary> {
3860 let verified_checkpoint = self
3861 .get_checkpoint_store()
3862 .get_checkpoint_by_digest(&digest)?;
3863 match verified_checkpoint {
3864 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3865 None => Err(IotaError::UserInput {
3866 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3867 }),
3868 }
3869 }
3870
3871 #[instrument(level = "trace", skip_all)]
3872 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
3873 if is_system_package(package_id) {
3874 return self.find_genesis_txn_digest();
3875 }
3876 Ok(self
3877 .get_object_read(&package_id)?
3878 .into_object()?
3879 .previous_transaction)
3880 }
3881
3882 #[instrument(level = "trace", skip_all)]
3883 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
3884 let summary = self
3885 .get_verified_checkpoint_by_sequence_number(0)?
3886 .into_message();
3887 let content = self.get_checkpoint_contents(summary.content_digest)?;
3888 let genesis_transaction = content.enumerate_transactions(&summary).next();
3889 Ok(genesis_transaction
3890 .ok_or(IotaError::UserInput {
3891 error: UserInputError::GenesisTransactionNotFound,
3892 })?
3893 .1
3894 .transaction)
3895 }
3896
3897 #[instrument(level = "trace", skip_all)]
3898 pub fn get_verified_checkpoint_by_sequence_number(
3899 &self,
3900 sequence_number: CheckpointSequenceNumber,
3901 ) -> IotaResult<VerifiedCheckpoint> {
3902 let verified_checkpoint = self
3903 .get_checkpoint_store()
3904 .get_checkpoint_by_sequence_number(sequence_number)?;
3905 match verified_checkpoint {
3906 Some(verified_checkpoint) => Ok(verified_checkpoint),
3907 None => Err(IotaError::UserInput {
3908 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3909 }),
3910 }
3911 }
3912
3913 #[instrument(level = "trace", skip_all)]
3914 pub fn get_verified_checkpoint_summary_by_digest(
3915 &self,
3916 digest: CheckpointDigest,
3917 ) -> IotaResult<VerifiedCheckpoint> {
3918 let verified_checkpoint = self
3919 .get_checkpoint_store()
3920 .get_checkpoint_by_digest(&digest)?;
3921 match verified_checkpoint {
3922 Some(verified_checkpoint) => Ok(verified_checkpoint),
3923 None => Err(IotaError::UserInput {
3924 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3925 }),
3926 }
3927 }
3928
3929 #[instrument(level = "trace", skip_all)]
3930 pub fn get_checkpoint_contents(
3931 &self,
3932 digest: CheckpointContentsDigest,
3933 ) -> IotaResult<CheckpointContents> {
3934 self.get_checkpoint_store()
3935 .get_checkpoint_contents(&digest)?
3936 .ok_or(IotaError::UserInput {
3937 error: UserInputError::CheckpointContentsNotFound(digest),
3938 })
3939 }
3940
3941 #[instrument(level = "trace", skip_all)]
3942 pub fn get_checkpoint_contents_by_sequence_number(
3943 &self,
3944 sequence_number: CheckpointSequenceNumber,
3945 ) -> IotaResult<CheckpointContents> {
3946 let verified_checkpoint = self
3947 .get_checkpoint_store()
3948 .get_checkpoint_by_sequence_number(sequence_number)?;
3949 match verified_checkpoint {
3950 Some(verified_checkpoint) => {
3951 let content_digest = verified_checkpoint.into_inner().content_digest;
3952 self.get_checkpoint_contents(content_digest)
3953 }
3954 None => Err(IotaError::UserInput {
3955 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3956 }),
3957 }
3958 }
3959
3960 #[instrument(level = "trace", skip_all)]
3961 pub async fn query_events(
3962 &self,
3963 kv_store: &Arc<TransactionKeyValueStore>,
3964 query: EventFilter,
3965 cursor: Option<EventID>,
3967 limit: usize,
3968 descending: bool,
3969 ) -> IotaResult<Vec<IotaEvent>> {
3970 let index_store = self.get_indexes()?;
3971
3972 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
3974 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
3975 IotaError::TransactionNotFound {
3976 digest: cursor.tx_digest,
3977 },
3978 )?;
3979 (tx_seq, cursor.event_seq as usize)
3980 } else if descending {
3981 (u64::MAX, usize::MAX)
3982 } else {
3983 (0, 0)
3984 };
3985
3986 let limit = limit + 1;
3987 let mut event_keys = match query {
3988 EventFilter::All(filters) => {
3989 if filters.is_empty() {
3990 index_store.all_events(tx_num, event_num, limit, descending)?
3991 } else {
3992 return Err(IotaError::UserInput {
3993 error: UserInputError::Unsupported(
3994 "This query type does not currently support filter combinations"
3995 .to_string(),
3996 ),
3997 });
3998 }
3999 }
4000 EventFilter::Transaction(digest) => {
4001 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
4002 }
4003 EventFilter::MoveModule { package, module } => {
4004 let module_id = ModuleId::new(package.into(), module);
4005 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
4006 }
4007 EventFilter::MoveEventType(struct_name) => index_store
4008 .events_by_move_event_struct_name(
4009 &struct_name,
4010 tx_num,
4011 event_num,
4012 limit,
4013 descending,
4014 )?,
4015 EventFilter::Sender(sender) => {
4016 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
4017 }
4018 EventFilter::TimeRange {
4019 start_time,
4020 end_time,
4021 } => index_store
4022 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
4023 EventFilter::MoveEventModule { package, module } => index_store
4024 .events_by_move_event_module(
4025 &ModuleId::new(package.into(), module),
4026 tx_num,
4027 event_num,
4028 limit,
4029 descending,
4030 )?,
4031 EventFilter::Package(_)
4033 | EventFilter::MoveEventField { .. }
4034 | EventFilter::Any(_)
4035 | EventFilter::And(_, _)
4036 | EventFilter::Or(_, _) => {
4037 return Err(IotaError::UserInput {
4038 error: UserInputError::Unsupported(
4039 "This query type is not supported by the full node.".to_string(),
4040 ),
4041 });
4042 }
4043 };
4044
4045 if cursor.is_some() {
4048 if !event_keys.is_empty() {
4049 event_keys.remove(0);
4050 }
4051 } else {
4052 event_keys.truncate(limit - 1);
4053 }
4054
4055 let transaction_digests = event_keys
4057 .iter()
4058 .map(|(_, digest, _, _)| *digest)
4059 .collect::<HashSet<_>>()
4060 .into_iter()
4061 .collect::<Vec<_>>();
4062
4063 let events = kv_store
4064 .multi_get_events_by_tx_digests(&transaction_digests)
4065 .await?;
4066
4067 let events_map: HashMap<_, _> =
4068 transaction_digests.iter().zip(events.into_iter()).collect();
4069
4070 let stored_events = event_keys
4071 .into_iter()
4072 .map(|k| {
4073 (
4074 k,
4075 events_map
4076 .get(&k.1)
4077 .expect("fetched digest is missing")
4078 .clone()
4079 .and_then(|e| e.data.get(k.2).cloned()),
4080 )
4081 })
4082 .map(|((digest, tx_digest, event_seq, timestamp), event)| {
4083 event
4084 .map(|e| (e, tx_digest, event_seq, timestamp))
4085 .ok_or(IotaError::TransactionEventsNotFound { digest })
4086 })
4087 .collect::<Result<Vec<_>, _>>()?;
4088
4089 let epoch_store = self.load_epoch_store_one_call_per_task();
4090 let backing_store = self.get_backing_package_store().as_ref();
4091 let mut layout_resolver = epoch_store
4092 .executor()
4093 .type_layout_resolver(Box::new(backing_store));
4094 let mut events = vec![];
4095 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
4096 events.push(IotaEvent::try_from(
4097 e.clone(),
4098 tx_digest,
4099 event_seq as u64,
4100 Some(timestamp),
4101 layout_resolver.get_annotated_layout(&e.type_)?,
4102 )?)
4103 }
4104 Ok(events)
4105 }
4106
4107 pub async fn insert_genesis_object(&self, object: Object) {
4108 self.get_reconfig_api()
4109 .insert_genesis_object(object)
4110 .expect("Cannot insert genesis object")
4111 }
4112
4113 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
4114 futures::future::join_all(
4115 objects
4116 .iter()
4117 .map(|o| self.insert_genesis_object(o.clone())),
4118 )
4119 .await;
4120 }
4121
4122 #[instrument(level = "trace", skip_all)]
4124 pub fn get_transaction_status(
4125 &self,
4126 transaction_digest: &TransactionDigest,
4127 epoch_store: &Arc<AuthorityPerEpochStore>,
4128 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
4129 if let Some(effects) =
4131 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
4132 {
4133 if let Some(transaction) = self
4134 .get_transaction_cache_reader()
4135 .get_transaction_block(transaction_digest)?
4136 {
4137 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4138 let events = if let Some(digest) = effects.events_digest() {
4139 self.get_transaction_events(digest)?
4140 } else {
4141 TransactionEvents::default()
4142 };
4143 return Ok(Some((
4144 (*transaction).clone().into_message(),
4145 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4146 )));
4147 } else {
4148 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4153 }
4154 }
4155 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4156 self.metrics.tx_already_processed.inc();
4157 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4158 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4159 } else {
4160 Ok(None)
4161 }
4162 }
4163
4164 #[instrument(level = "trace", skip_all)]
4168 pub fn get_signed_effects_and_maybe_resign(
4169 &self,
4170 transaction_digest: &TransactionDigest,
4171 epoch_store: &Arc<AuthorityPerEpochStore>,
4172 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4173 let effects = self
4174 .get_transaction_cache_reader()
4175 .get_executed_effects(transaction_digest)?;
4176 match effects {
4177 Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4178 None => Ok(None),
4179 }
4180 }
4181
4182 #[instrument(level = "trace", skip_all)]
4183 pub(crate) fn sign_effects(
4184 &self,
4185 effects: TransactionEffects,
4186 epoch_store: &Arc<AuthorityPerEpochStore>,
4187 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4188 let tx_digest = *effects.transaction_digest();
4189 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4190 Some(sig) if sig.epoch == epoch_store.epoch() => {
4191 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4192 }
4193 _ => {
4194 debug!(
4217 ?tx_digest,
4218 epoch=?epoch_store.epoch(),
4219 "Re-signing the effects with the current epoch"
4220 );
4221
4222 let sig = AuthoritySignInfo::new(
4223 epoch_store.epoch(),
4224 &effects,
4225 Intent::iota_app(IntentScope::TransactionEffects),
4226 self.name,
4227 &*self.secret,
4228 );
4229
4230 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4231
4232 epoch_store.insert_effects_digest_and_signature(
4233 &tx_digest,
4234 effects.digest(),
4235 &sig,
4236 )?;
4237
4238 effects
4239 }
4240 };
4241
4242 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4243 signed_effects,
4244 ))
4245 }
4246
4247 #[instrument(level = "trace", skip_all)]
4249 fn fullnode_only_get_tx_coins_for_indexing(
4250 &self,
4251 inner_temporary_store: &InnerTemporaryStore,
4252 epoch_store: &Arc<AuthorityPerEpochStore>,
4253 ) -> Option<TxCoins> {
4254 if self.indexes.is_none() || self.is_validator(epoch_store) {
4255 return None;
4256 }
4257 let written_coin_objects = inner_temporary_store
4258 .written
4259 .iter()
4260 .filter_map(|(k, v)| {
4261 if v.is_coin() {
4262 Some((*k, v.clone()))
4263 } else {
4264 None
4265 }
4266 })
4267 .collect();
4268 let input_coin_objects = inner_temporary_store
4269 .input_objects
4270 .iter()
4271 .filter_map(|(k, v)| {
4272 if v.is_coin() {
4273 Some((*k, v.clone()))
4274 } else {
4275 None
4276 }
4277 })
4278 .collect::<ObjectMap>();
4279 Some((input_coin_objects, written_coin_objects))
4280 }
4281
4282 #[instrument(level = "trace", skip_all)]
4294 pub async fn get_transaction_lock(
4295 &self,
4296 object_ref: &ObjectRef,
4297 epoch_store: &AuthorityPerEpochStore,
4298 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4299 let lock_info = self
4300 .get_object_cache_reader()
4301 .get_lock(*object_ref, epoch_store)?;
4302 let lock_info = match lock_info {
4303 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4304 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4305 provided_obj_ref: *object_ref,
4306 current_version: locked_ref.1,
4307 }
4308 .into());
4309 }
4310 ObjectLockStatus::Initialized => {
4311 return Ok(None);
4312 }
4313 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4314 };
4315
4316 epoch_store.get_signed_transaction(&lock_info)
4317 }
4318
4319 pub async fn get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4320 self.get_object_cache_reader().get_objects(objects)
4321 }
4322
4323 pub async fn get_object_or_tombstone(
4324 &self,
4325 object_id: ObjectID,
4326 ) -> IotaResult<Option<ObjectRef>> {
4327 self.get_object_cache_reader()
4328 .get_latest_object_ref_or_tombstone(object_id)
4329 }
4330
4331 pub fn set_override_protocol_upgrade_buffer_stake(
4341 &self,
4342 expected_epoch: EpochId,
4343 buffer_stake_bps: u64,
4344 ) -> IotaResult {
4345 let epoch_store = self.load_epoch_store_one_call_per_task();
4346 let actual_epoch = epoch_store.epoch();
4347 if actual_epoch != expected_epoch {
4348 return Err(IotaError::WrongEpoch {
4349 expected_epoch,
4350 actual_epoch,
4351 });
4352 }
4353
4354 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4355 }
4356
4357 pub fn clear_override_protocol_upgrade_buffer_stake(
4358 &self,
4359 expected_epoch: EpochId,
4360 ) -> IotaResult {
4361 let epoch_store = self.load_epoch_store_one_call_per_task();
4362 let actual_epoch = epoch_store.epoch();
4363 if actual_epoch != expected_epoch {
4364 return Err(IotaError::WrongEpoch {
4365 expected_epoch,
4366 actual_epoch,
4367 });
4368 }
4369
4370 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4371 }
4372
4373 pub async fn get_available_system_packages(
4377 &self,
4378 binary_config: &BinaryConfig,
4379 ) -> Vec<ObjectRef> {
4380 let mut results = vec![];
4381
4382 let system_packages = BuiltInFramework::iter_system_packages();
4383
4384 #[cfg(msim)]
4386 let extra_packages = framework_injection::get_extra_packages(self.name);
4387 #[cfg(msim)]
4388 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4389
4390 for system_package in system_packages {
4391 let modules = system_package.modules().to_vec();
4392 #[cfg(msim)]
4394 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4395 .unwrap_or(modules);
4396
4397 let Some(obj_ref) = iota_framework::compare_system_package(
4398 &self.get_object_store(),
4399 &system_package.id,
4400 &modules,
4401 system_package.dependencies.to_vec(),
4402 binary_config,
4403 )
4404 .await
4405 else {
4406 return vec![];
4407 };
4408 results.push(obj_ref);
4409 }
4410
4411 results
4412 }
4413
4414 async fn get_system_package_bytes(
4431 &self,
4432 system_packages: Vec<ObjectRef>,
4433 binary_config: &BinaryConfig,
4434 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4435 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4436 let objects = self.get_objects(&ids).await.expect("read cannot fail");
4437
4438 let mut res = Vec::with_capacity(system_packages.len());
4439 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4440 let prev_transaction = match object {
4441 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4442 info!("Framework {} does not need updating", system_package_ref.0);
4444 continue;
4445 }
4446
4447 Some(cur_object) => cur_object.previous_transaction,
4448 None => TransactionDigest::genesis_marker(),
4449 };
4450
4451 #[cfg(msim)]
4452 let SystemPackage {
4453 id: _,
4454 bytes,
4455 dependencies,
4456 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4457 .unwrap_or_else(|| {
4458 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4459 });
4460
4461 #[cfg(not(msim))]
4462 let SystemPackage {
4463 id: _,
4464 bytes,
4465 dependencies,
4466 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4467
4468 let modules: Vec<_> = bytes
4469 .iter()
4470 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4471 .collect();
4472
4473 let new_object = Object::new_system_package(
4474 &modules,
4475 system_package_ref.1,
4476 dependencies.clone(),
4477 prev_transaction,
4478 );
4479
4480 let new_ref = new_object.compute_object_reference();
4481 if new_ref != system_package_ref {
4482 error!(
4483 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
4484 );
4485 return None;
4486 }
4487
4488 res.push((system_package_ref.1, bytes, dependencies));
4489 }
4490
4491 Some(res)
4492 }
4493
4494 fn is_protocol_version_supported_v1(
4498 proposed_protocol_version: ProtocolVersion,
4499 committee: &Committee,
4500 capabilities: Vec<AuthorityCapabilitiesV1>,
4501 mut buffer_stake_bps: u64,
4502 ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
4503 if buffer_stake_bps > 10000 {
4504 warn!("clamping buffer_stake_bps to 10000");
4505 buffer_stake_bps = 10000;
4506 }
4507
4508 let mut desired_upgrades: Vec<_> = capabilities
4511 .into_iter()
4512 .filter_map(|mut cap| {
4513 if cap.available_system_packages.is_empty() {
4515 return None;
4516 }
4517
4518 cap.available_system_packages.sort();
4519
4520 info!(
4521 "validator {:?} supports {:?} with system packages: {:?}",
4522 cap.authority.concise(),
4523 cap.supported_protocol_versions,
4524 cap.available_system_packages,
4525 );
4526
4527 cap.supported_protocol_versions
4531 .get_version_digest(proposed_protocol_version)
4532 .map(|digest| (digest, cap.available_system_packages, cap.authority))
4533 })
4534 .collect();
4535
4536 desired_upgrades.sort();
4539 desired_upgrades
4540 .into_iter()
4541 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4542 .into_iter()
4543 .find_map(|((digest, packages), group)| {
4544 assert!(!packages.is_empty());
4546
4547 let mut stake_aggregator: StakeAggregator<(), true> =
4548 StakeAggregator::new(Arc::new(committee.clone()));
4549
4550 for (_, _, authority) in group {
4551 stake_aggregator.insert_generic(authority, ());
4552 }
4553
4554 let total_votes = stake_aggregator.total_votes();
4555 let quorum_threshold = committee.quorum_threshold();
4556 let f = committee.total_votes() - committee.quorum_threshold();
4557
4558 let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
4560 let effective_threshold = quorum_threshold + buffer_stake;
4561
4562 info!(
4563 protocol_config_digest = ?digest,
4564 ?total_votes,
4565 ?quorum_threshold,
4566 ?buffer_stake_bps,
4567 ?effective_threshold,
4568 ?proposed_protocol_version,
4569 ?packages,
4570 "support for upgrade"
4571 );
4572
4573 let has_support = total_votes >= effective_threshold;
4574 has_support.then_some((proposed_protocol_version, packages))
4575 })
4576 }
4577
4578 fn choose_protocol_version_and_system_packages_v1(
4582 current_protocol_version: ProtocolVersion,
4583 committee: &Committee,
4584 capabilities: Vec<AuthorityCapabilitiesV1>,
4585 buffer_stake_bps: u64,
4586 ) -> (ProtocolVersion, Vec<ObjectRef>) {
4587 let mut next_protocol_version = current_protocol_version;
4588 let mut system_packages = vec![];
4589
4590 while let Some((version, packages)) = Self::is_protocol_version_supported_v1(
4594 next_protocol_version + 1,
4595 committee,
4596 capabilities.clone(),
4597 buffer_stake_bps,
4598 ) {
4599 next_protocol_version = version;
4600 system_packages = packages;
4601 }
4602
4603 (next_protocol_version, system_packages)
4604 }
4605
4606 #[instrument(level = "debug", skip_all)]
4607 fn create_authenticator_state_tx(
4608 &self,
4609 epoch_store: &Arc<AuthorityPerEpochStore>,
4610 ) -> Option<EndOfEpochTransactionKind> {
4611 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4612 info!("authenticator state transactions not enabled");
4613 return None;
4614 }
4615
4616 let authenticator_state_exists = epoch_store.authenticator_state_exists();
4617 let tx = if authenticator_state_exists {
4618 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4619 let min_epoch =
4620 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4621 let authenticator_obj_initial_shared_version = epoch_store
4622 .epoch_start_config()
4623 .authenticator_obj_initial_shared_version()
4624 .expect("initial version must exist");
4625
4626 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4627 min_epoch,
4628 authenticator_obj_initial_shared_version,
4629 );
4630
4631 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4632
4633 tx
4634 } else {
4635 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4636 info!("Creating AuthenticatorStateCreate tx");
4637 tx
4638 };
4639 Some(tx)
4640 }
4641
4642 #[instrument(level = "error", skip_all)]
4655 pub async fn create_and_execute_advance_epoch_tx(
4656 &self,
4657 epoch_store: &Arc<AuthorityPerEpochStore>,
4658 gas_cost_summary: &GasCostSummary,
4659 checkpoint: CheckpointSequenceNumber,
4660 epoch_start_timestamp_ms: CheckpointTimestamp,
4661 ) -> anyhow::Result<(
4662 IotaSystemState,
4663 Option<SystemEpochInfoEvent>,
4664 TransactionEffects,
4665 )> {
4666 let mut txns = Vec::new();
4667
4668 if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
4669 txns.push(tx);
4670 }
4671
4672 let next_epoch = epoch_store.epoch() + 1;
4673
4674 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
4675
4676 let (next_epoch_protocol_version, next_epoch_system_packages) =
4677 Self::choose_protocol_version_and_system_packages_v1(
4678 epoch_store.protocol_version(),
4679 epoch_store.committee(),
4680 epoch_store
4681 .get_capabilities_v1()
4682 .expect("read capabilities from db cannot fail"),
4683 buffer_stake_bps,
4684 );
4685
4686 let config = epoch_store.protocol_config();
4690 let binary_config = to_binary_config(config);
4691 let Some(next_epoch_system_package_bytes) = self
4692 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
4693 .await
4694 else {
4695 error!(
4696 "upgraded system packages {:?} are not locally available, cannot create \
4697 ChangeEpochTx. validator binary must be upgraded to the correct version!",
4698 next_epoch_system_packages
4699 );
4700 bail!("missing system packages: cannot form ChangeEpochTx");
4710 };
4711
4712 if config.protocol_defined_base_fee()
4715 && config.max_committee_members_count_as_option().is_some()
4716 {
4717 txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
4718 next_epoch,
4719 next_epoch_protocol_version,
4720 gas_cost_summary.storage_cost,
4721 gas_cost_summary.computation_cost,
4722 gas_cost_summary.computation_cost_burned,
4723 gas_cost_summary.storage_rebate,
4724 gas_cost_summary.non_refundable_storage_fee,
4725 epoch_start_timestamp_ms,
4726 next_epoch_system_package_bytes,
4727 ));
4728 } else {
4729 txns.push(EndOfEpochTransactionKind::new_change_epoch(
4730 next_epoch,
4731 next_epoch_protocol_version,
4732 gas_cost_summary.storage_cost,
4733 gas_cost_summary.computation_cost,
4734 gas_cost_summary.storage_rebate,
4735 gas_cost_summary.non_refundable_storage_fee,
4736 epoch_start_timestamp_ms,
4737 next_epoch_system_package_bytes,
4738 ));
4739 }
4740
4741 let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
4742
4743 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
4744 tx.clone(),
4745 epoch_store.epoch(),
4746 checkpoint,
4747 );
4748
4749 let tx_digest = executable_tx.digest();
4750
4751 info!(
4752 ?next_epoch,
4753 ?next_epoch_protocol_version,
4754 ?next_epoch_system_packages,
4755 computation_cost=?gas_cost_summary.computation_cost,
4756 computation_cost_burned=?gas_cost_summary.computation_cost_burned,
4757 storage_cost=?gas_cost_summary.storage_cost,
4758 storage_rebate=?gas_cost_summary.storage_rebate,
4759 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
4760 ?tx_digest,
4761 "Creating advance epoch transaction"
4762 );
4763
4764 fail_point_async!("change_epoch_tx_delay");
4765 let tx_lock = epoch_store.acquire_tx_lock(tx_digest).await;
4766
4767 if self
4771 .get_transaction_cache_reader()
4772 .is_tx_already_executed(tx_digest)
4773 .expect("read cannot fail")
4774 {
4775 warn!("change epoch tx has already been executed via state sync");
4776 bail!("change epoch tx has already been executed via state sync",);
4777 }
4778
4779 let execution_guard = self
4780 .execution_lock_for_executable_transaction(&executable_tx)
4781 .await?;
4782
4783 epoch_store
4787 .assign_shared_object_versions_idempotent(
4788 self.get_object_cache_reader().as_ref(),
4789 &[executable_tx.clone()],
4790 )
4791 .await?;
4792
4793 let input_objects =
4794 self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
4795
4796 let (temporary_store, effects, _execution_error_opt) =
4797 self.prepare_certificate(&execution_guard, &executable_tx, input_objects, epoch_store)?;
4798 let system_obj = get_iota_system_state(&temporary_store.written)
4799 .expect("change epoch tx must write to system object");
4800 let system_epoch_info_event = temporary_store
4802 .events
4803 .data
4804 .into_iter()
4805 .find(|event| event.is_system_epoch_info_event())
4806 .map(SystemEpochInfoEvent::from);
4807 assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
4810
4811 self.get_state_sync_store()
4815 .insert_transaction_and_effects(&tx, &effects)
4816 .map_err(|err| {
4817 let err: anyhow::Error = err.into();
4818 err
4819 })?;
4820
4821 info!(
4822 "Effects summary of the change epoch transaction: {:?}",
4823 effects.summary_for_debug()
4824 );
4825 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
4826 assert!(effects.status().is_ok());
4828 Ok((system_obj, system_epoch_info_event, effects))
4829 }
4830
4831 #[instrument(level = "error", skip_all)]
4835 async fn revert_uncommitted_epoch_transactions(
4836 &self,
4837 epoch_store: &AuthorityPerEpochStore,
4838 ) -> IotaResult {
4839 {
4840 let state = epoch_store.get_reconfig_state_write_lock_guard();
4841 if state.should_accept_user_certs() {
4842 epoch_store.close_user_certs(state);
4851 }
4852 }
4854 let pending_certificates = epoch_store.pending_consensus_certificates();
4855 info!(
4856 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
4857 pending_certificates.len(),
4858 pending_certificates,
4859 );
4860 for digest in pending_certificates {
4861 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
4862 info!(
4863 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
4864 digest
4865 );
4866 continue;
4867 }
4868 info!("Reverting {:?} at the end of epoch", digest);
4869 epoch_store.revert_executed_transaction(&digest)?;
4870 self.get_reconfig_api().revert_state_update(&digest)?;
4871 }
4872 info!("All uncommitted local transactions reverted");
4873 Ok(())
4874 }
4875
4876 #[instrument(level = "error", skip_all)]
4877 async fn reopen_epoch_db(
4878 &self,
4879 cur_epoch_store: &AuthorityPerEpochStore,
4880 new_committee: Committee,
4881 epoch_start_configuration: EpochStartConfiguration,
4882 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4883 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
4884 let new_epoch = new_committee.epoch;
4885 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
4886 assert_eq!(
4887 epoch_start_configuration.epoch_start_state().epoch(),
4888 new_committee.epoch
4889 );
4890 fail_point!("before-open-new-epoch-store");
4891 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
4892 self.name,
4893 new_committee,
4894 epoch_start_configuration,
4895 self.get_backing_package_store().clone(),
4896 self.get_object_store().clone(),
4897 expensive_safety_check_config,
4898 cur_epoch_store.get_chain_identifier(),
4899 );
4900 self.epoch_store.store(new_epoch_store.clone());
4901 Ok(new_epoch_store)
4902 }
4903
4904 #[cfg(test)]
4905 pub(crate) fn iter_live_object_set_for_testing(
4906 &self,
4907 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
4908 self.get_accumulator_store()
4909 .iter_cached_live_object_set_for_testing()
4910 }
4911
4912 #[cfg(test)]
4913 pub(crate) fn shutdown_execution_for_test(&self) {
4914 self.tx_execution_shutdown
4915 .lock()
4916 .take()
4917 .unwrap()
4918 .send(())
4919 .unwrap();
4920 }
4921
4922 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> IotaResult {
4925 self.get_reconfig_api()
4926 .bulk_insert_genesis_objects(objects)?;
4927 self.get_object_cache_reader()
4928 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
4929 self.get_reconfig_api()
4930 .clear_state_end_of_epoch(&self.execution_lock_for_reconfiguration().await);
4931 Ok(())
4932 }
4933}
4934
4935pub struct RandomnessRoundReceiver {
4936 authority_state: Arc<AuthorityState>,
4937 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
4938}
4939
4940impl RandomnessRoundReceiver {
4941 pub fn spawn(
4942 authority_state: Arc<AuthorityState>,
4943 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
4944 ) -> JoinHandle<()> {
4945 let rrr = RandomnessRoundReceiver {
4946 authority_state,
4947 randomness_rx,
4948 };
4949 spawn_monitored_task!(rrr.run())
4950 }
4951
4952 async fn run(mut self) {
4953 info!("RandomnessRoundReceiver event loop started");
4954
4955 loop {
4956 tokio::select! {
4957 maybe_recv = self.randomness_rx.recv() => {
4958 if let Some((epoch, round, bytes)) = maybe_recv {
4959 self.handle_new_randomness(epoch, round, bytes);
4960 } else {
4961 break;
4962 }
4963 },
4964 }
4965 }
4966
4967 info!("RandomnessRoundReceiver event loop ended");
4968 }
4969
4970 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
4971 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
4972 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
4973 if epoch_store.epoch() != epoch {
4974 warn!(
4975 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
4976 epoch_store.epoch()
4977 );
4978 return;
4979 }
4980 let transaction = VerifiedTransaction::new_randomness_state_update(
4981 epoch,
4982 round,
4983 bytes,
4984 epoch_store
4985 .epoch_start_config()
4986 .randomness_obj_initial_shared_version(),
4987 );
4988 debug!(
4989 "created randomness state update transaction with digest: {:?}",
4990 transaction.digest()
4991 );
4992 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
4993 let digest = *transaction.digest();
4994
4995 self.authority_state
4997 .transaction_manager()
4998 .enqueue(vec![transaction], &epoch_store);
4999
5000 let authority_state = self.authority_state.clone();
5001 spawn_monitored_task!(async move {
5002 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
5011 let result = tokio::time::timeout(
5012 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
5013 authority_state
5014 .get_transaction_cache_reader()
5015 .notify_read_executed_effects(&[digest]),
5016 )
5017 .await;
5018 let result = match result {
5019 Ok(result) => result,
5020 Err(_) => {
5021 if cfg!(debug_assertions) {
5022 panic!(
5024 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5025 );
5026 }
5027 warn!(
5028 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
5029 );
5030 authority_state
5032 .get_transaction_cache_reader()
5033 .notify_read_executed_effects(&[digest])
5034 .await
5035 }
5036 };
5037
5038 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
5039 let effects = effects.pop().expect("should return effects");
5040 if *effects.status() != ExecutionStatus::Success {
5041 panic!(
5042 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
5043 );
5044 }
5045 debug!(
5046 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
5047 );
5048 });
5049 }
5050}
5051
5052#[async_trait]
5053impl TransactionKeyValueStoreTrait for AuthorityState {
5054 async fn multi_get(
5055 &self,
5056 transaction_keys: &[TransactionDigest],
5057 effects_keys: &[TransactionDigest],
5058 ) -> IotaResult<KVStoreTransactionData> {
5059 let txns = if !transaction_keys.is_empty() {
5060 self.get_transaction_cache_reader()
5061 .multi_get_transaction_blocks(transaction_keys)?
5062 .into_iter()
5063 .map(|t| t.map(|t| (*t).clone().into_inner()))
5064 .collect()
5065 } else {
5066 vec![]
5067 };
5068
5069 let fx = if !effects_keys.is_empty() {
5070 self.get_transaction_cache_reader()
5071 .multi_get_executed_effects(effects_keys)?
5072 } else {
5073 vec![]
5074 };
5075
5076 Ok((txns, fx))
5077 }
5078
5079 async fn multi_get_checkpoints(
5080 &self,
5081 checkpoint_summaries: &[CheckpointSequenceNumber],
5082 checkpoint_contents: &[CheckpointSequenceNumber],
5083 checkpoint_summaries_by_digest: &[CheckpointDigest],
5084 ) -> IotaResult<(
5085 Vec<Option<CertifiedCheckpointSummary>>,
5086 Vec<Option<CheckpointContents>>,
5087 Vec<Option<CertifiedCheckpointSummary>>,
5088 )> {
5089 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5091 let store = self.get_checkpoint_store();
5092 for seq in checkpoint_summaries {
5093 let checkpoint = store
5094 .get_checkpoint_by_sequence_number(*seq)?
5095 .map(|c| c.into_inner());
5096
5097 summaries.push(checkpoint);
5098 }
5099
5100 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5101 for seq in checkpoint_contents {
5102 let checkpoint = store
5103 .get_checkpoint_by_sequence_number(*seq)?
5104 .and_then(|summary| {
5105 store
5106 .get_checkpoint_contents(&summary.content_digest)
5107 .expect("db read cannot fail")
5108 });
5109 contents.push(checkpoint);
5110 }
5111
5112 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5113 for digest in checkpoint_summaries_by_digest {
5114 let checkpoint = store
5115 .get_checkpoint_by_digest(digest)?
5116 .map(|c| c.into_inner());
5117 summaries_by_digest.push(checkpoint);
5118 }
5119
5120 Ok((summaries, contents, summaries_by_digest))
5121 }
5122
5123 async fn get_transaction_perpetual_checkpoint(
5124 &self,
5125 digest: TransactionDigest,
5126 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5127 self.get_checkpoint_cache()
5128 .get_transaction_perpetual_checkpoint(&digest)
5129 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5130 }
5131
5132 async fn get_object(
5133 &self,
5134 object_id: ObjectID,
5135 version: VersionNumber,
5136 ) -> IotaResult<Option<Object>> {
5137 self.get_object_cache_reader()
5138 .get_object_by_key(&object_id, version)
5139 }
5140
5141 async fn multi_get_transactions_perpetual_checkpoints(
5142 &self,
5143 digests: &[TransactionDigest],
5144 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5145 let res = self
5146 .get_checkpoint_cache()
5147 .multi_get_transactions_perpetual_checkpoints(digests)?;
5148
5149 Ok(res
5150 .into_iter()
5151 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5152 .collect())
5153 }
5154
5155 #[instrument(skip(self))]
5156 async fn multi_get_events_by_tx_digests(
5157 &self,
5158 digests: &[TransactionDigest],
5159 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5160 if digests.is_empty() {
5161 return Ok(vec![]);
5162 }
5163 let events_digests: Vec<_> = self
5164 .get_transaction_cache_reader()
5165 .multi_get_executed_effects(digests)?
5166 .into_iter()
5167 .map(|t| t.and_then(|t| t.events_digest().cloned()))
5168 .collect();
5169 let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5170 let mut events = self
5171 .get_transaction_cache_reader()
5172 .multi_get_events(&non_empty_events)?
5173 .into_iter();
5174 Ok(events_digests
5175 .into_iter()
5176 .map(|ev| ev.and_then(|_| events.next()?))
5177 .collect())
5178 }
5179}
5180
5181#[cfg(msim)]
5182pub mod framework_injection {
5183 use std::{
5184 cell::RefCell,
5185 collections::{BTreeMap, BTreeSet},
5186 };
5187
5188 use iota_framework::{BuiltInFramework, SystemPackage};
5189 use iota_types::{
5190 base_types::{AuthorityName, ObjectID},
5191 is_system_package,
5192 };
5193 use move_binary_format::CompiledModule;
5194
5195 type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
5196
5197 thread_local! {
5199 static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
5200 }
5201
5202 type Framework = Vec<CompiledModule>;
5203
5204 pub type PackageUpgradeCallback =
5205 Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
5206
5207 enum PackageOverrideConfig {
5208 Global(Framework),
5209 PerValidator(PackageUpgradeCallback),
5210 }
5211
5212 fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
5213 modules
5214 .iter()
5215 .map(|m| {
5216 let mut buf = Vec::new();
5217 m.serialize_with_version(m.version, &mut buf).unwrap();
5218 buf
5219 })
5220 .collect()
5221 }
5222
5223 pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
5224 OVERRIDE.with(|bs| {
5225 bs.borrow_mut()
5226 .insert(package_id, PackageOverrideConfig::Global(modules))
5227 });
5228 }
5229
5230 pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
5231 OVERRIDE.with(|bs| {
5232 bs.borrow_mut()
5233 .insert(package_id, PackageOverrideConfig::PerValidator(func))
5234 });
5235 }
5236
5237 pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
5238 OVERRIDE.with(|cfg| {
5239 cfg.borrow().get(package_id).and_then(|entry| match entry {
5240 PackageOverrideConfig::Global(framework) => {
5241 Some(compiled_modules_to_bytes(framework))
5242 }
5243 PackageOverrideConfig::PerValidator(func) => {
5244 func(name).map(|fw| compiled_modules_to_bytes(&fw))
5245 }
5246 })
5247 })
5248 }
5249
5250 pub fn get_override_modules(
5251 package_id: &ObjectID,
5252 name: AuthorityName,
5253 ) -> Option<Vec<CompiledModule>> {
5254 OVERRIDE.with(|cfg| {
5255 cfg.borrow().get(package_id).and_then(|entry| match entry {
5256 PackageOverrideConfig::Global(framework) => Some(framework.clone()),
5257 PackageOverrideConfig::PerValidator(func) => func(name),
5258 })
5259 })
5260 }
5261
5262 pub fn get_override_system_package(
5263 package_id: &ObjectID,
5264 name: AuthorityName,
5265 ) -> Option<SystemPackage> {
5266 let bytes = get_override_bytes(package_id, name)?;
5267 let dependencies = if is_system_package(*package_id) {
5268 BuiltInFramework::get_package_by_id(package_id)
5269 .dependencies
5270 .to_vec()
5271 } else {
5272 BuiltInFramework::all_package_ids()
5275 };
5276 Some(SystemPackage {
5277 id: *package_id,
5278 bytes,
5279 dependencies,
5280 })
5281 }
5282
5283 pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
5284 let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
5285 let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
5286 cfg.borrow()
5287 .keys()
5288 .filter_map(|package| (!built_in.contains(package)).then_some(*package))
5289 .collect()
5290 });
5291
5292 extra
5293 .into_iter()
5294 .map(|package| SystemPackage {
5295 id: package,
5296 bytes: get_override_bytes(&package, name).unwrap(),
5297 dependencies: BuiltInFramework::all_package_ids(),
5298 })
5299 .collect()
5300 }
5301}
5302
5303#[derive(Debug, Serialize, Deserialize, Clone)]
5304pub struct ObjDumpFormat {
5305 pub id: ObjectID,
5306 pub version: VersionNumber,
5307 pub digest: ObjectDigest,
5308 pub object: Object,
5309}
5310
5311impl ObjDumpFormat {
5312 fn new(object: Object) -> Self {
5313 let oref = object.compute_object_reference();
5314 Self {
5315 id: oref.0,
5316 version: oref.1,
5317 digest: oref.2,
5318 object,
5319 }
5320 }
5321}
5322
5323#[derive(Debug, Serialize, Deserialize, Clone)]
5324pub struct NodeStateDump {
5325 pub tx_digest: TransactionDigest,
5326 pub sender_signed_data: SenderSignedData,
5327 pub executed_epoch: u64,
5328 pub reference_gas_price: u64,
5329 pub protocol_version: u64,
5330 pub epoch_start_timestamp_ms: u64,
5331 pub computed_effects: TransactionEffects,
5332 pub expected_effects_digest: TransactionEffectsDigest,
5333 pub relevant_system_packages: Vec<ObjDumpFormat>,
5334 pub shared_objects: Vec<ObjDumpFormat>,
5335 pub loaded_child_objects: Vec<ObjDumpFormat>,
5336 pub modified_at_versions: Vec<ObjDumpFormat>,
5337 pub runtime_reads: Vec<ObjDumpFormat>,
5338 pub input_objects: Vec<ObjDumpFormat>,
5339}
5340
5341impl NodeStateDump {
5342 pub fn new(
5343 tx_digest: &TransactionDigest,
5344 effects: &TransactionEffects,
5345 expected_effects_digest: TransactionEffectsDigest,
5346 object_store: &dyn ObjectStore,
5347 epoch_store: &Arc<AuthorityPerEpochStore>,
5348 inner_temporary_store: &InnerTemporaryStore,
5349 certificate: &VerifiedExecutableTransaction,
5350 ) -> IotaResult<Self> {
5351 let executed_epoch = epoch_store.epoch();
5353 let reference_gas_price = epoch_store.reference_gas_price();
5354 let epoch_start_config = epoch_store.epoch_start_config();
5355 let protocol_version = epoch_store.protocol_version().as_u64();
5356 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5357
5358 let mut relevant_system_packages = Vec::new();
5360 for sys_package_id in BuiltInFramework::all_package_ids() {
5361 if let Some(w) = object_store.get_object(&sys_package_id)? {
5362 relevant_system_packages.push(ObjDumpFormat::new(w))
5363 }
5364 }
5365
5366 let mut shared_objects = Vec::new();
5368 for kind in effects.input_shared_objects() {
5369 match kind {
5370 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5371 if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1)? {
5372 shared_objects.push(ObjDumpFormat::new(w))
5373 }
5374 }
5375 InputSharedObject::ReadDeleted(..)
5376 | InputSharedObject::MutateDeleted(..)
5377 | InputSharedObject::Cancelled(..) => (), }
5380 }
5381
5382 let mut loaded_child_objects = Vec::new();
5385 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5386 if let Some(w) = object_store.get_object_by_key(id, meta.version)? {
5387 loaded_child_objects.push(ObjDumpFormat::new(w))
5388 }
5389 }
5390
5391 let mut modified_at_versions = Vec::new();
5393 for (id, ver) in effects.modified_at_versions() {
5394 if let Some(w) = object_store.get_object_by_key(&id, ver)? {
5395 modified_at_versions.push(ObjDumpFormat::new(w))
5396 }
5397 }
5398
5399 let mut runtime_reads = Vec::new();
5403 for obj in inner_temporary_store
5404 .runtime_packages_loaded_from_db
5405 .values()
5406 {
5407 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
5408 }
5409
5410 Ok(Self {
5413 tx_digest: *tx_digest,
5414 executed_epoch,
5415 reference_gas_price,
5416 epoch_start_timestamp_ms,
5417 protocol_version,
5418 relevant_system_packages,
5419 shared_objects,
5420 loaded_child_objects,
5421 modified_at_versions,
5422 runtime_reads,
5423 sender_signed_data: certificate.clone().into_message(),
5424 input_objects: inner_temporary_store
5425 .input_objects
5426 .values()
5427 .map(|o| ObjDumpFormat::new(o.clone()))
5428 .collect(),
5429 computed_effects: effects.clone(),
5430 expected_effects_digest,
5431 })
5432 }
5433
5434 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
5435 let mut objects = Vec::new();
5436 objects.extend(self.relevant_system_packages.clone());
5437 objects.extend(self.shared_objects.clone());
5438 objects.extend(self.loaded_child_objects.clone());
5439 objects.extend(self.modified_at_versions.clone());
5440 objects.extend(self.runtime_reads.clone());
5441 objects.extend(self.input_objects.clone());
5442 objects
5443 }
5444
5445 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
5446 let file_name = format!(
5447 "{}_{}_NODE_DUMP.json",
5448 self.tx_digest,
5449 AuthorityState::unixtime_now_ms()
5450 );
5451 let mut path = path.to_path_buf();
5452 path.push(&file_name);
5453 let mut file = File::create(path.clone())?;
5454 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
5455 Ok(path)
5456 }
5457
5458 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
5459 let file = File::open(path)?;
5460 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
5461 }
5462}