1use std::{
7 collections::{BTreeMap, HashMap, HashSet},
8 fs,
9 fs::File,
10 io::Write,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, atomic::Ordering},
14 time::Duration,
15 vec,
16};
17
18use anyhow::anyhow;
19use arc_swap::{ArcSwap, Guard};
20use async_trait::async_trait;
21use authority_per_epoch_store::CertLockGuard;
22pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
23use chrono::prelude::*;
24use fastcrypto::{
25 encoding::{Base58, Encoding},
26 hash::MultisetHash,
27};
28use iota_archival::reader::ArchiveReaderBalancer;
29use iota_config::{
30 NodeConfig,
31 genesis::Genesis,
32 node::{
33 AuthorityOverloadConfig, DBCheckpointConfig, ExpensiveSafetyCheckConfig,
34 StateDebugDumpConfig,
35 },
36};
37use iota_framework::{BuiltInFramework, SystemPackage};
38use iota_json_rpc_types::{
39 DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent, IotaMoveValue,
40 IotaObjectDataFilter, IotaTransactionBlockData, IotaTransactionBlockEffects,
41 IotaTransactionBlockEvents, TransactionFilter,
42};
43use iota_macros::{fail_point, fail_point_async, fail_point_if};
44use iota_metrics::{
45 TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, monitored_scope, spawn_monitored_task,
46};
47use iota_storage::{
48 IndexStore,
49 indexes::{CoinInfo, ObjectIndexChanges},
50 key_value_store::{
51 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
52 },
53 key_value_store_metrics::KeyValueStoreMetrics,
54};
55#[cfg(msim)]
56use iota_types::committee::CommitteeTrait;
57use iota_types::{
58 IOTA_SYSTEM_ADDRESS, TypeTag,
59 authenticator_state::get_authenticator_state,
60 base_types::*,
61 committee::{Committee, EpochId, ProtocolVersion},
62 crypto::{AuthoritySignInfo, AuthoritySignature, RandomnessRound, Signer, default_hash},
63 deny_list_v1::check_coin_deny_list_v1_during_signing,
64 digests::{ChainIdentifier, TransactionEventsDigest},
65 dynamic_field::{DynamicFieldInfo, DynamicFieldName, visitor as DFV},
66 effects::{
67 InputSharedObject, SignedTransactionEffects, TransactionEffects, TransactionEffectsAPI,
68 TransactionEvents, VerifiedCertifiedTransactionEffects, VerifiedSignedTransactionEffects,
69 },
70 error::{ExecutionError, IotaError, IotaResult, UserInputError},
71 event::{Event, EventID, SystemEpochInfoEvent},
72 executable_transaction::VerifiedExecutableTransaction,
73 execution_config_utils::to_binary_config,
74 execution_status::ExecutionStatus,
75 fp_ensure,
76 gas::{GasCostSummary, IotaGasStatus},
77 inner_temporary_store::{
78 InnerTemporaryStore, ObjectMap, PackageStoreWithFallback, TemporaryModuleResolver, TxCoins,
79 WrittenObjects,
80 },
81 iota_system_state::{
82 IotaSystemState, IotaSystemStateTrait,
83 epoch_start_iota_system_state::EpochStartSystemStateTrait, get_iota_system_state,
84 },
85 is_system_package,
86 layout_resolver::{LayoutResolver, into_struct_layout},
87 message_envelope::Message,
88 messages_checkpoint::{
89 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents,
90 CheckpointContentsDigest, CheckpointDigest, CheckpointRequest, CheckpointResponse,
91 CheckpointSequenceNumber, CheckpointSummary, CheckpointSummaryResponse,
92 CheckpointTimestamp, ECMHLiveObjectSetDigest, VerifiedCheckpoint,
93 },
94 messages_consensus::AuthorityCapabilitiesV1,
95 messages_grpc::{
96 HandleTransactionResponse, LayoutGenerationOption, ObjectInfoRequest,
97 ObjectInfoRequestKind, ObjectInfoResponse, TransactionInfoRequest, TransactionInfoResponse,
98 TransactionStatus,
99 },
100 metrics::{BytecodeVerifierMetrics, LimitsMetrics},
101 object::{
102 MoveObject, OBJECT_START_VERSION, Object, ObjectRead, Owner, PastObjectRead,
103 bounded_visitor::BoundedVisitor,
104 },
105 storage::{
106 BackingPackageStore, BackingStore, ObjectKey, ObjectOrTombstone, ObjectStore, WriteKind,
107 },
108 supported_protocol_versions::{ProtocolConfig, SupportedProtocolVersions},
109 transaction::*,
110};
111use itertools::Itertools;
112use move_binary_format::{CompiledModule, binary_config::BinaryConfig};
113use move_core_types::{annotated_value::MoveStructLayout, language_storage::ModuleId};
114use once_cell::sync::OnceCell;
115use parking_lot::Mutex;
116use prometheus::{
117 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
118 register_histogram_vec_with_registry, register_histogram_with_registry,
119 register_int_counter_vec_with_registry, register_int_counter_with_registry,
120 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
121};
122use serde::{Deserialize, Serialize, de::DeserializeOwned};
123use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
124use tap::{TapFallible, TapOptional};
125use tokio::{
126 sync::{RwLock, mpsc, mpsc::unbounded_channel, oneshot},
127 task::JoinHandle,
128};
129use tracing::{Instrument, debug, error, info, instrument, warn};
130use typed_store::TypedStoreError;
131
132use self::{
133 authority_store::ExecutionLockWriteGuard, authority_store_pruner::AuthorityStorePruningMetrics,
134};
135#[cfg(msim)]
136pub use crate::checkpoints::checkpoint_executor::{
137 CheckpointTimeoutConfig, init_checkpoint_timeout_config,
138};
139use crate::{
140 authority::{
141 authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard},
142 authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner,
143 authority_store::{ExecutionLockReadGuard, ObjectLockStatus},
144 authority_store_pruner::{AuthorityStorePruner, EPOCH_DURATION_MS_FOR_TESTING},
145 epoch_start_configuration::{EpochStartConfigTrait, EpochStartConfiguration},
146 },
147 authority_client::NetworkAuthorityClient,
148 checkpoints::CheckpointStore,
149 consensus_adapter::ConsensusAdapter,
150 epoch::committee_store::CommitteeStore,
151 execution_cache::{
152 CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI,
153 ExecutionCacheTraitPointers, ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI,
154 TransactionCacheRead,
155 },
156 execution_driver::execution_process,
157 metrics::{LatencyObserver, RateTracker},
158 module_cache_metrics::ResolverMetrics,
159 overload_monitor::{AuthorityOverloadInfo, overload_monitor_accept_tx},
160 rest_index::RestIndexStore,
161 stake_aggregator::StakeAggregator,
162 state_accumulator::{AccumulatorStore, StateAccumulator},
163 subscription_handler::SubscriptionHandler,
164 transaction_input_loader::TransactionInputLoader,
165 transaction_manager::TransactionManager,
166 transaction_outputs::TransactionOutputs,
167 validator_tx_finalizer::ValidatorTxFinalizer,
168 verify_indexes::verify_indexes,
169};
170
171#[cfg(test)]
172#[path = "unit_tests/authority_tests.rs"]
173pub mod authority_tests;
174
175#[cfg(test)]
176#[path = "unit_tests/transaction_tests.rs"]
177pub mod transaction_tests;
178
179#[cfg(test)]
180#[path = "unit_tests/batch_transaction_tests.rs"]
181mod batch_transaction_tests;
182
183#[cfg(test)]
184#[path = "unit_tests/move_integration_tests.rs"]
185pub mod move_integration_tests;
186
187#[cfg(test)]
188#[path = "unit_tests/gas_tests.rs"]
189mod gas_tests;
190
191#[cfg(test)]
192#[path = "unit_tests/batch_verification_tests.rs"]
193mod batch_verification_tests;
194
195#[cfg(test)]
196#[path = "unit_tests/coin_deny_list_tests.rs"]
197mod coin_deny_list_tests;
198
199#[cfg(any(test, feature = "test-utils"))]
200pub mod authority_test_utils;
201
202pub mod authority_per_epoch_store;
203pub mod authority_per_epoch_store_pruner;
204
205pub mod authority_store_pruner;
206pub mod authority_store_tables;
207pub mod authority_store_types;
208pub mod epoch_start_configuration;
209pub mod shared_object_congestion_tracker;
210pub mod shared_object_version_manager;
211#[cfg(any(test, feature = "test-utils"))]
212pub mod test_authority_builder;
213pub mod transaction_deferral;
214
215pub(crate) mod authority_store;
216
217pub static CHAIN_IDENTIFIER: OnceCell<ChainIdentifier> = OnceCell::new();
218
219pub struct AuthorityMetrics {
221 tx_orders: IntCounter,
222 total_certs: IntCounter,
223 total_cert_attempts: IntCounter,
224 total_effects: IntCounter,
225 pub shared_obj_tx: IntCounter,
226 sponsored_tx: IntCounter,
227 tx_already_processed: IntCounter,
228 num_input_objs: Histogram,
229 num_shared_objects: Histogram,
230 batch_size: Histogram,
231
232 authority_state_handle_transaction_latency: Histogram,
233
234 execute_certificate_latency_single_writer: Histogram,
235 execute_certificate_latency_shared_object: Histogram,
236
237 execute_certificate_with_effects_latency: Histogram,
238 internal_execution_latency: Histogram,
239 execution_load_input_objects_latency: Histogram,
240 prepare_certificate_latency: Histogram,
241 commit_certificate_latency: Histogram,
242 db_checkpoint_latency: Histogram,
243
244 pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
245 pub(crate) transaction_manager_num_missing_objects: IntGauge,
246 pub(crate) transaction_manager_num_pending_certificates: IntGauge,
247 pub(crate) transaction_manager_num_executing_certificates: IntGauge,
248 pub(crate) transaction_manager_num_ready: IntGauge,
249 pub(crate) transaction_manager_object_cache_size: IntGauge,
250 pub(crate) transaction_manager_object_cache_hits: IntCounter,
251 pub(crate) transaction_manager_object_cache_misses: IntCounter,
252 pub(crate) transaction_manager_object_cache_evictions: IntCounter,
253 pub(crate) transaction_manager_package_cache_size: IntGauge,
254 pub(crate) transaction_manager_package_cache_hits: IntCounter,
255 pub(crate) transaction_manager_package_cache_misses: IntCounter,
256 pub(crate) transaction_manager_package_cache_evictions: IntCounter,
257 pub(crate) transaction_manager_transaction_queue_age_s: Histogram,
258
259 pub(crate) execution_driver_executed_transactions: IntCounter,
260 pub(crate) execution_driver_dispatch_queue: IntGauge,
261 pub(crate) execution_queueing_delay_s: Histogram,
262 pub(crate) prepare_cert_gas_latency_ratio: Histogram,
263 pub(crate) execution_gas_latency_ratio: Histogram,
264
265 pub(crate) skipped_consensus_txns: IntCounter,
266 pub(crate) skipped_consensus_txns_cache_hit: IntCounter,
267
268 pub(crate) authority_overload_status: IntGauge,
269 pub(crate) authority_load_shedding_percentage: IntGauge,
270
271 pub(crate) transaction_overload_sources: IntCounterVec,
272
273 post_processing_total_events_emitted: IntCounter,
275 post_processing_total_tx_indexed: IntCounter,
276 post_processing_total_tx_had_event_processed: IntCounter,
277 post_processing_total_failures: IntCounter,
278
279 pub consensus_handler_processed: IntCounterVec,
281 pub consensus_handler_transaction_sizes: HistogramVec,
282 pub consensus_handler_num_low_scoring_authorities: IntGauge,
283 pub consensus_handler_scores: IntGaugeVec,
284 pub consensus_handler_deferred_transactions: IntCounter,
285 pub consensus_handler_congested_transactions: IntCounter,
286 pub consensus_handler_cancelled_transactions: IntCounter,
287 pub consensus_handler_max_object_costs: IntGaugeVec,
288 pub consensus_committed_subdags: IntCounterVec,
289 pub consensus_committed_messages: IntGaugeVec,
290 pub consensus_committed_user_transactions: IntGaugeVec,
291 pub consensus_calculated_throughput: IntGauge,
292 pub consensus_calculated_throughput_profile: IntGauge,
293
294 pub limits_metrics: Arc<LimitsMetrics>,
295
296 pub bytecode_verifier_metrics: Arc<BytecodeVerifierMetrics>,
298
299 pub authenticator_state_update_failed: IntCounter,
300
301 pub zklogin_sig_count: IntCounter,
303 pub multisig_sig_count: IntCounter,
305
306 pub execution_queueing_latency: LatencyObserver,
309
310 pub txn_ready_rate_tracker: Arc<Mutex<RateTracker>>,
317
318 pub execution_rate_tracker: Arc<Mutex<RateTracker>>,
321}
322
323const POSITIVE_INT_BUCKETS: &[f64] = &[
325 1., 2., 5., 7., 10., 20., 50., 70., 100., 200., 500., 700., 1000., 2000., 5000., 7000., 10000.,
326 20000., 50000., 70000., 100000., 200000., 500000., 700000., 1000000., 2000000., 5000000.,
327 7000000., 10000000.,
328];
329
330const LATENCY_SEC_BUCKETS: &[f64] = &[
331 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1., 2., 3., 4., 5., 6., 7., 8., 9.,
332 10., 20., 30., 60., 90.,
333];
334
335const LOW_LATENCY_SEC_BUCKETS: &[f64] = &[
337 0.00001, 0.00002, 0.00005, 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
338 0.2, 0.5, 1., 2., 5., 10., 20., 50., 100.,
339];
340
341const GAS_LATENCY_RATIO_BUCKETS: &[f64] = &[
342 10.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0,
343 3000.0, 4000.0, 5000.0, 6000.0, 7000.0, 8000.0, 9000.0, 10000.0, 50000.0, 100000.0, 1000000.0,
344];
345
346pub const DEV_INSPECT_GAS_COIN_VALUE: u64 = 1_000_000_000_000;
347
348impl AuthorityMetrics {
349 pub fn new(registry: &prometheus::Registry) -> AuthorityMetrics {
350 let execute_certificate_latency = register_histogram_vec_with_registry!(
351 "authority_state_execute_certificate_latency",
352 "Latency of executing certificates, including waiting for inputs",
353 &["tx_type"],
354 LATENCY_SEC_BUCKETS.to_vec(),
355 registry,
356 )
357 .unwrap();
358
359 let execute_certificate_latency_single_writer =
360 execute_certificate_latency.with_label_values(&[TX_TYPE_SINGLE_WRITER_TX]);
361 let execute_certificate_latency_shared_object =
362 execute_certificate_latency.with_label_values(&[TX_TYPE_SHARED_OBJ_TX]);
363
364 Self {
365 tx_orders: register_int_counter_with_registry!(
366 "total_transaction_orders",
367 "Total number of transaction orders",
368 registry,
369 )
370 .unwrap(),
371 total_certs: register_int_counter_with_registry!(
372 "total_transaction_certificates",
373 "Total number of transaction certificates handled",
374 registry,
375 )
376 .unwrap(),
377 total_cert_attempts: register_int_counter_with_registry!(
378 "total_handle_certificate_attempts",
379 "Number of calls to handle_certificate",
380 registry,
381 )
382 .unwrap(),
383 total_effects: register_int_counter_with_registry!(
385 "total_transaction_effects",
386 "Total number of transaction effects produced",
387 registry,
388 )
389 .unwrap(),
390
391 shared_obj_tx: register_int_counter_with_registry!(
392 "num_shared_obj_tx",
393 "Number of transactions involving shared objects",
394 registry,
395 )
396 .unwrap(),
397
398 sponsored_tx: register_int_counter_with_registry!(
399 "num_sponsored_tx",
400 "Number of sponsored transactions",
401 registry,
402 )
403 .unwrap(),
404
405 tx_already_processed: register_int_counter_with_registry!(
406 "num_tx_already_processed",
407 "Number of transaction orders already processed previously",
408 registry,
409 )
410 .unwrap(),
411 num_input_objs: register_histogram_with_registry!(
412 "num_input_objects",
413 "Distribution of number of input TX objects per TX",
414 POSITIVE_INT_BUCKETS.to_vec(),
415 registry,
416 )
417 .unwrap(),
418 num_shared_objects: register_histogram_with_registry!(
419 "num_shared_objects",
420 "Number of shared input objects per TX",
421 POSITIVE_INT_BUCKETS.to_vec(),
422 registry,
423 )
424 .unwrap(),
425 batch_size: register_histogram_with_registry!(
426 "batch_size",
427 "Distribution of size of transaction batch",
428 POSITIVE_INT_BUCKETS.to_vec(),
429 registry,
430 )
431 .unwrap(),
432 authority_state_handle_transaction_latency: register_histogram_with_registry!(
433 "authority_state_handle_transaction_latency",
434 "Latency of handling transactions",
435 LATENCY_SEC_BUCKETS.to_vec(),
436 registry,
437 )
438 .unwrap(),
439 execute_certificate_latency_single_writer,
440 execute_certificate_latency_shared_object,
441 execute_certificate_with_effects_latency: register_histogram_with_registry!(
442 "authority_state_execute_certificate_with_effects_latency",
443 "Latency of executing certificates with effects, including waiting for inputs",
444 LATENCY_SEC_BUCKETS.to_vec(),
445 registry,
446 )
447 .unwrap(),
448 internal_execution_latency: register_histogram_with_registry!(
449 "authority_state_internal_execution_latency",
450 "Latency of actual certificate executions",
451 LATENCY_SEC_BUCKETS.to_vec(),
452 registry,
453 )
454 .unwrap(),
455 execution_load_input_objects_latency: register_histogram_with_registry!(
456 "authority_state_execution_load_input_objects_latency",
457 "Latency of loading input objects for execution",
458 LOW_LATENCY_SEC_BUCKETS.to_vec(),
459 registry,
460 )
461 .unwrap(),
462 prepare_certificate_latency: register_histogram_with_registry!(
463 "authority_state_prepare_certificate_latency",
464 "Latency of executing certificates, before committing the results",
465 LATENCY_SEC_BUCKETS.to_vec(),
466 registry,
467 )
468 .unwrap(),
469 commit_certificate_latency: register_histogram_with_registry!(
470 "authority_state_commit_certificate_latency",
471 "Latency of committing certificate execution results",
472 LATENCY_SEC_BUCKETS.to_vec(),
473 registry,
474 )
475 .unwrap(),
476 db_checkpoint_latency: register_histogram_with_registry!(
477 "db_checkpoint_latency",
478 "Latency of checkpointing dbs",
479 LATENCY_SEC_BUCKETS.to_vec(),
480 registry,
481 ).unwrap(),
482 transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
483 "transaction_manager_num_enqueued_certificates",
484 "Current number of certificates enqueued to TransactionManager",
485 &["result"],
486 registry,
487 )
488 .unwrap(),
489 transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
490 "transaction_manager_num_missing_objects",
491 "Current number of missing objects in TransactionManager",
492 registry,
493 )
494 .unwrap(),
495 transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
496 "transaction_manager_num_pending_certificates",
497 "Number of certificates pending in TransactionManager, with at least 1 missing input object",
498 registry,
499 )
500 .unwrap(),
501 transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
502 "transaction_manager_num_executing_certificates",
503 "Number of executing certificates, including queued and actually running certificates",
504 registry,
505 )
506 .unwrap(),
507 transaction_manager_num_ready: register_int_gauge_with_registry!(
508 "transaction_manager_num_ready",
509 "Number of ready transactions in TransactionManager",
510 registry,
511 )
512 .unwrap(),
513 transaction_manager_object_cache_size: register_int_gauge_with_registry!(
514 "transaction_manager_object_cache_size",
515 "Current size of object-availability cache in TransactionManager",
516 registry,
517 )
518 .unwrap(),
519 transaction_manager_object_cache_hits: register_int_counter_with_registry!(
520 "transaction_manager_object_cache_hits",
521 "Number of object-availability cache hits in TransactionManager",
522 registry,
523 )
524 .unwrap(),
525 authority_overload_status: register_int_gauge_with_registry!(
526 "authority_overload_status",
527 "Whether authority is current experiencing overload and enters load shedding mode.",
528 registry)
529 .unwrap(),
530 authority_load_shedding_percentage: register_int_gauge_with_registry!(
531 "authority_load_shedding_percentage",
532 "The percentage of transactions is shed when the authority is in load shedding mode.",
533 registry)
534 .unwrap(),
535 transaction_manager_object_cache_misses: register_int_counter_with_registry!(
536 "transaction_manager_object_cache_misses",
537 "Number of object-availability cache misses in TransactionManager",
538 registry,
539 )
540 .unwrap(),
541 transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
542 "transaction_manager_object_cache_evictions",
543 "Number of object-availability cache evictions in TransactionManager",
544 registry,
545 )
546 .unwrap(),
547 transaction_manager_package_cache_size: register_int_gauge_with_registry!(
548 "transaction_manager_package_cache_size",
549 "Current size of package-availability cache in TransactionManager",
550 registry,
551 )
552 .unwrap(),
553 transaction_manager_package_cache_hits: register_int_counter_with_registry!(
554 "transaction_manager_package_cache_hits",
555 "Number of package-availability cache hits in TransactionManager",
556 registry,
557 )
558 .unwrap(),
559 transaction_manager_package_cache_misses: register_int_counter_with_registry!(
560 "transaction_manager_package_cache_misses",
561 "Number of package-availability cache misses in TransactionManager",
562 registry,
563 )
564 .unwrap(),
565 transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
566 "transaction_manager_package_cache_evictions",
567 "Number of package-availability cache evictions in TransactionManager",
568 registry,
569 )
570 .unwrap(),
571 transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
572 "transaction_manager_transaction_queue_age_s",
573 "Time spent in waiting for transaction in the queue",
574 LATENCY_SEC_BUCKETS.to_vec(),
575 registry,
576 )
577 .unwrap(),
578 transaction_overload_sources: register_int_counter_vec_with_registry!(
579 "transaction_overload_sources",
580 "Number of times each source indicates transaction overload.",
581 &["source"],
582 registry)
583 .unwrap(),
584 execution_driver_executed_transactions: register_int_counter_with_registry!(
585 "execution_driver_executed_transactions",
586 "Cumulative number of transaction executed by execution driver",
587 registry,
588 )
589 .unwrap(),
590 execution_driver_dispatch_queue: register_int_gauge_with_registry!(
591 "execution_driver_dispatch_queue",
592 "Number of transaction pending in execution driver dispatch queue",
593 registry,
594 )
595 .unwrap(),
596 execution_queueing_delay_s: register_histogram_with_registry!(
597 "execution_queueing_delay_s",
598 "Queueing delay between a transaction is ready for execution until it starts executing.",
599 LATENCY_SEC_BUCKETS.to_vec(),
600 registry
601 )
602 .unwrap(),
603 prepare_cert_gas_latency_ratio: register_histogram_with_registry!(
604 "prepare_cert_gas_latency_ratio",
605 "The ratio of computation gas divided by VM execution latency.",
606 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
607 registry
608 )
609 .unwrap(),
610 execution_gas_latency_ratio: register_histogram_with_registry!(
611 "execution_gas_latency_ratio",
612 "The ratio of computation gas divided by certificate execution latency, include committing certificate.",
613 GAS_LATENCY_RATIO_BUCKETS.to_vec(),
614 registry
615 )
616 .unwrap(),
617 skipped_consensus_txns: register_int_counter_with_registry!(
618 "skipped_consensus_txns",
619 "Total number of consensus transactions skipped",
620 registry,
621 )
622 .unwrap(),
623 skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
624 "skipped_consensus_txns_cache_hit",
625 "Total number of consensus transactions skipped because of local cache hit",
626 registry,
627 )
628 .unwrap(),
629 post_processing_total_events_emitted: register_int_counter_with_registry!(
630 "post_processing_total_events_emitted",
631 "Total number of events emitted in post processing",
632 registry,
633 )
634 .unwrap(),
635 post_processing_total_tx_indexed: register_int_counter_with_registry!(
636 "post_processing_total_tx_indexed",
637 "Total number of txes indexed in post processing",
638 registry,
639 )
640 .unwrap(),
641 post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
642 "post_processing_total_tx_had_event_processed",
643 "Total number of txes finished event processing in post processing",
644 registry,
645 )
646 .unwrap(),
647 post_processing_total_failures: register_int_counter_with_registry!(
648 "post_processing_total_failures",
649 "Total number of failure in post processing",
650 registry,
651 )
652 .unwrap(),
653 consensus_handler_processed: register_int_counter_vec_with_registry!(
654 "consensus_handler_processed",
655 "Number of transactions processed by consensus handler",
656 &["class"],
657 registry
658 ).unwrap(),
659 consensus_handler_transaction_sizes: register_histogram_vec_with_registry!(
660 "consensus_handler_transaction_sizes",
661 "Sizes of each type of transactions processed by consensus handler",
662 &["class"],
663 POSITIVE_INT_BUCKETS.to_vec(),
664 registry
665 ).unwrap(),
666 consensus_handler_num_low_scoring_authorities: register_int_gauge_with_registry!(
667 "consensus_handler_num_low_scoring_authorities",
668 "Number of low scoring authorities based on reputation scores from consensus",
669 registry
670 ).unwrap(),
671 consensus_handler_scores: register_int_gauge_vec_with_registry!(
672 "consensus_handler_scores",
673 "scores from consensus for each authority",
674 &["authority"],
675 registry,
676 ).unwrap(),
677 consensus_handler_deferred_transactions: register_int_counter_with_registry!(
678 "consensus_handler_deferred_transactions",
679 "Number of transactions deferred by consensus handler",
680 registry,
681 ).unwrap(),
682 consensus_handler_congested_transactions: register_int_counter_with_registry!(
683 "consensus_handler_congested_transactions",
684 "Number of transactions deferred by consensus handler due to congestion",
685 registry,
686 ).unwrap(),
687 consensus_handler_cancelled_transactions: register_int_counter_with_registry!(
688 "consensus_handler_cancelled_transactions",
689 "Number of transactions cancelled by consensus handler",
690 registry,
691 ).unwrap(),
692 consensus_handler_max_object_costs: register_int_gauge_vec_with_registry!(
693 "consensus_handler_max_congestion_control_object_costs",
694 "Max object costs for congestion control in the current consensus commit",
695 &["commit_type"],
696 registry,
697 ).unwrap(),
698 consensus_committed_subdags: register_int_counter_vec_with_registry!(
699 "consensus_committed_subdags",
700 "Number of committed subdags, sliced by author",
701 &["authority"],
702 registry,
703 ).unwrap(),
704 consensus_committed_messages: register_int_gauge_vec_with_registry!(
705 "consensus_committed_messages",
706 "Total number of committed consensus messages, sliced by author",
707 &["authority"],
708 registry,
709 ).unwrap(),
710 consensus_committed_user_transactions: register_int_gauge_vec_with_registry!(
711 "consensus_committed_user_transactions",
712 "Number of committed user transactions, sliced by submitter",
713 &["authority"],
714 registry,
715 ).unwrap(),
716 limits_metrics: Arc::new(LimitsMetrics::new(registry)),
717 bytecode_verifier_metrics: Arc::new(BytecodeVerifierMetrics::new(registry)),
718 authenticator_state_update_failed: register_int_counter_with_registry!(
719 "authenticator_state_update_failed",
720 "Number of failed authenticator state updates",
721 registry,
722 )
723 .unwrap(),
724 zklogin_sig_count: register_int_counter_with_registry!(
725 "zklogin_sig_count",
726 "Count of zkLogin signatures",
727 registry,
728 )
729 .unwrap(),
730 multisig_sig_count: register_int_counter_with_registry!(
731 "multisig_sig_count",
732 "Count of zkLogin signatures",
733 registry,
734 )
735 .unwrap(),
736 consensus_calculated_throughput: register_int_gauge_with_registry!(
737 "consensus_calculated_throughput",
738 "The calculated throughput from consensus output. Result is calculated based on unique transactions.",
739 registry,
740 ).unwrap(),
741 consensus_calculated_throughput_profile: register_int_gauge_with_registry!(
742 "consensus_calculated_throughput_profile",
743 "The current active calculated throughput profile",
744 registry
745 ).unwrap(),
746 execution_queueing_latency: LatencyObserver::new(),
747 txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
748 execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))),
749 }
750 }
751
752 pub fn reset_on_reconfigure(&self) {
756 self.consensus_committed_messages.reset();
757 self.consensus_handler_scores.reset();
758 self.consensus_committed_user_transactions.reset();
759 }
760}
761
762pub type StableSyncAuthoritySigner = Pin<Arc<dyn Signer<AuthoritySignature> + Send + Sync>>;
769
770pub struct AuthorityState {
771 pub name: AuthorityName,
774 pub secret: StableSyncAuthoritySigner,
776
777 input_loader: TransactionInputLoader,
779 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
780
781 epoch_store: ArcSwap<AuthorityPerEpochStore>,
782
783 execution_lock: RwLock<EpochId>,
789
790 pub indexes: Option<Arc<IndexStore>>,
791 pub rest_index: Option<Arc<RestIndexStore>>,
792
793 pub subscription_handler: Arc<SubscriptionHandler>,
794 checkpoint_store: Arc<CheckpointStore>,
795
796 committee_store: Arc<CommitteeStore>,
797
798 transaction_manager: Arc<TransactionManager>,
800
801 #[cfg_attr(not(test), expect(unused))]
803 tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,
804
805 pub metrics: Arc<AuthorityMetrics>,
806 _pruner: AuthorityStorePruner,
807 _authority_per_epoch_pruner: AuthorityPerEpochStorePruner,
808
809 db_checkpoint_config: DBCheckpointConfig,
811
812 pub config: NodeConfig,
813
814 pub overload_info: AuthorityOverloadInfo,
816
817 pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
818}
819
820impl AuthorityState {
829 pub fn is_validator(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
830 epoch_store.committee().authority_exists(&self.name)
831 }
832
833 pub fn is_fullnode(&self, epoch_store: &AuthorityPerEpochStore) -> bool {
834 !self.is_validator(epoch_store)
835 }
836
837 pub fn committee_store(&self) -> &Arc<CommitteeStore> {
838 &self.committee_store
839 }
840
841 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
842 self.committee_store.clone()
843 }
844
845 pub fn overload_config(&self) -> &AuthorityOverloadConfig {
846 &self.config.authority_overload_config
847 }
848
849 pub fn get_epoch_state_commitments(
850 &self,
851 epoch: EpochId,
852 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
853 self.checkpoint_store.get_epoch_state_commitments(epoch)
854 }
855
856 #[instrument(level = "trace", skip_all)]
860 async fn handle_transaction_impl(
861 &self,
862 transaction: VerifiedTransaction,
863 epoch_store: &Arc<AuthorityPerEpochStore>,
864 ) -> IotaResult<VerifiedSignedTransaction> {
865 let _execution_lock = self.execution_lock_for_signing().await;
867
868 let tx_digest = transaction.digest();
869 let tx_data = transaction.data().transaction_data();
870
871 let input_object_kinds = tx_data.input_objects()?;
872 let receiving_objects_refs = tx_data.receiving_objects();
873
874 iota_transaction_checks::deny::check_transaction_for_signing(
878 tx_data,
879 transaction.tx_signatures(),
880 &input_object_kinds,
881 &receiving_objects_refs,
882 &self.config.transaction_deny_config,
883 self.get_backing_package_store().as_ref(),
884 )?;
885
886 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
887 Some(tx_digest),
888 &input_object_kinds,
889 &receiving_objects_refs,
890 epoch_store.epoch(),
891 )?;
892
893 let (_gas_status, checked_input_objects) =
894 iota_transaction_checks::check_transaction_input(
895 epoch_store.protocol_config(),
896 epoch_store.reference_gas_price(),
897 tx_data,
898 input_objects,
899 &receiving_objects,
900 &self.metrics.bytecode_verifier_metrics,
901 &self.config.verifier_signing_config,
902 )?;
903
904 check_coin_deny_list_v1_during_signing(
905 tx_data.sender(),
906 &checked_input_objects,
907 &receiving_objects,
908 &self.get_object_store(),
909 )?;
910
911 let owned_objects = checked_input_objects.inner().filter_owned_objects();
912
913 let signed_transaction = VerifiedSignedTransaction::new(
914 epoch_store.epoch(),
915 transaction,
916 self.name,
917 &*self.secret,
918 );
919
920 self.get_cache_writer()
925 .acquire_transaction_locks(epoch_store, &owned_objects, signed_transaction.clone())
926 .await?;
927
928 Ok(signed_transaction)
929 }
930
931 #[instrument(level = "trace", skip_all)]
933 pub async fn handle_transaction(
934 &self,
935 epoch_store: &Arc<AuthorityPerEpochStore>,
936 transaction: VerifiedTransaction,
937 ) -> IotaResult<HandleTransactionResponse> {
938 let tx_digest = *transaction.digest();
939 debug!("handle_transaction");
940
941 if let Some((_, status)) = self.get_transaction_status(&tx_digest, epoch_store)? {
943 return Ok(HandleTransactionResponse { status });
944 }
945
946 let _metrics_guard = self
947 .metrics
948 .authority_state_handle_transaction_latency
949 .start_timer();
950 self.metrics.tx_orders.inc();
951
952 let signed = self.handle_transaction_impl(transaction, epoch_store).await;
953 match signed {
954 Ok(s) => {
955 if self.is_validator(epoch_store) {
956 if let Some(validator_tx_finalizer) = &self.validator_tx_finalizer {
957 let tx = s.clone();
958 let validator_tx_finalizer = validator_tx_finalizer.clone();
959 let cache_reader = self.get_transaction_cache_reader().clone();
960 let epoch_store = epoch_store.clone();
961 spawn_monitored_task!(epoch_store.within_alive_epoch(
962 validator_tx_finalizer.track_signed_tx(cache_reader, tx)
963 ));
964 }
965 }
966 Ok(HandleTransactionResponse {
967 status: TransactionStatus::Signed(s.into_inner().into_sig()),
968 })
969 }
970 Err(err) => Ok(HandleTransactionResponse {
974 status: self
975 .get_transaction_status(&tx_digest, epoch_store)?
976 .ok_or(err)?
977 .1,
978 }),
979 }
980 }
981
982 pub fn check_system_overload_at_signing(&self) -> bool {
983 self.config
984 .authority_overload_config
985 .check_system_overload_at_signing
986 }
987
988 pub fn check_system_overload_at_execution(&self) -> bool {
989 self.config
990 .authority_overload_config
991 .check_system_overload_at_execution
992 }
993
994 pub(crate) fn check_system_overload(
995 &self,
996 consensus_adapter: &Arc<ConsensusAdapter>,
997 tx_data: &SenderSignedData,
998 do_authority_overload_check: bool,
999 ) -> IotaResult {
1000 if do_authority_overload_check {
1001 self.check_authority_overload(tx_data).tap_err(|_| {
1002 self.update_overload_metrics("execution_queue");
1003 })?;
1004 }
1005 self.transaction_manager
1006 .check_execution_overload(self.overload_config(), tx_data)
1007 .tap_err(|_| {
1008 self.update_overload_metrics("execution_pending");
1009 })?;
1010 consensus_adapter.check_consensus_overload().tap_err(|_| {
1011 self.update_overload_metrics("consensus");
1012 })?;
1013 Ok(())
1014 }
1015
1016 fn check_authority_overload(&self, tx_data: &SenderSignedData) -> IotaResult {
1017 if !self.overload_info.is_overload.load(Ordering::Relaxed) {
1018 return Ok(());
1019 }
1020
1021 let load_shedding_percentage = self
1022 .overload_info
1023 .load_shedding_percentage
1024 .load(Ordering::Relaxed);
1025 overload_monitor_accept_tx(load_shedding_percentage, tx_data.digest())
1026 }
1027
1028 fn update_overload_metrics(&self, source: &str) {
1029 self.metrics
1030 .transaction_overload_sources
1031 .with_label_values(&[source])
1032 .inc();
1033 }
1034
1035 #[instrument(level = "trace", skip_all)]
1041 pub async fn fullnode_execute_certificate_with_effects(
1042 &self,
1043 transaction: &VerifiedExecutableTransaction,
1044 effects: &VerifiedCertifiedTransactionEffects,
1051 epoch_store: &Arc<AuthorityPerEpochStore>,
1052 ) -> IotaResult {
1053 assert!(self.is_fullnode(epoch_store));
1054 if self.epoch_store.load().epoch() != epoch_store.epoch() {
1059 return Err(IotaError::EpochEnded(epoch_store.epoch()));
1060 }
1061 let _metrics_guard = self
1062 .metrics
1063 .execute_certificate_with_effects_latency
1064 .start_timer();
1065 let digest = *transaction.digest();
1066 debug!("execute_certificate_with_effects");
1067 fp_ensure!(
1068 *effects.data().transaction_digest() == digest,
1069 IotaError::ErrorWhileProcessingCertificate {
1070 err: "effects/tx digest mismatch".to_string()
1071 }
1072 );
1073
1074 if transaction.contains_shared_object() {
1075 epoch_store
1076 .acquire_shared_locks_from_effects(
1077 transaction,
1078 effects.data(),
1079 self.get_object_cache_reader().as_ref(),
1080 )
1081 .await?;
1082 }
1083
1084 let expected_effects_digest = effects.digest();
1085
1086 self.transaction_manager
1087 .enqueue(vec![transaction.clone()], epoch_store);
1088
1089 let observed_effects = self
1090 .get_transaction_cache_reader()
1091 .notify_read_executed_effects(&[digest])
1092 .instrument(tracing::debug_span!(
1093 "notify_read_effects_in_execute_certificate_with_effects"
1094 ))
1095 .await?
1096 .pop()
1097 .expect("notify_read_effects should return exactly 1 element");
1098
1099 let observed_effects_digest = observed_effects.digest();
1100 if &observed_effects_digest != expected_effects_digest {
1101 panic!(
1102 "Locally executed effects do not match canonical effects! expected_effects_digest={:?} observed_effects_digest={:?} expected_effects={:?} observed_effects={:?} input_objects={:?}",
1103 expected_effects_digest,
1104 observed_effects_digest,
1105 effects.data(),
1106 observed_effects,
1107 transaction.data().transaction_data().input_objects()
1108 );
1109 }
1110 Ok(())
1111 }
1112
1113 #[instrument(level = "trace", skip_all)]
1115 pub async fn execute_certificate(
1116 &self,
1117 certificate: &VerifiedCertificate,
1118 epoch_store: &Arc<AuthorityPerEpochStore>,
1119 ) -> IotaResult<TransactionEffects> {
1120 let _metrics_guard = if certificate.contains_shared_object() {
1121 self.metrics
1122 .execute_certificate_latency_shared_object
1123 .start_timer()
1124 } else {
1125 self.metrics
1126 .execute_certificate_latency_single_writer
1127 .start_timer()
1128 };
1129 debug!("execute_certificate");
1130
1131 self.metrics.total_cert_attempts.inc();
1132
1133 if !certificate.contains_shared_object() {
1134 self.enqueue_certificates_for_execution(vec![certificate.clone()], epoch_store);
1139 }
1140
1141 epoch_store
1144 .within_alive_epoch(self.notify_read_effects(certificate))
1145 .await
1146 .map_err(|_| IotaError::EpochEnded(epoch_store.epoch()))
1147 .and_then(|r| r)
1148 }
1149
1150 #[instrument(level = "trace", skip_all)]
1165 pub async fn try_execute_immediately(
1166 &self,
1167 certificate: &VerifiedExecutableTransaction,
1168 mut expected_effects_digest: Option<TransactionEffectsDigest>,
1169 epoch_store: &Arc<AuthorityPerEpochStore>,
1170 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1171 let _scope = monitored_scope("Execution::try_execute_immediately");
1172 let _metrics_guard = self.metrics.internal_execution_latency.start_timer();
1173 debug!("execute_certificate_internal");
1174
1175 let tx_digest = certificate.digest();
1176
1177 let tx_guard = epoch_store.acquire_tx_guard(certificate).await?;
1179
1180 if let Some(effects) = self
1183 .get_transaction_cache_reader()
1184 .get_executed_effects(tx_digest)?
1185 {
1186 tx_guard.release();
1187 return Ok((effects, None));
1188 }
1189 let input_objects =
1190 self.read_objects_for_execution(tx_guard.as_lock_guard(), certificate, epoch_store)?;
1191
1192 if expected_effects_digest.is_none() {
1193 expected_effects_digest = epoch_store.get_signed_effects_digest(tx_digest)?;
1198 }
1199
1200 self.process_certificate(
1201 tx_guard,
1202 certificate,
1203 input_objects,
1204 expected_effects_digest,
1205 epoch_store,
1206 )
1207 .await
1208 .tap_err(|e| info!(?tx_digest, "process_certificate failed: {e}"))
1209 }
1210
1211 pub fn read_objects_for_execution(
1212 &self,
1213 tx_lock: &CertLockGuard,
1214 certificate: &VerifiedExecutableTransaction,
1215 epoch_store: &Arc<AuthorityPerEpochStore>,
1216 ) -> IotaResult<InputObjects> {
1217 let _scope = monitored_scope("Execution::load_input_objects");
1218 let _metrics_guard = self
1219 .metrics
1220 .execution_load_input_objects_latency
1221 .start_timer();
1222 let input_objects = &certificate.data().transaction_data().input_objects()?;
1223 self.input_loader.read_objects_for_execution(
1224 epoch_store.as_ref(),
1225 &certificate.key(),
1226 tx_lock,
1227 input_objects,
1228 epoch_store.epoch(),
1229 )
1230 }
1231
1232 pub async fn try_execute_for_test(
1236 &self,
1237 certificate: &VerifiedCertificate,
1238 ) -> IotaResult<(VerifiedSignedTransactionEffects, Option<ExecutionError>)> {
1239 let epoch_store = self.epoch_store_for_testing();
1240 let (effects, execution_error_opt) = self
1241 .try_execute_immediately(
1242 &VerifiedExecutableTransaction::new_from_certificate(certificate.clone()),
1243 None,
1244 &epoch_store,
1245 )
1246 .await?;
1247 let signed_effects = self.sign_effects(effects, &epoch_store)?;
1248 Ok((signed_effects, execution_error_opt))
1249 }
1250
1251 pub async fn notify_read_effects(
1252 &self,
1253 certificate: &VerifiedCertificate,
1254 ) -> IotaResult<TransactionEffects> {
1255 self.get_transaction_cache_reader()
1256 .notify_read_executed_effects(&[*certificate.digest()])
1257 .await
1258 .map(|mut r| r.pop().expect("must return correct number of effects"))
1259 }
1260
1261 fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> IotaResult {
1262 self.get_object_cache_reader()
1263 .check_owned_objects_are_live(owned_object_refs)
1264 }
1265
1266 pub(crate) fn debug_dump_transaction_state(
1271 &self,
1272 tx_digest: &TransactionDigest,
1273 effects: &TransactionEffects,
1274 expected_effects_digest: TransactionEffectsDigest,
1275 inner_temporary_store: &InnerTemporaryStore,
1276 certificate: &VerifiedExecutableTransaction,
1277 debug_dump_config: &StateDebugDumpConfig,
1278 ) -> IotaResult<PathBuf> {
1279 let dump_dir = debug_dump_config
1280 .dump_file_directory
1281 .as_ref()
1282 .cloned()
1283 .unwrap_or(std::env::temp_dir());
1284 let epoch_store = self.load_epoch_store_one_call_per_task();
1285
1286 NodeStateDump::new(
1287 tx_digest,
1288 effects,
1289 expected_effects_digest,
1290 self.get_object_store().as_ref(),
1291 &epoch_store,
1292 inner_temporary_store,
1293 certificate,
1294 )?
1295 .write_to_file(&dump_dir)
1296 .map_err(|e| IotaError::FileIO(e.to_string()))
1297 }
1298
1299 #[instrument(level = "trace", skip_all)]
1300 pub(crate) async fn process_certificate(
1301 &self,
1302 tx_guard: CertTxGuard,
1303 certificate: &VerifiedExecutableTransaction,
1304 input_objects: InputObjects,
1305 expected_effects_digest: Option<TransactionEffectsDigest>,
1306 epoch_store: &Arc<AuthorityPerEpochStore>,
1307 ) -> IotaResult<(TransactionEffects, Option<ExecutionError>)> {
1308 let process_certificate_start_time = tokio::time::Instant::now();
1309 let digest = *certificate.digest();
1310
1311 fail_point_if!("correlated-crash-process-certificate", || {
1312 if iota_simulator::random::deterministic_probability_once(digest, 0.01) {
1313 iota_simulator::task::kill_current_node(None);
1314 }
1315 });
1316
1317 let execution_guard = self
1318 .execution_lock_for_executable_transaction(certificate)
1319 .await;
1320 let execution_guard = match execution_guard {
1326 Ok(execution_guard) => execution_guard,
1327 Err(err) => {
1328 tx_guard.release();
1329 return Err(err);
1330 }
1331 };
1332 if *execution_guard != epoch_store.epoch() {
1336 tx_guard.release();
1337 info!("The epoch of the execution_guard doesn't match the epoch store");
1338 return Err(IotaError::WrongEpoch {
1339 expected_epoch: epoch_store.epoch(),
1340 actual_epoch: *execution_guard,
1341 });
1342 }
1343
1344 let (inner_temporary_store, effects, execution_error_opt) = match self.prepare_certificate(
1350 &execution_guard,
1351 certificate,
1352 input_objects,
1353 epoch_store,
1354 ) {
1355 Err(e) => {
1356 info!(name = ?self.name, ?digest, "Error preparing transaction: {e}");
1357 tx_guard.release();
1358 return Err(e);
1359 }
1360 Ok(res) => res,
1361 };
1362
1363 if let Some(expected_effects_digest) = expected_effects_digest {
1364 if effects.digest() != expected_effects_digest {
1365 match self.debug_dump_transaction_state(
1367 &digest,
1368 &effects,
1369 expected_effects_digest,
1370 &inner_temporary_store,
1371 certificate,
1372 &self.config.state_debug_dump_config,
1373 ) {
1374 Ok(out_path) => {
1375 info!(
1376 "Dumped node state for transaction {} to {}",
1377 digest,
1378 out_path.as_path().display().to_string()
1379 );
1380 }
1381 Err(e) => {
1382 error!("Error dumping state for transaction {}: {e}", digest);
1383 }
1384 }
1385 error!(
1386 tx_digest = ?digest,
1387 ?expected_effects_digest,
1388 actual_effects = ?effects,
1389 "fork detected!"
1390 );
1391 panic!(
1392 "Transaction {} is expected to have effects digest {}, but got {}!",
1393 digest,
1394 expected_effects_digest,
1395 effects.digest(),
1396 );
1397 }
1398 }
1399
1400 fail_point_async!("crash");
1401
1402 self.commit_certificate(
1403 certificate,
1404 inner_temporary_store,
1405 &effects,
1406 tx_guard,
1407 execution_guard,
1408 epoch_store,
1409 )
1410 .await?;
1411
1412 if let TransactionKind::AuthenticatorStateUpdateV1(auth_state) =
1413 certificate.data().transaction_data().kind()
1414 {
1415 if let Some(err) = &execution_error_opt {
1416 error!("Authenticator state update failed: {err}");
1417 self.metrics.authenticator_state_update_failed.inc();
1418 }
1419 debug_assert!(execution_error_opt.is_none());
1420 epoch_store.update_authenticator_state(auth_state);
1421
1422 if cfg!(debug_assertions) {
1425 let authenticator_state = get_authenticator_state(self.get_object_store())
1426 .expect("Read cannot fail")
1427 .expect("Authenticator state must exist");
1428
1429 let mut sys_jwks: Vec<_> = authenticator_state
1430 .active_jwks
1431 .into_iter()
1432 .map(|jwk| (jwk.jwk_id, jwk.jwk))
1433 .collect();
1434 let mut active_jwks: Vec<_> = epoch_store
1435 .signature_verifier
1436 .get_jwks()
1437 .into_iter()
1438 .collect();
1439 sys_jwks.sort();
1440 active_jwks.sort();
1441
1442 assert_eq!(sys_jwks, active_jwks);
1443 }
1444 }
1445
1446 let elapsed = process_certificate_start_time.elapsed().as_micros() as f64;
1447 if elapsed > 0.0 {
1448 self.metrics
1449 .execution_gas_latency_ratio
1450 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1451 };
1452 Ok((effects, execution_error_opt))
1453 }
1454
1455 #[instrument(level = "trace", skip_all)]
1456 async fn commit_certificate(
1457 &self,
1458 certificate: &VerifiedExecutableTransaction,
1459 inner_temporary_store: InnerTemporaryStore,
1460 effects: &TransactionEffects,
1461 tx_guard: CertTxGuard,
1462 _execution_guard: ExecutionLockReadGuard<'_>,
1463 epoch_store: &Arc<AuthorityPerEpochStore>,
1464 ) -> IotaResult {
1465 let _scope: Option<iota_metrics::MonitoredScopeGuard> =
1466 monitored_scope("Execution::commit_certificate");
1467 let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();
1468
1469 let tx_key = certificate.key();
1470 let tx_digest = certificate.digest();
1471 let input_object_count = inner_temporary_store.input_objects.len();
1472 let shared_object_count = effects.input_shared_objects().len();
1473
1474 let output_keys = inner_temporary_store.get_output_keys(effects);
1475
1476 let _ = self
1478 .post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
1479 .await
1480 .tap_err(|e| {
1481 self.metrics.post_processing_total_failures.inc();
1482 error!(?tx_digest, "tx post processing failed: {e}");
1483 });
1484
1485 epoch_store.insert_tx_key_and_digest(&tx_key, tx_digest)?;
1489
1490 fail_point_async!("crash");
1492
1493 let transaction_outputs = TransactionOutputs::build_transaction_outputs(
1494 certificate.clone().into_unsigned(),
1495 effects.clone(),
1496 inner_temporary_store,
1497 );
1498 self.get_cache_writer()
1499 .write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())
1500 .await?;
1501
1502 if certificate.transaction_data().is_end_of_epoch_tx() {
1503 self.get_object_cache_reader()
1506 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
1507 }
1508
1509 tx_guard.commit_tx();
1511
1512 self.transaction_manager
1516 .notify_commit(tx_digest, output_keys, epoch_store);
1517
1518 self.update_metrics(certificate, input_object_count, shared_object_count);
1519
1520 Ok(())
1521 }
1522
1523 fn update_metrics(
1524 &self,
1525 certificate: &VerifiedExecutableTransaction,
1526 input_object_count: usize,
1527 shared_object_count: usize,
1528 ) {
1529 if certificate.has_zklogin_sig() {
1531 self.metrics.zklogin_sig_count.inc();
1532 } else if certificate.has_upgraded_multisig() {
1533 self.metrics.multisig_sig_count.inc();
1534 }
1535
1536 self.metrics.total_effects.inc();
1537 self.metrics.total_certs.inc();
1538
1539 if shared_object_count > 0 {
1540 self.metrics.shared_obj_tx.inc();
1541 }
1542
1543 if certificate.is_sponsored_tx() {
1544 self.metrics.sponsored_tx.inc();
1545 }
1546
1547 self.metrics
1548 .num_input_objs
1549 .observe(input_object_count as f64);
1550 self.metrics
1551 .num_shared_objects
1552 .observe(shared_object_count as f64);
1553 self.metrics.batch_size.observe(
1554 certificate
1555 .data()
1556 .intent_message()
1557 .value
1558 .kind()
1559 .num_commands() as f64,
1560 );
1561 }
1562
1563 #[instrument(level = "trace", skip_all)]
1575 fn prepare_certificate(
1576 &self,
1577 _execution_guard: &ExecutionLockReadGuard<'_>,
1578 certificate: &VerifiedExecutableTransaction,
1579 input_objects: InputObjects,
1580 epoch_store: &Arc<AuthorityPerEpochStore>,
1581 ) -> IotaResult<(
1582 InnerTemporaryStore,
1583 TransactionEffects,
1584 Option<ExecutionError>,
1585 )> {
1586 let _scope = monitored_scope("Execution::prepare_certificate");
1587 let _metrics_guard = self.metrics.prepare_certificate_latency.start_timer();
1588 let prepare_certificate_start_time = tokio::time::Instant::now();
1589
1590 let tx_data = certificate.data().transaction_data();
1593 tx_data.validity_check(epoch_store.protocol_config())?;
1594
1595 let (gas_status, input_objects) = iota_transaction_checks::check_certificate_input(
1598 certificate,
1599 input_objects,
1600 epoch_store.protocol_config(),
1601 epoch_store.reference_gas_price(),
1602 )?;
1603
1604 let owned_object_refs = input_objects.inner().filter_owned_objects();
1605 self.check_owned_locks(&owned_object_refs)?;
1606 let tx_digest = *certificate.digest();
1607 let protocol_config = epoch_store.protocol_config();
1608 let transaction_data = &certificate.data().intent_message().value;
1609 let (kind, signer, gas) = transaction_data.execution_parts();
1610
1611 #[cfg_attr(not(any(msim, fail_points)), expect(unused_mut))]
1612 let (inner_temp_store, _, mut effects, execution_error_opt) =
1613 epoch_store.executor().execute_transaction_to_effects(
1614 self.get_backing_store().as_ref(),
1615 protocol_config,
1616 self.metrics.limits_metrics.clone(),
1617 self.config
1620 .expensive_safety_check_config
1621 .enable_deep_per_tx_iota_conservation_check(),
1622 self.config.certificate_deny_config.certificate_deny_set(),
1623 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1624 epoch_store
1625 .epoch_start_config()
1626 .epoch_data()
1627 .epoch_start_timestamp(),
1628 input_objects,
1629 gas,
1630 gas_status,
1631 kind,
1632 signer,
1633 tx_digest,
1634 &mut None,
1635 );
1636
1637 fail_point_if!("cp_execution_nondeterminism", || {
1638 #[cfg(msim)]
1639 self.create_fail_state(certificate, epoch_store, &mut effects);
1640 });
1641
1642 let elapsed = prepare_certificate_start_time.elapsed().as_micros() as f64;
1643 if elapsed > 0.0 {
1644 self.metrics
1645 .prepare_cert_gas_latency_ratio
1646 .observe(effects.gas_cost_summary().computation_cost as f64 / elapsed);
1647 }
1648
1649 Ok((inner_temp_store, effects, execution_error_opt.err()))
1650 }
1651
1652 pub fn prepare_certificate_for_benchmark(
1653 &self,
1654 certificate: &VerifiedExecutableTransaction,
1655 input_objects: InputObjects,
1656 epoch_store: &Arc<AuthorityPerEpochStore>,
1657 ) -> IotaResult<(
1658 InnerTemporaryStore,
1659 TransactionEffects,
1660 Option<ExecutionError>,
1661 )> {
1662 let lock: RwLock<EpochId> = RwLock::new(epoch_store.epoch());
1663 let execution_guard = lock.try_read().unwrap();
1664
1665 self.prepare_certificate(&execution_guard, certificate, input_objects, epoch_store)
1666 }
1667
1668 pub async fn dry_exec_transaction(
1669 &self,
1670 transaction: TransactionData,
1671 transaction_digest: TransactionDigest,
1672 ) -> IotaResult<(
1673 DryRunTransactionBlockResponse,
1674 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1675 TransactionEffects,
1676 Option<ObjectID>,
1677 )> {
1678 let epoch_store = self.load_epoch_store_one_call_per_task();
1679 if !self.is_fullnode(&epoch_store) {
1680 return Err(IotaError::UnsupportedFeature {
1681 error: "dry-exec is only supported on fullnodes".to_string(),
1682 });
1683 }
1684
1685 if transaction.kind().is_system_tx() {
1686 return Err(IotaError::UnsupportedFeature {
1687 error: "dry-exec does not support system transactions".to_string(),
1688 });
1689 }
1690
1691 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1692 .await
1693 }
1694
1695 pub async fn dry_exec_transaction_for_benchmark(
1696 &self,
1697 transaction: TransactionData,
1698 transaction_digest: TransactionDigest,
1699 ) -> IotaResult<(
1700 DryRunTransactionBlockResponse,
1701 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1702 TransactionEffects,
1703 Option<ObjectID>,
1704 )> {
1705 let epoch_store = self.load_epoch_store_one_call_per_task();
1706 self.dry_exec_transaction_impl(&epoch_store, transaction, transaction_digest)
1707 .await
1708 }
1709
1710 async fn dry_exec_transaction_impl(
1711 &self,
1712 epoch_store: &AuthorityPerEpochStore,
1713 transaction: TransactionData,
1714 transaction_digest: TransactionDigest,
1715 ) -> IotaResult<(
1716 DryRunTransactionBlockResponse,
1717 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
1718 TransactionEffects,
1719 Option<ObjectID>,
1720 )> {
1721 transaction.validity_check_no_gas_check(epoch_store.protocol_config())?;
1723
1724 let input_object_kinds = transaction.input_objects()?;
1725 let receiving_object_refs = transaction.receiving_objects();
1726
1727 iota_transaction_checks::deny::check_transaction_for_signing(
1728 &transaction,
1729 &[],
1730 &input_object_kinds,
1731 &receiving_object_refs,
1732 &self.config.transaction_deny_config,
1733 self.get_backing_package_store().as_ref(),
1734 )?;
1735
1736 let (input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1737 None,
1739 &input_object_kinds,
1740 &receiving_object_refs,
1741 epoch_store.epoch(),
1742 )?;
1743
1744 let mut gas_object_refs = transaction.gas().to_vec();
1746 let reference_gas_price = epoch_store.reference_gas_price();
1747 let ((gas_status, checked_input_objects), mock_gas) = if transaction.gas().is_empty() {
1748 let sender = transaction.sender();
1749 const NANOS_TO_IOTA: u64 = 1_000_000_000;
1751 const DRY_RUN_IOTA: u64 = 1_000_000_000;
1752 let max_coin_value = NANOS_TO_IOTA * DRY_RUN_IOTA;
1753 let gas_object_id = ObjectID::random();
1754 let gas_object = Object::new_move(
1755 MoveObject::new_gas_coin(OBJECT_START_VERSION, gas_object_id, max_coin_value),
1756 Owner::AddressOwner(sender),
1757 TransactionDigest::genesis_marker(),
1758 );
1759 let gas_object_ref = gas_object.compute_object_reference();
1760 gas_object_refs = vec![gas_object_ref];
1761 (
1762 iota_transaction_checks::check_transaction_input_with_given_gas(
1763 epoch_store.protocol_config(),
1764 reference_gas_price,
1765 &transaction,
1766 input_objects,
1767 receiving_objects,
1768 gas_object,
1769 &self.metrics.bytecode_verifier_metrics,
1770 &self.config.verifier_signing_config,
1771 )?,
1772 Some(gas_object_id),
1773 )
1774 } else {
1775 (
1776 iota_transaction_checks::check_transaction_input(
1777 epoch_store.protocol_config(),
1778 reference_gas_price,
1779 &transaction,
1780 input_objects,
1781 &receiving_objects,
1782 &self.metrics.bytecode_verifier_metrics,
1783 &self.config.verifier_signing_config,
1784 )?,
1785 None,
1786 )
1787 };
1788
1789 let protocol_config = epoch_store.protocol_config();
1790 let (kind, signer, _) = transaction.execution_parts();
1791
1792 let silent = true;
1793 let executor = iota_execution::executor(protocol_config, silent, None)
1794 .expect("Creating an executor should not fail here");
1795
1796 let expensive_checks = false;
1797 let (inner_temp_store, _, effects, _execution_error) = executor
1798 .execute_transaction_to_effects(
1799 self.get_backing_store().as_ref(),
1800 protocol_config,
1801 self.metrics.limits_metrics.clone(),
1802 expensive_checks,
1803 self.config.certificate_deny_config.certificate_deny_set(),
1804 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
1805 epoch_store
1806 .epoch_start_config()
1807 .epoch_data()
1808 .epoch_start_timestamp(),
1809 checked_input_objects,
1810 gas_object_refs,
1811 gas_status,
1812 kind,
1813 signer,
1814 transaction_digest,
1815 &mut None,
1816 );
1817 let tx_digest = *effects.transaction_digest();
1818
1819 let module_cache =
1820 TemporaryModuleResolver::new(&inner_temp_store, epoch_store.module_cache().clone());
1821
1822 let mut layout_resolver =
1823 epoch_store
1824 .executor()
1825 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
1826 &inner_temp_store,
1827 self.get_backing_package_store(),
1828 )));
1829 let object_changes = Vec::new();
1831
1832 let balance_changes = Vec::new();
1834
1835 let written_with_kind = effects
1836 .created()
1837 .into_iter()
1838 .map(|(oref, _)| (oref, WriteKind::Create))
1839 .chain(
1840 effects
1841 .unwrapped()
1842 .into_iter()
1843 .map(|(oref, _)| (oref, WriteKind::Unwrap)),
1844 )
1845 .chain(
1846 effects
1847 .mutated()
1848 .into_iter()
1849 .map(|(oref, _)| (oref, WriteKind::Mutate)),
1850 )
1851 .map(|(oref, kind)| {
1852 let obj = inner_temp_store.written.get(&oref.0).unwrap();
1853 (oref.0, (oref, obj.clone(), kind))
1855 })
1856 .collect();
1857
1858 Ok((
1859 DryRunTransactionBlockResponse {
1860 input: IotaTransactionBlockData::try_from(transaction, &module_cache, tx_digest)
1861 .map_err(|e| IotaError::TransactionSerialization {
1862 error: format!(
1863 "Failed to convert transaction to IotaTransactionBlockData: {e}",
1864 ),
1865 })?, effects: effects.clone().try_into()?,
1867 events: IotaTransactionBlockEvents::try_from(
1868 inner_temp_store.events.clone(),
1869 tx_digest,
1870 None,
1871 layout_resolver.as_mut(),
1872 )?,
1873 object_changes,
1874 balance_changes,
1875 },
1876 written_with_kind,
1877 effects,
1878 mock_gas,
1879 ))
1880 }
1881
1882 pub async fn dev_inspect_transaction_block(
1884 &self,
1885 sender: IotaAddress,
1886 transaction_kind: TransactionKind,
1887 gas_price: Option<u64>,
1888 gas_budget: Option<u64>,
1889 gas_sponsor: Option<IotaAddress>,
1890 gas_objects: Option<Vec<ObjectRef>>,
1891 show_raw_txn_data_and_effects: Option<bool>,
1892 skip_checks: Option<bool>,
1893 ) -> IotaResult<DevInspectResults> {
1894 let epoch_store = self.load_epoch_store_one_call_per_task();
1895
1896 if !self.is_fullnode(&epoch_store) {
1897 return Err(IotaError::UnsupportedFeature {
1898 error: "dev-inspect is only supported on fullnodes".to_string(),
1899 });
1900 }
1901
1902 if transaction_kind.is_system_tx() {
1903 return Err(IotaError::UnsupportedFeature {
1904 error: "system transactions are not supported".to_string(),
1905 });
1906 }
1907
1908 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
1909 let skip_checks = skip_checks.unwrap_or(true);
1910 let reference_gas_price = epoch_store.reference_gas_price();
1911 let protocol_config = epoch_store.protocol_config();
1912 let max_tx_gas = protocol_config.max_tx_gas();
1913
1914 let price = gas_price.unwrap_or(reference_gas_price);
1915 let budget = gas_budget.unwrap_or(max_tx_gas);
1916 let owner = gas_sponsor.unwrap_or(sender);
1917 let payment = gas_objects.unwrap_or_default();
1920 let transaction = TransactionData::V1(TransactionDataV1 {
1921 kind: transaction_kind.clone(),
1922 sender,
1923 gas_data: GasData {
1924 payment,
1925 owner,
1926 price,
1927 budget,
1928 },
1929 expiration: TransactionExpiration::None,
1930 });
1931
1932 let raw_txn_data = if show_raw_txn_data_and_effects {
1933 bcs::to_bytes(&transaction).map_err(|_| IotaError::TransactionSerialization {
1934 error: "Failed to serialize transaction during dev inspect".to_string(),
1935 })?
1936 } else {
1937 vec![]
1938 };
1939
1940 transaction.validity_check_no_gas_check(protocol_config)?;
1941
1942 let input_object_kinds = transaction.input_objects()?;
1943 let receiving_object_refs = transaction.receiving_objects();
1944
1945 iota_transaction_checks::deny::check_transaction_for_signing(
1946 &transaction,
1947 &[],
1948 &input_object_kinds,
1949 &receiving_object_refs,
1950 &self.config.transaction_deny_config,
1951 self.get_backing_package_store().as_ref(),
1952 )?;
1953
1954 let (mut input_objects, receiving_objects) = self.input_loader.read_objects_for_signing(
1955 None,
1957 &input_object_kinds,
1958 &receiving_object_refs,
1959 epoch_store.epoch(),
1960 )?;
1961
1962 let dummy_gas_object = Object::new_gas_with_balance_and_owner_for_testing(
1964 DEV_INSPECT_GAS_COIN_VALUE,
1965 transaction.gas_owner(),
1966 );
1967
1968 let gas_objects = if transaction.gas().is_empty() {
1969 let gas_object_ref = dummy_gas_object.compute_object_reference();
1970 vec![gas_object_ref]
1971 } else {
1972 transaction.gas().to_vec()
1973 };
1974
1975 let (gas_status, checked_input_objects) = if skip_checks {
1976 if transaction.gas().is_empty() {
1981 input_objects.push(ObjectReadResult::new(
1982 InputObjectKind::ImmOrOwnedMoveObject(gas_objects[0]),
1983 dummy_gas_object.into(),
1984 ));
1985 }
1986 let checked_input_objects = iota_transaction_checks::check_dev_inspect_input(
1987 protocol_config,
1988 &transaction_kind,
1989 input_objects,
1990 receiving_objects,
1991 )?;
1992 let gas_status = IotaGasStatus::new(
1993 max_tx_gas,
1994 transaction.gas_price(),
1995 reference_gas_price,
1996 protocol_config,
1997 )?;
1998
1999 (gas_status, checked_input_objects)
2000 } else {
2001 if transaction.gas().is_empty() {
2005 iota_transaction_checks::check_transaction_input_with_given_gas(
2006 epoch_store.protocol_config(),
2007 reference_gas_price,
2008 &transaction,
2009 input_objects,
2010 receiving_objects,
2011 dummy_gas_object,
2012 &self.metrics.bytecode_verifier_metrics,
2013 &self.config.verifier_signing_config,
2014 )?
2015 } else {
2016 iota_transaction_checks::check_transaction_input(
2017 epoch_store.protocol_config(),
2018 reference_gas_price,
2019 &transaction,
2020 input_objects,
2021 &receiving_objects,
2022 &self.metrics.bytecode_verifier_metrics,
2023 &self.config.verifier_signing_config,
2024 )?
2025 }
2026 };
2027
2028 let executor = iota_execution::executor(protocol_config, true, None)
2029 .expect("Creating an executor should not fail here");
2030 let intent_msg = IntentMessage::new(
2031 Intent {
2032 version: IntentVersion::V0,
2033 scope: IntentScope::TransactionData,
2034 app_id: AppId::Iota,
2035 },
2036 transaction,
2037 );
2038 let transaction_digest = TransactionDigest::new(default_hash(&intent_msg.value));
2039 let (inner_temp_store, _, effects, execution_result) = executor.dev_inspect_transaction(
2040 self.get_backing_store().as_ref(),
2041 protocol_config,
2042 self.metrics.limits_metrics.clone(),
2043 false,
2045 self.config.certificate_deny_config.certificate_deny_set(),
2046 &epoch_store.epoch_start_config().epoch_data().epoch_id(),
2047 epoch_store
2048 .epoch_start_config()
2049 .epoch_data()
2050 .epoch_start_timestamp(),
2051 checked_input_objects,
2052 gas_objects,
2053 gas_status,
2054 transaction_kind,
2055 sender,
2056 transaction_digest,
2057 skip_checks,
2058 );
2059
2060 let raw_effects = if show_raw_txn_data_and_effects {
2061 bcs::to_bytes(&effects).map_err(|_| IotaError::TransactionSerialization {
2062 error: "Failed to serialize transaction effects during dev inspect".to_string(),
2063 })?
2064 } else {
2065 vec![]
2066 };
2067
2068 let mut layout_resolver =
2069 epoch_store
2070 .executor()
2071 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2072 &inner_temp_store,
2073 self.get_backing_package_store(),
2074 )));
2075
2076 DevInspectResults::new(
2077 effects,
2078 inner_temp_store.events.clone(),
2079 execution_result,
2080 raw_txn_data,
2081 raw_effects,
2082 layout_resolver.as_mut(),
2083 )
2084 }
2085
2086 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
2088 let epoch_store = self.epoch_store_for_testing();
2089 Ok(epoch_store.reference_gas_price())
2090 }
2091
2092 pub fn is_tx_already_executed(&self, digest: &TransactionDigest) -> IotaResult<bool> {
2093 self.get_transaction_cache_reader()
2094 .is_tx_already_executed(digest)
2095 }
2096
2097 #[instrument(level = "debug", skip_all, err)]
2099 async fn index_tx(
2100 &self,
2101 indexes: &IndexStore,
2102 digest: &TransactionDigest,
2103 cert: &VerifiedExecutableTransaction,
2105 effects: &TransactionEffects,
2106 events: &TransactionEvents,
2107 timestamp_ms: u64,
2108 tx_coins: Option<TxCoins>,
2109 written: &WrittenObjects,
2110 inner_temporary_store: &InnerTemporaryStore,
2111 ) -> IotaResult<u64> {
2112 let changes = self
2113 .process_object_index(effects, written, inner_temporary_store)
2114 .tap_err(|e| warn!(tx_digest=?digest, "Failed to process object index, index_tx is skipped: {e}"))?;
2115
2116 indexes
2117 .index_tx(
2118 cert.data().intent_message().value.sender(),
2119 cert.data()
2120 .intent_message()
2121 .value
2122 .input_objects()?
2123 .iter()
2124 .map(|o| o.object_id()),
2125 effects
2126 .all_changed_objects()
2127 .into_iter()
2128 .map(|(obj_ref, owner, _kind)| (obj_ref, owner)),
2129 cert.data()
2130 .intent_message()
2131 .value
2132 .move_calls()
2133 .into_iter()
2134 .map(|(package, module, function)| {
2135 (*package, module.to_owned(), function.to_owned())
2136 }),
2137 events,
2138 changes,
2139 digest,
2140 timestamp_ms,
2141 tx_coins,
2142 )
2143 .await
2144 }
2145
2146 #[cfg(msim)]
2147 fn create_fail_state(
2148 &self,
2149 certificate: &VerifiedExecutableTransaction,
2150 epoch_store: &Arc<AuthorityPerEpochStore>,
2151 effects: &mut TransactionEffects,
2152 ) {
2153 use std::cell::RefCell;
2154 thread_local! {
2155 static FAIL_STATE: RefCell<(u64, HashSet<AuthorityName>)> = RefCell::new((0, HashSet::new()));
2156 }
2157 if !certificate.data().intent_message().value.is_system_tx() {
2158 let committee = epoch_store.committee();
2159 let cur_stake = (**committee).weight(&self.name);
2160 if cur_stake > 0 {
2161 FAIL_STATE.with_borrow_mut(|fail_state| {
2162 if fail_state.0 < committee.validity_threshold() {
2164 fail_state.0 += cur_stake;
2165 fail_state.1.insert(self.name);
2166 }
2167
2168 if fail_state.1.contains(&self.name) {
2169 info!("cp_exec failing tx");
2170 effects.gas_cost_summary_mut_for_testing().computation_cost += 1;
2171 }
2172 });
2173 }
2174 }
2175 }
2176
2177 fn process_object_index(
2178 &self,
2179 effects: &TransactionEffects,
2180 written: &WrittenObjects,
2181 inner_temporary_store: &InnerTemporaryStore,
2182 ) -> IotaResult<ObjectIndexChanges> {
2183 let epoch_store = self.load_epoch_store_one_call_per_task();
2184 let mut layout_resolver =
2185 epoch_store
2186 .executor()
2187 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2188 inner_temporary_store,
2189 self.get_backing_package_store(),
2190 )));
2191
2192 let modified_at_version = effects
2193 .modified_at_versions()
2194 .into_iter()
2195 .collect::<HashMap<_, _>>();
2196
2197 let tx_digest = effects.transaction_digest();
2198 let mut deleted_owners = vec![];
2199 let mut deleted_dynamic_fields = vec![];
2200 for (id, _, _) in effects.deleted().into_iter().chain(effects.wrapped()) {
2201 let old_version = modified_at_version.get(&id).unwrap();
2202 match self.get_owner_at_version(&id, *old_version).unwrap_or_else(
2205 |e| panic!("tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}. Err: {:?}", tx_digest, id, old_version, e),
2206 ) {
2207 Owner::AddressOwner(addr) => deleted_owners.push((addr, id)),
2208 Owner::ObjectOwner(object_id) => {
2209 deleted_dynamic_fields.push((ObjectID::from(object_id), id))
2210 }
2211 _ => {}
2212 }
2213 }
2214
2215 let mut new_owners = vec![];
2216 let mut new_dynamic_fields = vec![];
2217
2218 for (oref, owner, kind) in effects.all_changed_objects() {
2219 let id = &oref.0;
2220 if let WriteKind::Mutate = kind {
2223 let Some(old_version) = modified_at_version.get(id) else {
2224 panic!(
2225 "tx_digest={:?}, error processing object owner index, cannot find modified at version for mutated object [{id}].",
2226 tx_digest
2227 );
2228 };
2229 let Some(old_object) = self
2232 .get_object_store()
2233 .get_object_by_key(id, *old_version)?
2234 else {
2235 panic!(
2236 "tx_digest={:?}, error processing object owner index, cannot find owner for object {:?} at version {:?}",
2237 tx_digest, id, old_version
2238 );
2239 };
2240 if old_object.owner != owner {
2241 match old_object.owner {
2242 Owner::AddressOwner(addr) => {
2243 deleted_owners.push((addr, *id));
2244 }
2245 Owner::ObjectOwner(object_id) => {
2246 deleted_dynamic_fields.push((ObjectID::from(object_id), *id))
2247 }
2248 _ => {}
2249 }
2250 }
2251 }
2252
2253 match owner {
2254 Owner::AddressOwner(addr) => {
2255 let new_object = written.get(id).unwrap_or_else(
2258 || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
2259 );
2260 assert_eq!(
2261 new_object.version(),
2262 oref.1,
2263 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2264 tx_digest,
2265 id,
2266 new_object.version(),
2267 oref.1
2268 );
2269
2270 let type_ = new_object
2271 .type_()
2272 .map(|type_| ObjectType::Struct(type_.clone()))
2273 .unwrap_or(ObjectType::Package);
2274
2275 new_owners.push((
2276 (addr, *id),
2277 ObjectInfo {
2278 object_id: *id,
2279 version: oref.1,
2280 digest: oref.2,
2281 type_,
2282 owner,
2283 previous_transaction: *effects.transaction_digest(),
2284 },
2285 ));
2286 }
2287 Owner::ObjectOwner(owner) => {
2288 let new_object = written.get(id).unwrap_or_else(
2289 || panic!("tx_digest={:?}, error processing object owner index, written does not contain object {:?}", tx_digest, id)
2290 );
2291 assert_eq!(
2292 new_object.version(),
2293 oref.1,
2294 "tx_digest={:?} error processing object owner index, object {:?} from written has mismatched version. Actual: {}, expected: {}",
2295 tx_digest,
2296 id,
2297 new_object.version(),
2298 oref.1
2299 );
2300
2301 let Some(df_info) = self
2302 .try_create_dynamic_field_info(new_object, written, layout_resolver.as_mut())
2303 .unwrap_or_else(|e| {
2304 error!("try_create_dynamic_field_info should not fail, {}, new_object={:?}", e, new_object);
2305 None
2306 }
2307 )
2308 else {
2309 continue;
2311 };
2312 new_dynamic_fields.push(((ObjectID::from(owner), *id), df_info))
2313 }
2314 _ => {}
2315 }
2316 }
2317
2318 Ok(ObjectIndexChanges {
2319 deleted_owners,
2320 deleted_dynamic_fields,
2321 new_owners,
2322 new_dynamic_fields,
2323 })
2324 }
2325
2326 fn try_create_dynamic_field_info(
2327 &self,
2328 o: &Object,
2329 written: &WrittenObjects,
2330 resolver: &mut dyn LayoutResolver,
2331 ) -> IotaResult<Option<DynamicFieldInfo>> {
2332 let Some(move_object) = o.data.try_as_move().cloned() else {
2334 return Ok(None);
2335 };
2336
2337 if !move_object.type_().is_dynamic_field() {
2339 return Ok(None);
2340 }
2341
2342 let layout = resolver
2343 .get_annotated_layout(&move_object.type_().clone().into())?
2344 .into_layout();
2345
2346 let field =
2347 DFV::FieldVisitor::deserialize(move_object.contents(), &layout).map_err(|e| {
2348 IotaError::ObjectDeserialization {
2349 error: e.to_string(),
2350 }
2351 })?;
2352
2353 let type_ = field.kind;
2354 let name_type: TypeTag = field.name_layout.into();
2355 let bcs_name = field.name_bytes.to_owned();
2356
2357 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
2358 .map_err(|e| {
2359 warn!("{e}");
2360 IotaError::ObjectDeserialization {
2361 error: e.to_string(),
2362 }
2363 })?;
2364
2365 let name = DynamicFieldName {
2366 type_: name_type,
2367 value: IotaMoveValue::from(name_value).to_json_value(),
2368 };
2369
2370 let value_metadata = field.value_metadata().map_err(|e| {
2371 warn!("{e}");
2372 IotaError::ObjectDeserialization {
2373 error: e.to_string(),
2374 }
2375 })?;
2376
2377 Ok(Some(match value_metadata {
2378 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
2379 name,
2380 bcs_name,
2381 type_,
2382 object_type: object_type.to_canonical_string(true),
2383 object_id: o.id(),
2384 version: o.version(),
2385 digest: o.digest(),
2386 },
2387
2388 DFV::ValueMetadata::DynamicObjectField(object_id) => {
2389 let (version, digest, object_type) = if let Some(object) = written.get(&object_id) {
2394 let version = object.version();
2395 let digest = object.digest();
2396 let object_type = object.data.type_().unwrap().clone();
2397 (version, digest, object_type)
2398 } else {
2399 let object = self
2401 .get_object_store()
2402 .get_object_by_key(&object_id, o.version())?
2403 .ok_or_else(|| UserInputError::ObjectNotFound {
2404 object_id,
2405 version: Some(o.version()),
2406 })?;
2407 let version = object.version();
2408 let digest = object.digest();
2409 let object_type = object.data.type_().unwrap().clone();
2410 (version, digest, object_type)
2411 };
2412
2413 DynamicFieldInfo {
2414 name,
2415 bcs_name,
2416 type_,
2417 object_type: object_type.to_string(),
2418 object_id,
2419 version,
2420 digest,
2421 }
2422 }
2423 }))
2424 }
2425
2426 #[instrument(level = "trace", skip_all, err)]
2427 async fn post_process_one_tx(
2428 &self,
2429 certificate: &VerifiedExecutableTransaction,
2430 effects: &TransactionEffects,
2431 inner_temporary_store: &InnerTemporaryStore,
2432 epoch_store: &Arc<AuthorityPerEpochStore>,
2433 ) -> IotaResult {
2434 if self.indexes.is_none() {
2435 return Ok(());
2436 }
2437
2438 let tx_digest = certificate.digest();
2439 let timestamp_ms = Self::unixtime_now_ms();
2440 let events = &inner_temporary_store.events;
2441 let written = &inner_temporary_store.written;
2442 let tx_coins =
2443 self.fullnode_only_get_tx_coins_for_indexing(inner_temporary_store, epoch_store);
2444
2445 if let Some(indexes) = &self.indexes {
2447 let _ = self
2448 .index_tx(
2449 indexes.as_ref(),
2450 tx_digest,
2451 certificate,
2452 effects,
2453 events,
2454 timestamp_ms,
2455 tx_coins,
2456 written,
2457 inner_temporary_store,
2458 )
2459 .await
2460 .tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
2461 .tap_err(|e| error!(?tx_digest, "Post processing - Couldn't index tx: {e}"))
2462 .expect("Indexing tx should not fail");
2463
2464 let effects: IotaTransactionBlockEffects = effects.clone().try_into()?;
2465 let events = self.make_transaction_block_events(
2466 events.clone(),
2467 *tx_digest,
2468 timestamp_ms,
2469 epoch_store,
2470 inner_temporary_store,
2471 )?;
2472 self.subscription_handler
2474 .process_tx(certificate.data().transaction_data(), &effects, &events)
2475 .await
2476 .tap_ok(|_| {
2477 self.metrics
2478 .post_processing_total_tx_had_event_processed
2479 .inc()
2480 })
2481 .tap_err(|e| {
2482 warn!(
2483 ?tx_digest,
2484 "Post processing - Couldn't process events for tx: {}", e
2485 )
2486 })?;
2487
2488 self.metrics
2489 .post_processing_total_events_emitted
2490 .inc_by(events.data.len() as u64);
2491 };
2492 Ok(())
2493 }
2494
2495 fn make_transaction_block_events(
2496 &self,
2497 transaction_events: TransactionEvents,
2498 digest: TransactionDigest,
2499 timestamp_ms: u64,
2500 epoch_store: &Arc<AuthorityPerEpochStore>,
2501 inner_temporary_store: &InnerTemporaryStore,
2502 ) -> IotaResult<IotaTransactionBlockEvents> {
2503 let mut layout_resolver =
2504 epoch_store
2505 .executor()
2506 .type_layout_resolver(Box::new(PackageStoreWithFallback::new(
2507 inner_temporary_store,
2508 self.get_backing_package_store(),
2509 )));
2510 IotaTransactionBlockEvents::try_from(
2511 transaction_events,
2512 digest,
2513 Some(timestamp_ms),
2514 layout_resolver.as_mut(),
2515 )
2516 }
2517
2518 pub fn unixtime_now_ms() -> u64 {
2519 let ts_ms = Utc::now().timestamp_millis();
2520 u64::try_from(ts_ms).expect("Travelling in time machine")
2521 }
2522
2523 #[instrument(level = "trace", skip_all)]
2524 pub async fn handle_transaction_info_request(
2525 &self,
2526 request: TransactionInfoRequest,
2527 ) -> IotaResult<TransactionInfoResponse> {
2528 let epoch_store = self.load_epoch_store_one_call_per_task();
2529 let (transaction, status) = self
2530 .get_transaction_status(&request.transaction_digest, &epoch_store)?
2531 .ok_or(IotaError::TransactionNotFound {
2532 digest: request.transaction_digest,
2533 })?;
2534 Ok(TransactionInfoResponse {
2535 transaction,
2536 status,
2537 })
2538 }
2539
2540 #[instrument(level = "trace", skip_all)]
2541 pub async fn handle_object_info_request(
2542 &self,
2543 request: ObjectInfoRequest,
2544 ) -> IotaResult<ObjectInfoResponse> {
2545 let epoch_store = self.load_epoch_store_one_call_per_task();
2546
2547 let requested_object_seq = match request.request_kind {
2548 ObjectInfoRequestKind::LatestObjectInfo => {
2549 let (_, seq, _) = self
2550 .get_object_or_tombstone(request.object_id)
2551 .await?
2552 .ok_or_else(|| {
2553 IotaError::from(UserInputError::ObjectNotFound {
2554 object_id: request.object_id,
2555 version: None,
2556 })
2557 })?;
2558 seq
2559 }
2560 ObjectInfoRequestKind::PastObjectInfoDebug(seq) => seq,
2561 };
2562
2563 let object = self
2564 .get_object_store()
2565 .get_object_by_key(&request.object_id, requested_object_seq)?
2566 .ok_or_else(|| {
2567 IotaError::from(UserInputError::ObjectNotFound {
2568 object_id: request.object_id,
2569 version: Some(requested_object_seq),
2570 })
2571 })?;
2572
2573 let layout = if let (LayoutGenerationOption::Generate, Some(move_obj)) =
2574 (request.generate_layout, object.data.try_as_move())
2575 {
2576 Some(into_struct_layout(
2577 self.load_epoch_store_one_call_per_task()
2578 .executor()
2579 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
2580 .get_annotated_layout(&move_obj.type_().clone().into())?,
2581 )?)
2582 } else {
2583 None
2584 };
2585
2586 let lock = if !object.is_address_owned() {
2587 None
2589 } else {
2590 self.get_transaction_lock(&object.compute_object_reference(), &epoch_store)
2591 .await?
2592 .map(|s| s.into_inner())
2593 };
2594
2595 Ok(ObjectInfoResponse {
2596 object,
2597 layout,
2598 lock_for_debugging: lock,
2599 })
2600 }
2601
2602 #[instrument(level = "trace", skip_all)]
2603 pub fn handle_checkpoint_request(
2604 &self,
2605 request: &CheckpointRequest,
2606 ) -> IotaResult<CheckpointResponse> {
2607 let summary = if request.certified {
2608 let summary = match request.sequence_number {
2609 Some(seq) => self
2610 .checkpoint_store
2611 .get_checkpoint_by_sequence_number(seq)?,
2612 None => self.checkpoint_store.get_latest_certified_checkpoint(),
2613 }
2614 .map(|v| v.into_inner());
2615 summary.map(CheckpointSummaryResponse::Certified)
2616 } else {
2617 let summary = match request.sequence_number {
2618 Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
2619 None => self
2620 .checkpoint_store
2621 .get_latest_locally_computed_checkpoint(),
2622 };
2623 summary.map(CheckpointSummaryResponse::Pending)
2624 };
2625 let contents = match &summary {
2626 Some(s) => self
2627 .checkpoint_store
2628 .get_checkpoint_contents(&s.content_digest())?,
2629 None => None,
2630 };
2631 Ok(CheckpointResponse {
2632 checkpoint: summary,
2633 contents,
2634 })
2635 }
2636
2637 fn check_protocol_version(
2638 supported_protocol_versions: SupportedProtocolVersions,
2639 current_version: ProtocolVersion,
2640 ) {
2641 info!("current protocol version is now {:?}", current_version);
2642 info!("supported versions are: {:?}", supported_protocol_versions);
2643 if !supported_protocol_versions.is_version_supported(current_version) {
2644 let msg = format!(
2645 "Unsupported protocol version. The network is at {:?}, but this IotaNode only supports: {:?}. Shutting down.",
2646 current_version, supported_protocol_versions,
2647 );
2648
2649 error!("{}", msg);
2650 eprintln!("{}", msg);
2651
2652 #[cfg(not(msim))]
2653 std::process::exit(1);
2654
2655 #[cfg(msim)]
2656 iota_simulator::task::shutdown_current_node();
2657 }
2658 }
2659
2660 #[expect(clippy::disallowed_methods)] pub async fn new(
2662 name: AuthorityName,
2663 secret: StableSyncAuthoritySigner,
2664 supported_protocol_versions: SupportedProtocolVersions,
2665 store: Arc<AuthorityStore>,
2666 execution_cache_trait_pointers: ExecutionCacheTraitPointers,
2667 epoch_store: Arc<AuthorityPerEpochStore>,
2668 committee_store: Arc<CommitteeStore>,
2669 indexes: Option<Arc<IndexStore>>,
2670 rest_index: Option<Arc<RestIndexStore>>,
2671 checkpoint_store: Arc<CheckpointStore>,
2672 prometheus_registry: &Registry,
2673 genesis_objects: &[Object],
2674 db_checkpoint_config: &DBCheckpointConfig,
2675 config: NodeConfig,
2676 indirect_objects_threshold: usize,
2677 archive_readers: ArchiveReaderBalancer,
2678 validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
2679 ) -> Arc<Self> {
2680 Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());
2681
2682 let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
2683 let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
2684 let transaction_manager = Arc::new(TransactionManager::new(
2685 execution_cache_trait_pointers.object_cache_reader.clone(),
2686 execution_cache_trait_pointers
2687 .transaction_cache_reader
2688 .clone(),
2689 &epoch_store,
2690 tx_ready_certificates,
2691 metrics.clone(),
2692 ));
2693 let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();
2694
2695 let _authority_per_epoch_pruner = AuthorityPerEpochStorePruner::new(
2696 epoch_store.get_parent_path(),
2697 &config.authority_store_pruning_config,
2698 );
2699 let _pruner = AuthorityStorePruner::new(
2700 store.perpetual_tables.clone(),
2701 checkpoint_store.clone(),
2702 rest_index.clone(),
2703 store.objects_lock_table.clone(),
2704 config.authority_store_pruning_config.clone(),
2705 epoch_store.committee().authority_exists(&name),
2706 epoch_store.epoch_start_state().epoch_duration_ms(),
2707 prometheus_registry,
2708 indirect_objects_threshold,
2709 archive_readers,
2710 );
2711 let input_loader =
2712 TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
2713 let epoch = epoch_store.epoch();
2714 let state = Arc::new(AuthorityState {
2715 name,
2716 secret,
2717 execution_lock: RwLock::new(epoch),
2718 epoch_store: ArcSwap::new(epoch_store.clone()),
2719 input_loader,
2720 execution_cache_trait_pointers,
2721 indexes,
2722 rest_index,
2723 subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
2724 checkpoint_store,
2725 committee_store,
2726 transaction_manager,
2727 tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
2728 metrics,
2729 _pruner,
2730 _authority_per_epoch_pruner,
2731 db_checkpoint_config: db_checkpoint_config.clone(),
2732 config,
2733 overload_info: AuthorityOverloadInfo::default(),
2734 validator_tx_finalizer,
2735 });
2736
2737 let authority_state = Arc::downgrade(&state);
2739 spawn_monitored_task!(execution_process(
2740 authority_state,
2741 rx_ready_certificates,
2742 rx_execution_shutdown,
2743 ));
2744
2745 state
2747 .create_owner_index_if_empty(genesis_objects, &epoch_store)
2748 .expect("Error indexing genesis objects.");
2749
2750 state
2751 }
2752
2753 pub fn get_object_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
2755 &self.execution_cache_trait_pointers.object_cache_reader
2756 }
2757
2758 pub fn get_transaction_cache_reader(&self) -> &Arc<dyn TransactionCacheRead> {
2759 &self.execution_cache_trait_pointers.transaction_cache_reader
2760 }
2761
2762 pub fn get_cache_writer(&self) -> &Arc<dyn ExecutionCacheWrite> {
2763 &self.execution_cache_trait_pointers.cache_writer
2764 }
2765
2766 pub fn get_backing_store(&self) -> &Arc<dyn BackingStore + Send + Sync> {
2767 &self.execution_cache_trait_pointers.backing_store
2768 }
2769
2770 pub fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
2771 &self.execution_cache_trait_pointers.backing_package_store
2772 }
2773
2774 pub fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
2775 &self.execution_cache_trait_pointers.object_store
2776 }
2777
2778 pub fn get_reconfig_api(&self) -> &Arc<dyn ExecutionCacheReconfigAPI> {
2779 &self.execution_cache_trait_pointers.reconfig_api
2780 }
2781
2782 pub fn get_accumulator_store(&self) -> &Arc<dyn AccumulatorStore> {
2783 &self.execution_cache_trait_pointers.accumulator_store
2784 }
2785
2786 pub fn get_checkpoint_cache(&self) -> &Arc<dyn CheckpointCache> {
2787 &self.execution_cache_trait_pointers.checkpoint_cache
2788 }
2789
2790 pub fn get_state_sync_store(&self) -> &Arc<dyn StateSyncAPI> {
2791 &self.execution_cache_trait_pointers.state_sync_store
2792 }
2793
2794 pub fn get_cache_commit(&self) -> &Arc<dyn ExecutionCacheCommit> {
2795 &self.execution_cache_trait_pointers.cache_commit
2796 }
2797
2798 pub fn database_for_testing(&self) -> Arc<AuthorityStore> {
2799 self.execution_cache_trait_pointers
2800 .testing_api
2801 .database_for_testing()
2802 }
2803
2804 pub async fn prune_checkpoints_for_eligible_epochs_for_testing(
2805 &self,
2806 config: NodeConfig,
2807 metrics: Arc<AuthorityStorePruningMetrics>,
2808 ) -> anyhow::Result<()> {
2809 let archive_readers =
2810 ArchiveReaderBalancer::new(config.archive_reader_config(), &Registry::default())?;
2811 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
2812 &self.database_for_testing().perpetual_tables,
2813 &self.checkpoint_store,
2814 self.rest_index.as_deref(),
2815 &self.database_for_testing().objects_lock_table,
2816 config.authority_store_pruning_config,
2817 metrics,
2818 config.indirect_objects_threshold,
2819 archive_readers,
2820 EPOCH_DURATION_MS_FOR_TESTING,
2821 )
2822 .await
2823 }
2824
2825 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
2826 &self.transaction_manager
2827 }
2828
2829 pub fn enqueue_certificates_for_execution(
2831 &self,
2832 certs: Vec<VerifiedCertificate>,
2833 epoch_store: &Arc<AuthorityPerEpochStore>,
2834 ) {
2835 self.transaction_manager
2836 .enqueue_certificates(certs, epoch_store)
2837 }
2838
2839 pub(crate) fn enqueue_with_expected_effects_digest(
2840 &self,
2841 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
2842 epoch_store: &AuthorityPerEpochStore,
2843 ) {
2844 self.transaction_manager
2845 .enqueue_with_expected_effects_digest(certs, epoch_store)
2846 }
2847
2848 fn create_owner_index_if_empty(
2849 &self,
2850 genesis_objects: &[Object],
2851 epoch_store: &Arc<AuthorityPerEpochStore>,
2852 ) -> IotaResult {
2853 let Some(index_store) = &self.indexes else {
2854 return Ok(());
2855 };
2856 if !index_store.is_empty() {
2857 return Ok(());
2858 }
2859
2860 let mut new_owners = vec![];
2861 let mut new_dynamic_fields = vec![];
2862 let mut layout_resolver = epoch_store
2863 .executor()
2864 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()));
2865 for o in genesis_objects.iter() {
2866 match o.owner {
2867 Owner::AddressOwner(addr) => new_owners.push((
2868 (addr, o.id()),
2869 ObjectInfo::new(&o.compute_object_reference(), o),
2870 )),
2871 Owner::ObjectOwner(object_id) => {
2872 let id = o.id();
2873 let Some(info) = self.try_create_dynamic_field_info(
2874 o,
2875 &BTreeMap::new(),
2876 layout_resolver.as_mut(),
2877 )?
2878 else {
2879 continue;
2880 };
2881 new_dynamic_fields.push(((ObjectID::from(object_id), id), info));
2882 }
2883 _ => {}
2884 }
2885 }
2886
2887 index_store.insert_genesis_objects(ObjectIndexChanges {
2888 deleted_owners: vec![],
2889 deleted_dynamic_fields: vec![],
2890 new_owners,
2891 new_dynamic_fields,
2892 })
2893 }
2894
2895 pub async fn execution_lock_for_executable_transaction(
2899 &self,
2900 transaction: &VerifiedExecutableTransaction,
2901 ) -> IotaResult<ExecutionLockReadGuard> {
2902 let lock = self.execution_lock.read().await;
2903 if *lock == transaction.auth_sig().epoch() {
2904 Ok(lock)
2905 } else {
2906 Err(IotaError::WrongEpoch {
2907 expected_epoch: *lock,
2908 actual_epoch: transaction.auth_sig().epoch(),
2909 })
2910 }
2911 }
2912
2913 pub async fn execution_lock_for_signing(&self) -> ExecutionLockReadGuard {
2919 self.execution_lock.read().await
2920 }
2921
2922 pub async fn execution_lock_for_reconfiguration(&self) -> ExecutionLockWriteGuard {
2923 self.execution_lock.write().await
2924 }
2925
2926 #[instrument(level = "error", skip_all)]
2927 pub async fn reconfigure(
2928 &self,
2929 cur_epoch_store: &AuthorityPerEpochStore,
2930 supported_protocol_versions: SupportedProtocolVersions,
2931 new_committee: Committee,
2932 epoch_start_configuration: EpochStartConfiguration,
2933 accumulator: Arc<StateAccumulator>,
2934 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
2935 epoch_supply_change: i64,
2936 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
2937 Self::check_protocol_version(
2938 supported_protocol_versions,
2939 epoch_start_configuration
2940 .epoch_start_state()
2941 .protocol_version(),
2942 );
2943 self.metrics.reset_on_reconfigure();
2944 self.committee_store.insert_new_committee(&new_committee)?;
2945
2946 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
2948
2949 cur_epoch_store.epoch_terminated().await;
2951
2952 self.revert_uncommitted_epoch_transactions(cur_epoch_store)
2958 .await?;
2959 self.get_reconfig_api()
2960 .clear_state_end_of_epoch(&execution_lock);
2961 self.check_system_consistency(
2962 cur_epoch_store,
2963 accumulator,
2964 expensive_safety_check_config,
2965 epoch_supply_change,
2966 )?;
2967 self.get_reconfig_api()
2968 .set_epoch_start_configuration(&epoch_start_configuration)?;
2969 if let Some(checkpoint_path) = &self.db_checkpoint_config.checkpoint_path {
2970 if self
2971 .db_checkpoint_config
2972 .perform_db_checkpoints_at_epoch_end
2973 {
2974 let checkpoint_indexes = self
2975 .db_checkpoint_config
2976 .perform_index_db_checkpoints_at_epoch_end
2977 .unwrap_or(false);
2978 let current_epoch = cur_epoch_store.epoch();
2979 let epoch_checkpoint_path =
2980 checkpoint_path.join(format!("epoch_{}", current_epoch));
2981 self.checkpoint_all_dbs(
2982 &epoch_checkpoint_path,
2983 cur_epoch_store,
2984 checkpoint_indexes,
2985 )?;
2986 }
2987 }
2988
2989 self.get_reconfig_api()
2990 .reconfigure_cache(&epoch_start_configuration)
2991 .await;
2992
2993 let new_epoch = new_committee.epoch;
2994 let new_epoch_store = self
2995 .reopen_epoch_db(
2996 cur_epoch_store,
2997 new_committee,
2998 epoch_start_configuration,
2999 expensive_safety_check_config,
3000 )
3001 .await?;
3002 assert_eq!(new_epoch_store.epoch(), new_epoch);
3003 self.transaction_manager.reconfigure(new_epoch);
3004 *execution_lock = new_epoch;
3005 Ok(new_epoch_store)
3009 }
3010
3011 pub async fn reconfigure_for_testing(&self) {
3016 let mut execution_lock = self.execution_lock_for_reconfiguration().await;
3017 let epoch_store = self.epoch_store_for_testing().clone();
3018 let protocol_config = epoch_store.protocol_config().clone();
3019 let _guard =
3027 ProtocolConfig::apply_overrides_for_testing(move |_, _| protocol_config.clone());
3028 let new_epoch_store = epoch_store.new_at_next_epoch_for_testing(
3029 self.get_backing_package_store().clone(),
3030 self.get_object_store().clone(),
3031 &self.config.expensive_safety_check_config,
3032 );
3033 let new_epoch = new_epoch_store.epoch();
3034 self.transaction_manager.reconfigure(new_epoch);
3035 self.epoch_store.store(new_epoch_store);
3036 epoch_store.epoch_terminated().await;
3037 *execution_lock = new_epoch;
3038 }
3039
3040 #[instrument(level = "error", skip_all)]
3041 fn check_system_consistency(
3042 &self,
3043 cur_epoch_store: &AuthorityPerEpochStore,
3044 accumulator: Arc<StateAccumulator>,
3045 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
3046 epoch_supply_change: i64,
3047 ) -> IotaResult<()> {
3048 info!(
3049 "Performing iota conservation consistency check for epoch {}",
3050 cur_epoch_store.epoch()
3051 );
3052
3053 if cfg!(debug_assertions) {
3054 cur_epoch_store.check_all_executed_transactions_in_checkpoint();
3055 }
3056
3057 self.get_reconfig_api()
3058 .expensive_check_iota_conservation(cur_epoch_store, Some(epoch_supply_change))?;
3059
3060 if expensive_safety_check_config.enable_state_consistency_check() {
3062 info!(
3063 "Performing state consistency check for epoch {}",
3064 cur_epoch_store.epoch()
3065 );
3066 self.expensive_check_is_consistent_state(
3067 accumulator,
3068 cur_epoch_store,
3069 cfg!(debug_assertions), );
3071 }
3072
3073 if expensive_safety_check_config.enable_secondary_index_checks() {
3074 if let Some(indexes) = self.indexes.clone() {
3075 verify_indexes(self.get_accumulator_store().as_ref(), indexes)
3076 .expect("secondary indexes are inconsistent");
3077 }
3078 }
3079
3080 Ok(())
3081 }
3082
3083 fn expensive_check_is_consistent_state(
3084 &self,
3085 accumulator: Arc<StateAccumulator>,
3086 cur_epoch_store: &AuthorityPerEpochStore,
3087 panic: bool,
3088 ) {
3089 let live_object_set_hash = accumulator.digest_live_object_set();
3090
3091 let root_state_hash: ECMHLiveObjectSetDigest = self
3092 .get_accumulator_store()
3093 .get_root_state_accumulator_for_epoch(cur_epoch_store.epoch())
3094 .expect("Retrieving root state hash cannot fail")
3095 .expect("Root state hash for epoch must exist")
3096 .1
3097 .digest()
3098 .into();
3099
3100 let is_inconsistent = root_state_hash != live_object_set_hash;
3101 if is_inconsistent {
3102 if panic {
3103 panic!(
3104 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3105 root_state_hash, live_object_set_hash
3106 );
3107 } else {
3108 error!(
3109 "Inconsistent state detected: root state hash: {:?}, live object set hash: {:?}",
3110 root_state_hash, live_object_set_hash
3111 );
3112 }
3113 } else {
3114 info!("State consistency check passed");
3115 }
3116
3117 if !panic {
3118 accumulator.set_inconsistent_state(is_inconsistent);
3119 }
3120 }
3121
3122 pub fn current_epoch_for_testing(&self) -> EpochId {
3123 self.epoch_store_for_testing().epoch()
3124 }
3125
3126 #[instrument(level = "error", skip_all)]
3127 pub fn checkpoint_all_dbs(
3128 &self,
3129 checkpoint_path: &Path,
3130 cur_epoch_store: &AuthorityPerEpochStore,
3131 checkpoint_indexes: bool,
3132 ) -> IotaResult {
3133 let _metrics_guard = self.metrics.db_checkpoint_latency.start_timer();
3134 let current_epoch = cur_epoch_store.epoch();
3135
3136 if checkpoint_path.exists() {
3137 info!("Skipping db checkpoint as it already exists for epoch: {current_epoch}");
3138 return Ok(());
3139 }
3140
3141 let checkpoint_path_tmp = checkpoint_path.with_extension("tmp");
3142 let store_checkpoint_path_tmp = checkpoint_path_tmp.join("store");
3143
3144 if checkpoint_path_tmp.exists() {
3145 fs::remove_dir_all(&checkpoint_path_tmp)
3146 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3147 }
3148
3149 fs::create_dir_all(&checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3150 fs::create_dir(&store_checkpoint_path_tmp).map_err(|e| IotaError::FileIO(e.to_string()))?;
3151
3152 self.checkpoint_store
3155 .checkpoint_db(&checkpoint_path_tmp.join("checkpoints"))?;
3156
3157 self.get_reconfig_api()
3158 .checkpoint_db(&store_checkpoint_path_tmp.join("perpetual"))?;
3159
3160 self.committee_store
3161 .checkpoint_db(&checkpoint_path_tmp.join("epochs"))?;
3162
3163 if checkpoint_indexes {
3164 if let Some(indexes) = self.indexes.as_ref() {
3165 indexes.checkpoint_db(&checkpoint_path_tmp.join("indexes"))?;
3166 }
3167 }
3168
3169 fs::rename(checkpoint_path_tmp, checkpoint_path)
3170 .map_err(|e| IotaError::FileIO(e.to_string()))?;
3171 Ok(())
3172 }
3173
3174 pub fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3181 self.epoch_store.load()
3182 }
3183
3184 pub fn epoch_store_for_testing(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
3186 self.load_epoch_store_one_call_per_task()
3187 }
3188
3189 pub fn clone_committee_for_testing(&self) -> Committee {
3190 Committee::clone(self.epoch_store_for_testing().committee())
3191 }
3192
3193 #[instrument(level = "trace", skip_all)]
3194 pub async fn get_object(&self, object_id: &ObjectID) -> IotaResult<Option<Object>> {
3195 self.get_object_store()
3196 .get_object(object_id)
3197 .map_err(Into::into)
3198 }
3199
3200 pub async fn get_iota_system_package_object_ref(&self) -> IotaResult<ObjectRef> {
3201 Ok(self
3202 .get_object(&IOTA_SYSTEM_ADDRESS.into())
3203 .await?
3204 .expect("framework object should always exist")
3205 .compute_object_reference())
3206 }
3207
3208 pub fn get_iota_system_state_object_for_testing(&self) -> IotaResult<IotaSystemState> {
3210 self.get_object_cache_reader()
3211 .get_iota_system_state_object_unsafe()
3212 }
3213
3214 #[instrument(level = "trace", skip_all)]
3215 pub fn get_checkpoint_by_sequence_number(
3216 &self,
3217 sequence_number: CheckpointSequenceNumber,
3218 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3219 Ok(self
3220 .checkpoint_store
3221 .get_checkpoint_by_sequence_number(sequence_number)?)
3222 }
3223
3224 #[instrument(level = "trace", skip_all)]
3225 pub fn get_transaction_checkpoint_for_tests(
3226 &self,
3227 digest: &TransactionDigest,
3228 epoch_store: &AuthorityPerEpochStore,
3229 ) -> IotaResult<Option<VerifiedCheckpoint>> {
3230 let checkpoint = epoch_store.get_transaction_checkpoint(digest)?;
3231 let Some(checkpoint) = checkpoint else {
3232 return Ok(None);
3233 };
3234 let checkpoint = self
3235 .checkpoint_store
3236 .get_checkpoint_by_sequence_number(checkpoint)?;
3237 Ok(checkpoint)
3238 }
3239
3240 #[instrument(level = "trace", skip_all)]
3241 pub fn get_object_read(&self, object_id: &ObjectID) -> IotaResult<ObjectRead> {
3242 Ok(
3243 match self
3244 .get_object_cache_reader()
3245 .get_latest_object_or_tombstone(*object_id)?
3246 {
3247 Some((_, ObjectOrTombstone::Object(object))) => {
3248 let layout = self.get_object_layout(&object)?;
3249 ObjectRead::Exists(object.compute_object_reference(), object, layout)
3250 }
3251 Some((_, ObjectOrTombstone::Tombstone(objref))) => ObjectRead::Deleted(objref),
3252 None => ObjectRead::NotExists(*object_id),
3253 },
3254 )
3255 }
3256
3257 pub fn get_chain_identifier(&self) -> Option<ChainIdentifier> {
3259 if let Some(digest) = CHAIN_IDENTIFIER.get() {
3260 return Some(*digest);
3261 }
3262
3263 let checkpoint = self
3264 .get_checkpoint_by_sequence_number(0)
3265 .tap_err(|e| error!("Failed to get genesis checkpoint: {:?}", e))
3266 .ok()?
3267 .tap_none(|| error!("Genesis checkpoint is missing from DB"))?;
3268 let _ = CHAIN_IDENTIFIER.set(ChainIdentifier::from(*checkpoint.digest()));
3270 Some(ChainIdentifier::from(*checkpoint.digest()))
3271 }
3272
3273 #[instrument(level = "trace", skip_all)]
3274 pub fn get_move_object<T>(&self, object_id: &ObjectID) -> IotaResult<T>
3275 where
3276 T: DeserializeOwned,
3277 {
3278 let o = self.get_object_read(object_id)?.into_object()?;
3279 if let Some(move_object) = o.data.try_as_move() {
3280 Ok(bcs::from_bytes(move_object.contents()).map_err(|e| {
3281 IotaError::ObjectDeserialization {
3282 error: format!("{e}"),
3283 }
3284 })?)
3285 } else {
3286 Err(IotaError::ObjectDeserialization {
3287 error: format!("Provided object : [{object_id}] is not a Move object."),
3288 })
3289 }
3290 }
3291
3292 #[instrument(level = "trace", skip_all)]
3298 pub fn get_past_object_read(
3299 &self,
3300 object_id: &ObjectID,
3301 version: SequenceNumber,
3302 ) -> IotaResult<PastObjectRead> {
3303 let Some(obj_ref) = self
3305 .get_object_cache_reader()
3306 .get_latest_object_ref_or_tombstone(*object_id)?
3307 else {
3308 return Ok(PastObjectRead::ObjectNotExists(*object_id));
3309 };
3310
3311 if version > obj_ref.1 {
3312 return Ok(PastObjectRead::VersionTooHigh {
3313 object_id: *object_id,
3314 asked_version: version,
3315 latest_version: obj_ref.1,
3316 });
3317 }
3318
3319 if version < obj_ref.1 {
3320 return Ok(match self.read_object_at_version(object_id, version)? {
3322 Some((object, layout)) => {
3323 let obj_ref = object.compute_object_reference();
3324 PastObjectRead::VersionFound(obj_ref, object, layout)
3325 }
3326
3327 None => PastObjectRead::VersionNotFound(*object_id, version),
3328 });
3329 }
3330
3331 if !obj_ref.2.is_alive() {
3332 return Ok(PastObjectRead::ObjectDeleted(obj_ref));
3333 }
3334
3335 match self.read_object_at_version(object_id, obj_ref.1)? {
3336 Some((object, layout)) => Ok(PastObjectRead::VersionFound(obj_ref, object, layout)),
3337 None => {
3338 error!(
3339 "Object with in parent_entry is missing from object store, datastore is \
3340 inconsistent",
3341 );
3342 Err(UserInputError::ObjectNotFound {
3343 object_id: *object_id,
3344 version: Some(obj_ref.1),
3345 }
3346 .into())
3347 }
3348 }
3349 }
3350
3351 #[instrument(level = "trace", skip_all)]
3352 fn read_object_at_version(
3353 &self,
3354 object_id: &ObjectID,
3355 version: SequenceNumber,
3356 ) -> IotaResult<Option<(Object, Option<MoveStructLayout>)>> {
3357 let Some(object) = self
3358 .get_object_cache_reader()
3359 .get_object_by_key(object_id, version)?
3360 else {
3361 return Ok(None);
3362 };
3363
3364 let layout = self.get_object_layout(&object)?;
3365 Ok(Some((object, layout)))
3366 }
3367
3368 fn get_object_layout(&self, object: &Object) -> IotaResult<Option<MoveStructLayout>> {
3369 let layout = object
3370 .data
3371 .try_as_move()
3372 .map(|object| {
3373 into_struct_layout(
3374 self.load_epoch_store_one_call_per_task()
3375 .executor()
3376 .type_layout_resolver(Box::new(self.get_backing_package_store().as_ref()))
3378 .get_annotated_layout(&object.type_().clone().into())?,
3379 )
3380 })
3381 .transpose()?;
3382 Ok(layout)
3383 }
3384
3385 fn get_owner_at_version(
3386 &self,
3387 object_id: &ObjectID,
3388 version: SequenceNumber,
3389 ) -> IotaResult<Owner> {
3390 self.get_object_store()
3391 .get_object_by_key(object_id, version)?
3392 .ok_or_else(|| {
3393 IotaError::from(UserInputError::ObjectNotFound {
3394 object_id: *object_id,
3395 version: Some(version),
3396 })
3397 })
3398 .map(|o| o.owner)
3399 }
3400
3401 #[instrument(level = "trace", skip_all)]
3402 pub fn get_owner_objects(
3403 &self,
3404 owner: IotaAddress,
3405 cursor: Option<ObjectID>,
3407 limit: usize,
3408 filter: Option<IotaObjectDataFilter>,
3409 ) -> IotaResult<Vec<ObjectInfo>> {
3410 if let Some(indexes) = &self.indexes {
3411 indexes.get_owner_objects(owner, cursor, limit, filter)
3412 } else {
3413 Err(IotaError::IndexStoreNotAvailable)
3414 }
3415 }
3416
3417 #[instrument(level = "trace", skip_all)]
3418 pub fn get_owned_coins_iterator_with_cursor(
3419 &self,
3420 owner: IotaAddress,
3421 cursor: (String, ObjectID),
3423 limit: usize,
3424 one_coin_type_only: bool,
3425 ) -> IotaResult<impl Iterator<Item = (String, ObjectID, CoinInfo)> + '_> {
3426 if let Some(indexes) = &self.indexes {
3427 indexes.get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)
3428 } else {
3429 Err(IotaError::IndexStoreNotAvailable)
3430 }
3431 }
3432
3433 #[instrument(level = "trace", skip_all)]
3434 pub fn get_owner_objects_iterator(
3435 &self,
3436 owner: IotaAddress,
3437 cursor: Option<ObjectID>,
3439 filter: Option<IotaObjectDataFilter>,
3440 ) -> IotaResult<impl Iterator<Item = ObjectInfo> + '_> {
3441 let cursor_u = cursor.unwrap_or(ObjectID::ZERO);
3442 if let Some(indexes) = &self.indexes {
3443 indexes.get_owner_objects_iterator(owner, cursor_u, filter)
3444 } else {
3445 Err(IotaError::IndexStoreNotAvailable)
3446 }
3447 }
3448
3449 #[instrument(level = "trace", skip_all)]
3450 pub async fn get_move_objects<T>(
3451 &self,
3452 owner: IotaAddress,
3453 type_: MoveObjectType,
3454 ) -> IotaResult<Vec<T>>
3455 where
3456 T: DeserializeOwned,
3457 {
3458 let object_ids = self
3459 .get_owner_objects_iterator(owner, None, None)?
3460 .filter(|o| match &o.type_ {
3461 ObjectType::Struct(s) => &type_ == s,
3462 ObjectType::Package => false,
3463 })
3464 .map(|info| ObjectKey(info.object_id, info.version))
3465 .collect::<Vec<_>>();
3466 let mut move_objects = vec![];
3467
3468 let objects = self
3469 .get_object_store()
3470 .multi_get_objects_by_key(&object_ids)?;
3471
3472 for (o, id) in objects.into_iter().zip(object_ids) {
3473 let object = o.ok_or_else(|| {
3474 IotaError::from(UserInputError::ObjectNotFound {
3475 object_id: id.0,
3476 version: Some(id.1),
3477 })
3478 })?;
3479 let move_object = object.data.try_as_move().ok_or_else(|| {
3480 IotaError::from(UserInputError::MovePackageAsObject { object_id: id.0 })
3481 })?;
3482 move_objects.push(bcs::from_bytes(move_object.contents()).map_err(|e| {
3483 IotaError::ObjectDeserialization {
3484 error: format!("{e}"),
3485 }
3486 })?);
3487 }
3488 Ok(move_objects)
3489 }
3490
3491 #[instrument(level = "trace", skip_all)]
3492 pub fn get_dynamic_fields(
3493 &self,
3494 owner: ObjectID,
3495 cursor: Option<ObjectID>,
3497 limit: usize,
3498 ) -> IotaResult<Vec<(ObjectID, DynamicFieldInfo)>> {
3499 Ok(self
3500 .get_dynamic_fields_iterator(owner, cursor)?
3501 .take(limit)
3502 .collect::<Result<Vec<_>, _>>()?)
3503 }
3504
3505 fn get_dynamic_fields_iterator(
3506 &self,
3507 owner: ObjectID,
3508 cursor: Option<ObjectID>,
3510 ) -> IotaResult<impl Iterator<Item = Result<(ObjectID, DynamicFieldInfo), TypedStoreError>> + '_>
3511 {
3512 if let Some(indexes) = &self.indexes {
3513 indexes.get_dynamic_fields_iterator(owner, cursor)
3514 } else {
3515 Err(IotaError::IndexStoreNotAvailable)
3516 }
3517 }
3518
3519 #[instrument(level = "trace", skip_all)]
3520 pub fn get_dynamic_field_object_id(
3521 &self,
3522 owner: ObjectID,
3523 name_type: TypeTag,
3524 name_bcs_bytes: &[u8],
3525 ) -> IotaResult<Option<ObjectID>> {
3526 if let Some(indexes) = &self.indexes {
3527 indexes.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)
3528 } else {
3529 Err(IotaError::IndexStoreNotAvailable)
3530 }
3531 }
3532
3533 #[instrument(level = "trace", skip_all)]
3534 pub fn get_total_transaction_blocks(&self) -> IotaResult<u64> {
3535 Ok(self.get_indexes()?.next_sequence_number())
3536 }
3537
3538 #[instrument(level = "trace", skip_all)]
3539 pub async fn get_executed_transaction_and_effects(
3540 &self,
3541 digest: TransactionDigest,
3542 kv_store: Arc<TransactionKeyValueStore>,
3543 ) -> IotaResult<(Transaction, TransactionEffects)> {
3544 let transaction = kv_store.get_tx(digest).await?;
3545 let effects = kv_store.get_fx_by_tx_digest(digest).await?;
3546 Ok((transaction, effects))
3547 }
3548
3549 #[instrument(level = "trace", skip_all)]
3550 pub fn multi_get_checkpoint_by_sequence_number(
3551 &self,
3552 sequence_numbers: &[CheckpointSequenceNumber],
3553 ) -> IotaResult<Vec<Option<VerifiedCheckpoint>>> {
3554 Ok(self
3555 .checkpoint_store
3556 .multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
3557 }
3558
3559 #[instrument(level = "trace", skip_all)]
3560 pub fn get_transaction_events(
3561 &self,
3562 digest: &TransactionEventsDigest,
3563 ) -> IotaResult<TransactionEvents> {
3564 self.get_transaction_cache_reader()
3565 .get_events(digest)?
3566 .ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
3567 }
3568
3569 pub fn get_transaction_input_objects(
3570 &self,
3571 effects: &TransactionEffects,
3572 ) -> anyhow::Result<Vec<Object>> {
3573 let input_object_keys = effects
3574 .modified_at_versions()
3575 .into_iter()
3576 .map(|(object_id, version)| ObjectKey(object_id, version))
3577 .collect::<Vec<_>>();
3578
3579 let input_objects = self
3580 .get_object_store()
3581 .multi_get_objects_by_key(&input_object_keys)?
3582 .into_iter()
3583 .enumerate()
3584 .map(|(idx, maybe_object)| {
3585 maybe_object.ok_or_else(|| {
3586 anyhow::anyhow!(
3587 "missing input object key {:?} from tx {}",
3588 input_object_keys[idx],
3589 effects.transaction_digest()
3590 )
3591 })
3592 })
3593 .collect::<anyhow::Result<Vec<_>>>()?;
3594 Ok(input_objects)
3595 }
3596
3597 pub fn get_transaction_output_objects(
3598 &self,
3599 effects: &TransactionEffects,
3600 ) -> anyhow::Result<Vec<Object>> {
3601 let output_object_keys = effects
3602 .all_changed_objects()
3603 .into_iter()
3604 .map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
3605 .collect::<Vec<_>>();
3606
3607 let output_objects = self
3608 .get_object_store()
3609 .multi_get_objects_by_key(&output_object_keys)?
3610 .into_iter()
3611 .enumerate()
3612 .map(|(idx, maybe_object)| {
3613 maybe_object.ok_or_else(|| {
3614 anyhow::anyhow!(
3615 "missing output object key {:?} from tx {}",
3616 output_object_keys[idx],
3617 effects.transaction_digest()
3618 )
3619 })
3620 })
3621 .collect::<anyhow::Result<Vec<_>>>()?;
3622 Ok(output_objects)
3623 }
3624
3625 fn get_indexes(&self) -> IotaResult<Arc<IndexStore>> {
3626 match &self.indexes {
3627 Some(i) => Ok(i.clone()),
3628 None => Err(IotaError::UnsupportedFeature {
3629 error: "extended object indexing is not enabled on this server".into(),
3630 }),
3631 }
3632 }
3633
3634 pub async fn get_transactions_for_tests(
3635 self: &Arc<Self>,
3636 filter: Option<TransactionFilter>,
3637 cursor: Option<TransactionDigest>,
3638 limit: Option<usize>,
3639 reverse: bool,
3640 ) -> IotaResult<Vec<TransactionDigest>> {
3641 let metrics = KeyValueStoreMetrics::new_for_tests();
3642 let kv_store = Arc::new(TransactionKeyValueStore::new(
3643 "rocksdb",
3644 metrics,
3645 self.clone(),
3646 ));
3647 self.get_transactions(&kv_store, filter, cursor, limit, reverse)
3648 .await
3649 }
3650
3651 #[instrument(level = "trace", skip_all)]
3652 pub async fn get_transactions(
3653 &self,
3654 kv_store: &Arc<TransactionKeyValueStore>,
3655 filter: Option<TransactionFilter>,
3656 cursor: Option<TransactionDigest>,
3658 limit: Option<usize>,
3659 reverse: bool,
3660 ) -> IotaResult<Vec<TransactionDigest>> {
3661 if let Some(TransactionFilter::Checkpoint(sequence_number)) = filter {
3662 let checkpoint_contents = kv_store.get_checkpoint_contents(sequence_number).await?;
3663 let iter = checkpoint_contents.iter().map(|c| c.transaction);
3664 if reverse {
3665 let iter = iter
3666 .rev()
3667 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3668 .skip(usize::from(cursor.is_some()));
3669 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3670 } else {
3671 let iter = iter
3672 .skip_while(|d| cursor.is_some() && Some(*d) != cursor)
3673 .skip(usize::from(cursor.is_some()));
3674 return Ok(iter.take(limit.unwrap_or(usize::MAX)).collect());
3675 }
3676 }
3677 self.get_indexes()?
3678 .get_transactions(filter, cursor, limit, reverse)
3679 }
3680
3681 pub fn get_checkpoint_store(&self) -> &Arc<CheckpointStore> {
3682 &self.checkpoint_store
3683 }
3684
3685 pub fn get_latest_checkpoint_sequence_number(&self) -> IotaResult<CheckpointSequenceNumber> {
3686 self.get_checkpoint_store()
3687 .get_highest_executed_checkpoint_seq_number()?
3688 .ok_or(IotaError::UserInput {
3689 error: UserInputError::LatestCheckpointSequenceNumberNotFound,
3690 })
3691 }
3692
3693 #[cfg(msim)]
3694 pub fn get_highest_pruned_checkpoint_for_testing(
3695 &self,
3696 ) -> IotaResult<CheckpointSequenceNumber> {
3697 self.database_for_testing()
3698 .perpetual_tables
3699 .get_highest_pruned_checkpoint()
3700 }
3701
3702 #[instrument(level = "trace", skip_all)]
3703 pub fn get_checkpoint_summary_by_sequence_number(
3704 &self,
3705 sequence_number: CheckpointSequenceNumber,
3706 ) -> IotaResult<CheckpointSummary> {
3707 let verified_checkpoint = self
3708 .get_checkpoint_store()
3709 .get_checkpoint_by_sequence_number(sequence_number)?;
3710 match verified_checkpoint {
3711 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3712 None => Err(IotaError::UserInput {
3713 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3714 }),
3715 }
3716 }
3717
3718 #[instrument(level = "trace", skip_all)]
3719 pub fn get_checkpoint_summary_by_digest(
3720 &self,
3721 digest: CheckpointDigest,
3722 ) -> IotaResult<CheckpointSummary> {
3723 let verified_checkpoint = self
3724 .get_checkpoint_store()
3725 .get_checkpoint_by_digest(&digest)?;
3726 match verified_checkpoint {
3727 Some(verified_checkpoint) => Ok(verified_checkpoint.into_inner().into_data()),
3728 None => Err(IotaError::UserInput {
3729 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3730 }),
3731 }
3732 }
3733
3734 #[instrument(level = "trace", skip_all)]
3735 pub fn find_publish_txn_digest(&self, package_id: ObjectID) -> IotaResult<TransactionDigest> {
3736 if is_system_package(package_id) {
3737 return self.find_genesis_txn_digest();
3738 }
3739 Ok(self
3740 .get_object_read(&package_id)?
3741 .into_object()?
3742 .previous_transaction)
3743 }
3744
3745 #[instrument(level = "trace", skip_all)]
3746 pub fn find_genesis_txn_digest(&self) -> IotaResult<TransactionDigest> {
3747 let summary = self
3748 .get_verified_checkpoint_by_sequence_number(0)?
3749 .into_message();
3750 let content = self.get_checkpoint_contents(summary.content_digest)?;
3751 let genesis_transaction = content.enumerate_transactions(&summary).next();
3752 Ok(genesis_transaction
3753 .ok_or(IotaError::UserInput {
3754 error: UserInputError::GenesisTransactionNotFound,
3755 })?
3756 .1
3757 .transaction)
3758 }
3759
3760 #[instrument(level = "trace", skip_all)]
3761 pub fn get_verified_checkpoint_by_sequence_number(
3762 &self,
3763 sequence_number: CheckpointSequenceNumber,
3764 ) -> IotaResult<VerifiedCheckpoint> {
3765 let verified_checkpoint = self
3766 .get_checkpoint_store()
3767 .get_checkpoint_by_sequence_number(sequence_number)?;
3768 match verified_checkpoint {
3769 Some(verified_checkpoint) => Ok(verified_checkpoint),
3770 None => Err(IotaError::UserInput {
3771 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3772 }),
3773 }
3774 }
3775
3776 #[instrument(level = "trace", skip_all)]
3777 pub fn get_verified_checkpoint_summary_by_digest(
3778 &self,
3779 digest: CheckpointDigest,
3780 ) -> IotaResult<VerifiedCheckpoint> {
3781 let verified_checkpoint = self
3782 .get_checkpoint_store()
3783 .get_checkpoint_by_digest(&digest)?;
3784 match verified_checkpoint {
3785 Some(verified_checkpoint) => Ok(verified_checkpoint),
3786 None => Err(IotaError::UserInput {
3787 error: UserInputError::VerifiedCheckpointDigestNotFound(Base58::encode(digest)),
3788 }),
3789 }
3790 }
3791
3792 #[instrument(level = "trace", skip_all)]
3793 pub fn get_checkpoint_contents(
3794 &self,
3795 digest: CheckpointContentsDigest,
3796 ) -> IotaResult<CheckpointContents> {
3797 self.get_checkpoint_store()
3798 .get_checkpoint_contents(&digest)?
3799 .ok_or(IotaError::UserInput {
3800 error: UserInputError::CheckpointContentsNotFound(digest),
3801 })
3802 }
3803
3804 #[instrument(level = "trace", skip_all)]
3805 pub fn get_checkpoint_contents_by_sequence_number(
3806 &self,
3807 sequence_number: CheckpointSequenceNumber,
3808 ) -> IotaResult<CheckpointContents> {
3809 let verified_checkpoint = self
3810 .get_checkpoint_store()
3811 .get_checkpoint_by_sequence_number(sequence_number)?;
3812 match verified_checkpoint {
3813 Some(verified_checkpoint) => {
3814 let content_digest = verified_checkpoint.into_inner().content_digest;
3815 self.get_checkpoint_contents(content_digest)
3816 }
3817 None => Err(IotaError::UserInput {
3818 error: UserInputError::VerifiedCheckpointNotFound(sequence_number),
3819 }),
3820 }
3821 }
3822
3823 #[instrument(level = "trace", skip_all)]
3824 pub async fn query_events(
3825 &self,
3826 kv_store: &Arc<TransactionKeyValueStore>,
3827 query: EventFilter,
3828 cursor: Option<EventID>,
3830 limit: usize,
3831 descending: bool,
3832 ) -> IotaResult<Vec<IotaEvent>> {
3833 let index_store = self.get_indexes()?;
3834
3835 let (tx_num, event_num) = if let Some(cursor) = cursor.as_ref() {
3837 let tx_seq = index_store.get_transaction_seq(&cursor.tx_digest)?.ok_or(
3838 IotaError::TransactionNotFound {
3839 digest: cursor.tx_digest,
3840 },
3841 )?;
3842 (tx_seq, cursor.event_seq as usize)
3843 } else if descending {
3844 (u64::MAX, usize::MAX)
3845 } else {
3846 (0, 0)
3847 };
3848
3849 let limit = limit + 1;
3850 let mut event_keys = match query {
3851 EventFilter::All(filters) => {
3852 if filters.is_empty() {
3853 index_store.all_events(tx_num, event_num, limit, descending)?
3854 } else {
3855 return Err(IotaError::UserInput {
3856 error: UserInputError::Unsupported(
3857 "This query type does not currently support filter combinations"
3858 .to_string(),
3859 ),
3860 });
3861 }
3862 }
3863 EventFilter::Transaction(digest) => {
3864 index_store.events_by_transaction(&digest, tx_num, event_num, limit, descending)?
3865 }
3866 EventFilter::MoveModule { package, module } => {
3867 let module_id = ModuleId::new(package.into(), module);
3868 index_store.events_by_module_id(&module_id, tx_num, event_num, limit, descending)?
3869 }
3870 EventFilter::MoveEventType(struct_name) => index_store
3871 .events_by_move_event_struct_name(
3872 &struct_name,
3873 tx_num,
3874 event_num,
3875 limit,
3876 descending,
3877 )?,
3878 EventFilter::Sender(sender) => {
3879 index_store.events_by_sender(&sender, tx_num, event_num, limit, descending)?
3880 }
3881 EventFilter::TimeRange {
3882 start_time,
3883 end_time,
3884 } => index_store
3885 .event_iterator(start_time, end_time, tx_num, event_num, limit, descending)?,
3886 EventFilter::MoveEventModule { package, module } => index_store
3887 .events_by_move_event_module(
3888 &ModuleId::new(package.into(), module),
3889 tx_num,
3890 event_num,
3891 limit,
3892 descending,
3893 )?,
3894 EventFilter::Package(_)
3896 | EventFilter::MoveEventField { .. }
3897 | EventFilter::Any(_)
3898 | EventFilter::And(_, _)
3899 | EventFilter::Or(_, _) => {
3900 return Err(IotaError::UserInput {
3901 error: UserInputError::Unsupported(
3902 "This query type is not supported by the full node.".to_string(),
3903 ),
3904 });
3905 }
3906 };
3907
3908 if cursor.is_some() {
3911 if !event_keys.is_empty() {
3912 event_keys.remove(0);
3913 }
3914 } else {
3915 event_keys.truncate(limit - 1);
3916 }
3917
3918 let transaction_digests = event_keys
3920 .iter()
3921 .map(|(_, digest, _, _)| *digest)
3922 .collect::<HashSet<_>>()
3923 .into_iter()
3924 .collect::<Vec<_>>();
3925
3926 let events = kv_store
3927 .multi_get_events_by_tx_digests(&transaction_digests)
3928 .await?;
3929
3930 let events_map: HashMap<_, _> =
3931 transaction_digests.iter().zip(events.into_iter()).collect();
3932
3933 let stored_events = event_keys
3934 .into_iter()
3935 .map(|k| {
3936 (
3937 k,
3938 events_map
3939 .get(&k.1)
3940 .expect("fetched digest is missing")
3941 .clone()
3942 .and_then(|e| e.data.get(k.2).cloned()),
3943 )
3944 })
3945 .map(|((digest, tx_digest, event_seq, timestamp), event)| {
3946 event
3947 .map(|e| (e, tx_digest, event_seq, timestamp))
3948 .ok_or(IotaError::TransactionEventsNotFound { digest })
3949 })
3950 .collect::<Result<Vec<_>, _>>()?;
3951
3952 let epoch_store = self.load_epoch_store_one_call_per_task();
3953 let backing_store = self.get_backing_package_store().as_ref();
3954 let mut layout_resolver = epoch_store
3955 .executor()
3956 .type_layout_resolver(Box::new(backing_store));
3957 let mut events = vec![];
3958 for (e, tx_digest, event_seq, timestamp) in stored_events.into_iter() {
3959 events.push(IotaEvent::try_from(
3960 e.clone(),
3961 tx_digest,
3962 event_seq as u64,
3963 Some(timestamp),
3964 layout_resolver.get_annotated_layout(&e.type_)?,
3965 )?)
3966 }
3967 Ok(events)
3968 }
3969
3970 pub async fn insert_genesis_object(&self, object: Object) {
3971 self.get_reconfig_api()
3972 .insert_genesis_object(object)
3973 .expect("Cannot insert genesis object")
3974 }
3975
3976 pub async fn insert_genesis_objects(&self, objects: &[Object]) {
3977 futures::future::join_all(
3978 objects
3979 .iter()
3980 .map(|o| self.insert_genesis_object(o.clone())),
3981 )
3982 .await;
3983 }
3984
3985 #[instrument(level = "trace", skip_all)]
3987 pub fn get_transaction_status(
3988 &self,
3989 transaction_digest: &TransactionDigest,
3990 epoch_store: &Arc<AuthorityPerEpochStore>,
3991 ) -> IotaResult<Option<(SenderSignedData, TransactionStatus)>> {
3992 if let Some(effects) =
3994 self.get_signed_effects_and_maybe_resign(transaction_digest, epoch_store)?
3995 {
3996 if let Some(transaction) = self
3997 .get_transaction_cache_reader()
3998 .get_transaction_block(transaction_digest)?
3999 {
4000 let cert_sig = epoch_store.get_transaction_cert_sig(transaction_digest)?;
4001 let events = if let Some(digest) = effects.events_digest() {
4002 self.get_transaction_events(digest)?
4003 } else {
4004 TransactionEvents::default()
4005 };
4006 return Ok(Some((
4007 (*transaction).clone().into_message(),
4008 TransactionStatus::Executed(cert_sig, effects.into_inner(), events),
4009 )));
4010 } else {
4011 debug!(tx_digest=?transaction_digest, "Signed effects exist but no transaction found");
4016 }
4017 }
4018 if let Some(signed) = epoch_store.get_signed_transaction(transaction_digest)? {
4019 self.metrics.tx_already_processed.inc();
4020 let (transaction, sig) = signed.into_inner().into_data_and_sig();
4021 Ok(Some((transaction, TransactionStatus::Signed(sig))))
4022 } else {
4023 Ok(None)
4024 }
4025 }
4026
4027 #[instrument(level = "trace", skip_all)]
4031 pub fn get_signed_effects_and_maybe_resign(
4032 &self,
4033 transaction_digest: &TransactionDigest,
4034 epoch_store: &Arc<AuthorityPerEpochStore>,
4035 ) -> IotaResult<Option<VerifiedSignedTransactionEffects>> {
4036 let effects = self
4037 .get_transaction_cache_reader()
4038 .get_executed_effects(transaction_digest)?;
4039 match effects {
4040 Some(effects) => Ok(Some(self.sign_effects(effects, epoch_store)?)),
4041 None => Ok(None),
4042 }
4043 }
4044
4045 #[instrument(level = "trace", skip_all)]
4046 pub(crate) fn sign_effects(
4047 &self,
4048 effects: TransactionEffects,
4049 epoch_store: &Arc<AuthorityPerEpochStore>,
4050 ) -> IotaResult<VerifiedSignedTransactionEffects> {
4051 let tx_digest = *effects.transaction_digest();
4052 let signed_effects = match epoch_store.get_effects_signature(&tx_digest)? {
4053 Some(sig) if sig.epoch == epoch_store.epoch() => {
4054 SignedTransactionEffects::new_from_data_and_sig(effects, sig)
4055 }
4056 _ => {
4057 debug!(
4080 ?tx_digest,
4081 epoch=?epoch_store.epoch(),
4082 "Re-signing the effects with the current epoch"
4083 );
4084
4085 let sig = AuthoritySignInfo::new(
4086 epoch_store.epoch(),
4087 &effects,
4088 Intent::iota_app(IntentScope::TransactionEffects),
4089 self.name,
4090 &*self.secret,
4091 );
4092
4093 let effects = SignedTransactionEffects::new_from_data_and_sig(effects, sig.clone());
4094
4095 epoch_store.insert_effects_digest_and_signature(
4096 &tx_digest,
4097 effects.digest(),
4098 &sig,
4099 )?;
4100
4101 effects
4102 }
4103 };
4104
4105 Ok(VerifiedSignedTransactionEffects::new_unchecked(
4106 signed_effects,
4107 ))
4108 }
4109
4110 #[instrument(level = "trace", skip_all)]
4112 fn fullnode_only_get_tx_coins_for_indexing(
4113 &self,
4114 inner_temporary_store: &InnerTemporaryStore,
4115 epoch_store: &Arc<AuthorityPerEpochStore>,
4116 ) -> Option<TxCoins> {
4117 if self.indexes.is_none() || self.is_validator(epoch_store) {
4118 return None;
4119 }
4120 let written_coin_objects = inner_temporary_store
4121 .written
4122 .iter()
4123 .filter_map(|(k, v)| {
4124 if v.is_coin() {
4125 Some((*k, v.clone()))
4126 } else {
4127 None
4128 }
4129 })
4130 .collect();
4131 let input_coin_objects = inner_temporary_store
4132 .input_objects
4133 .iter()
4134 .filter_map(|(k, v)| {
4135 if v.is_coin() {
4136 Some((*k, v.clone()))
4137 } else {
4138 None
4139 }
4140 })
4141 .collect::<ObjectMap>();
4142 Some((input_coin_objects, written_coin_objects))
4143 }
4144
4145 #[instrument(level = "trace", skip_all)]
4157 pub async fn get_transaction_lock(
4158 &self,
4159 object_ref: &ObjectRef,
4160 epoch_store: &AuthorityPerEpochStore,
4161 ) -> IotaResult<Option<VerifiedSignedTransaction>> {
4162 let lock_info = self
4163 .get_object_cache_reader()
4164 .get_lock(*object_ref, epoch_store)?;
4165 let lock_info = match lock_info {
4166 ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
4167 return Err(UserInputError::ObjectVersionUnavailableForConsumption {
4168 provided_obj_ref: *object_ref,
4169 current_version: locked_ref.1,
4170 }
4171 .into());
4172 }
4173 ObjectLockStatus::Initialized => {
4174 return Ok(None);
4175 }
4176 ObjectLockStatus::LockedToTx { locked_by_tx } => locked_by_tx,
4177 };
4178
4179 epoch_store.get_signed_transaction(&lock_info)
4180 }
4181
4182 pub async fn get_objects(&self, objects: &[ObjectID]) -> IotaResult<Vec<Option<Object>>> {
4183 self.get_object_cache_reader().get_objects(objects)
4184 }
4185
4186 pub async fn get_object_or_tombstone(
4187 &self,
4188 object_id: ObjectID,
4189 ) -> IotaResult<Option<ObjectRef>> {
4190 self.get_object_cache_reader()
4191 .get_latest_object_ref_or_tombstone(object_id)
4192 }
4193
4194 pub fn set_override_protocol_upgrade_buffer_stake(
4204 &self,
4205 expected_epoch: EpochId,
4206 buffer_stake_bps: u64,
4207 ) -> IotaResult {
4208 let epoch_store = self.load_epoch_store_one_call_per_task();
4209 let actual_epoch = epoch_store.epoch();
4210 if actual_epoch != expected_epoch {
4211 return Err(IotaError::WrongEpoch {
4212 expected_epoch,
4213 actual_epoch,
4214 });
4215 }
4216
4217 epoch_store.set_override_protocol_upgrade_buffer_stake(buffer_stake_bps)
4218 }
4219
4220 pub fn clear_override_protocol_upgrade_buffer_stake(
4221 &self,
4222 expected_epoch: EpochId,
4223 ) -> IotaResult {
4224 let epoch_store = self.load_epoch_store_one_call_per_task();
4225 let actual_epoch = epoch_store.epoch();
4226 if actual_epoch != expected_epoch {
4227 return Err(IotaError::WrongEpoch {
4228 expected_epoch,
4229 actual_epoch,
4230 });
4231 }
4232
4233 epoch_store.clear_override_protocol_upgrade_buffer_stake()
4234 }
4235
4236 pub async fn get_available_system_packages(
4240 &self,
4241 binary_config: &BinaryConfig,
4242 ) -> Vec<ObjectRef> {
4243 let mut results = vec![];
4244
4245 let system_packages = BuiltInFramework::iter_system_packages();
4246
4247 #[cfg(msim)]
4249 let extra_packages = framework_injection::get_extra_packages(self.name);
4250 #[cfg(msim)]
4251 let system_packages = system_packages.map(|p| p).chain(extra_packages.iter());
4252
4253 for system_package in system_packages {
4254 let modules = system_package.modules().to_vec();
4255 #[cfg(msim)]
4257 let modules = framework_injection::get_override_modules(&system_package.id, self.name)
4258 .unwrap_or(modules);
4259
4260 let Some(obj_ref) = iota_framework::compare_system_package(
4261 &self.get_object_store(),
4262 &system_package.id,
4263 &modules,
4264 system_package.dependencies.to_vec(),
4265 binary_config,
4266 )
4267 .await
4268 else {
4269 return vec![];
4270 };
4271 results.push(obj_ref);
4272 }
4273
4274 results
4275 }
4276
4277 async fn get_system_package_bytes(
4294 &self,
4295 system_packages: Vec<ObjectRef>,
4296 binary_config: &BinaryConfig,
4297 ) -> Option<Vec<(SequenceNumber, Vec<Vec<u8>>, Vec<ObjectID>)>> {
4298 let ids: Vec<_> = system_packages.iter().map(|(id, _, _)| *id).collect();
4299 let objects = self.get_objects(&ids).await.expect("read cannot fail");
4300
4301 let mut res = Vec::with_capacity(system_packages.len());
4302 for (system_package_ref, object) in system_packages.into_iter().zip(objects.iter()) {
4303 let prev_transaction = match object {
4304 Some(cur_object) if cur_object.compute_object_reference() == system_package_ref => {
4305 info!("Framework {} does not need updating", system_package_ref.0);
4307 continue;
4308 }
4309
4310 Some(cur_object) => cur_object.previous_transaction,
4311 None => TransactionDigest::genesis_marker(),
4312 };
4313
4314 #[cfg(msim)]
4315 let SystemPackage {
4316 id: _,
4317 bytes,
4318 dependencies,
4319 } = framework_injection::get_override_system_package(&system_package_ref.0, self.name)
4320 .unwrap_or_else(|| {
4321 BuiltInFramework::get_package_by_id(&system_package_ref.0).clone()
4322 });
4323
4324 #[cfg(not(msim))]
4325 let SystemPackage {
4326 id: _,
4327 bytes,
4328 dependencies,
4329 } = BuiltInFramework::get_package_by_id(&system_package_ref.0).clone();
4330
4331 let modules: Vec<_> = bytes
4332 .iter()
4333 .map(|m| CompiledModule::deserialize_with_config(m, binary_config).unwrap())
4334 .collect();
4335
4336 let new_object = Object::new_system_package(
4337 &modules,
4338 system_package_ref.1,
4339 dependencies.clone(),
4340 prev_transaction,
4341 );
4342
4343 let new_ref = new_object.compute_object_reference();
4344 if new_ref != system_package_ref {
4345 error!(
4346 "Framework mismatch -- binary: {new_ref:?}\n upgrade: {system_package_ref:?}"
4347 );
4348 return None;
4349 }
4350
4351 res.push((system_package_ref.1, bytes, dependencies));
4352 }
4353
4354 Some(res)
4355 }
4356
4357 fn is_protocol_version_supported_v1(
4361 proposed_protocol_version: ProtocolVersion,
4362 committee: &Committee,
4363 capabilities: Vec<AuthorityCapabilitiesV1>,
4364 mut buffer_stake_bps: u64,
4365 ) -> Option<(ProtocolVersion, Vec<ObjectRef>)> {
4366 if buffer_stake_bps > 10000 {
4367 warn!("clamping buffer_stake_bps to 10000");
4368 buffer_stake_bps = 10000;
4369 }
4370
4371 let mut desired_upgrades: Vec<_> = capabilities
4374 .into_iter()
4375 .filter_map(|mut cap| {
4376 if cap.available_system_packages.is_empty() {
4378 return None;
4379 }
4380
4381 cap.available_system_packages.sort();
4382
4383 info!(
4384 "validator {:?} supports {:?} with system packages: {:?}",
4385 cap.authority.concise(),
4386 cap.supported_protocol_versions,
4387 cap.available_system_packages,
4388 );
4389
4390 cap.supported_protocol_versions
4394 .get_version_digest(proposed_protocol_version)
4395 .map(|digest| (digest, cap.available_system_packages, cap.authority))
4396 })
4397 .collect();
4398
4399 desired_upgrades.sort();
4402 desired_upgrades
4403 .into_iter()
4404 .chunk_by(|(digest, packages, _authority)| (*digest, packages.clone()))
4405 .into_iter()
4406 .find_map(|((digest, packages), group)| {
4407 assert!(!packages.is_empty());
4409
4410 let mut stake_aggregator: StakeAggregator<(), true> =
4411 StakeAggregator::new(Arc::new(committee.clone()));
4412
4413 for (_, _, authority) in group {
4414 stake_aggregator.insert_generic(authority, ());
4415 }
4416
4417 let total_votes = stake_aggregator.total_votes();
4418 let quorum_threshold = committee.quorum_threshold();
4419 let f = committee.total_votes() - committee.quorum_threshold();
4420
4421 let buffer_stake = (f * buffer_stake_bps).div_ceil(10000);
4423 let effective_threshold = quorum_threshold + buffer_stake;
4424
4425 info!(
4426 protocol_config_digest = ?digest,
4427 ?total_votes,
4428 ?quorum_threshold,
4429 ?buffer_stake_bps,
4430 ?effective_threshold,
4431 ?proposed_protocol_version,
4432 ?packages,
4433 "support for upgrade"
4434 );
4435
4436 let has_support = total_votes >= effective_threshold;
4437 has_support.then_some((proposed_protocol_version, packages))
4438 })
4439 }
4440
4441 fn choose_protocol_version_and_system_packages_v1(
4445 current_protocol_version: ProtocolVersion,
4446 committee: &Committee,
4447 capabilities: Vec<AuthorityCapabilitiesV1>,
4448 buffer_stake_bps: u64,
4449 ) -> (ProtocolVersion, Vec<ObjectRef>) {
4450 let mut next_protocol_version = current_protocol_version;
4451 let mut system_packages = vec![];
4452
4453 while let Some((version, packages)) = Self::is_protocol_version_supported_v1(
4457 next_protocol_version + 1,
4458 committee,
4459 capabilities.clone(),
4460 buffer_stake_bps,
4461 ) {
4462 next_protocol_version = version;
4463 system_packages = packages;
4464 }
4465
4466 (next_protocol_version, system_packages)
4467 }
4468
4469 #[instrument(level = "debug", skip_all)]
4470 fn create_authenticator_state_tx(
4471 &self,
4472 epoch_store: &Arc<AuthorityPerEpochStore>,
4473 ) -> Option<EndOfEpochTransactionKind> {
4474 if !epoch_store.protocol_config().enable_jwk_consensus_updates() {
4475 info!("authenticator state transactions not enabled");
4476 return None;
4477 }
4478
4479 let authenticator_state_exists = epoch_store.authenticator_state_exists();
4480 let tx = if authenticator_state_exists {
4481 let next_epoch = epoch_store.epoch().checked_add(1).expect("epoch overflow");
4482 let min_epoch =
4483 next_epoch.saturating_sub(epoch_store.protocol_config().max_age_of_jwk_in_epochs());
4484 let authenticator_obj_initial_shared_version = epoch_store
4485 .epoch_start_config()
4486 .authenticator_obj_initial_shared_version()
4487 .expect("initial version must exist");
4488
4489 let tx = EndOfEpochTransactionKind::new_authenticator_state_expire(
4490 min_epoch,
4491 authenticator_obj_initial_shared_version,
4492 );
4493
4494 info!(?min_epoch, "Creating AuthenticatorStateExpire tx",);
4495
4496 tx
4497 } else {
4498 let tx = EndOfEpochTransactionKind::new_authenticator_state_create();
4499 info!("Creating AuthenticatorStateCreate tx");
4500 tx
4501 };
4502 Some(tx)
4503 }
4504
4505 #[instrument(level = "debug", skip_all)]
4506 fn create_bridge_tx(
4507 &self,
4508 epoch_store: &Arc<AuthorityPerEpochStore>,
4509 ) -> Option<EndOfEpochTransactionKind> {
4510 if !epoch_store.protocol_config().enable_bridge() {
4511 info!("bridge not enabled");
4512 return None;
4513 }
4514 if epoch_store.bridge_exists() {
4515 return None;
4516 }
4517 let tx = EndOfEpochTransactionKind::new_bridge_create(epoch_store.get_chain_identifier());
4518 info!("Creating Bridge Create tx");
4519 Some(tx)
4520 }
4521
4522 #[instrument(level = "debug", skip_all)]
4523 fn init_bridge_committee_tx(
4524 &self,
4525 epoch_store: &Arc<AuthorityPerEpochStore>,
4526 ) -> Option<EndOfEpochTransactionKind> {
4527 if !epoch_store.protocol_config().enable_bridge() {
4528 info!("bridge not enabled");
4529 return None;
4530 }
4531 if !epoch_store
4532 .protocol_config()
4533 .should_try_to_finalize_bridge_committee()
4534 {
4535 info!("should not try to finalize bridge committee yet");
4536 return None;
4537 }
4538 if !epoch_store.bridge_exists() {
4540 return None;
4541 }
4542
4543 if epoch_store.bridge_committee_initiated() {
4544 return None;
4545 }
4546
4547 let bridge_initial_shared_version = epoch_store
4548 .epoch_start_config()
4549 .bridge_obj_initial_shared_version()
4550 .expect("initial version must exist");
4551 let tx = EndOfEpochTransactionKind::init_bridge_committee(bridge_initial_shared_version);
4552 info!("Init Bridge committee tx");
4553 Some(tx)
4554 }
4555
4556 #[instrument(level = "error", skip_all)]
4569 pub async fn create_and_execute_advance_epoch_tx(
4570 &self,
4571 epoch_store: &Arc<AuthorityPerEpochStore>,
4572 gas_cost_summary: &GasCostSummary,
4573 checkpoint: CheckpointSequenceNumber,
4574 epoch_start_timestamp_ms: CheckpointTimestamp,
4575 ) -> anyhow::Result<(
4576 IotaSystemState,
4577 Option<SystemEpochInfoEvent>,
4578 TransactionEffects,
4579 )> {
4580 let mut txns = Vec::new();
4581
4582 if let Some(tx) = self.create_authenticator_state_tx(epoch_store) {
4583 txns.push(tx);
4584 }
4585 if let Some(tx) = self.create_bridge_tx(epoch_store) {
4586 txns.push(tx);
4587 }
4588 if let Some(tx) = self.init_bridge_committee_tx(epoch_store) {
4589 txns.push(tx);
4590 }
4591
4592 let next_epoch = epoch_store.epoch() + 1;
4593
4594 let buffer_stake_bps = epoch_store.get_effective_buffer_stake_bps();
4595
4596 let (next_epoch_protocol_version, next_epoch_system_packages) =
4597 Self::choose_protocol_version_and_system_packages_v1(
4598 epoch_store.protocol_version(),
4599 epoch_store.committee(),
4600 epoch_store
4601 .get_capabilities_v1()
4602 .expect("read capabilities from db cannot fail"),
4603 buffer_stake_bps,
4604 );
4605
4606 let config = epoch_store.protocol_config();
4610 let binary_config = to_binary_config(config);
4611 let Some(next_epoch_system_package_bytes) = self
4612 .get_system_package_bytes(next_epoch_system_packages.clone(), &binary_config)
4613 .await
4614 else {
4615 error!(
4616 "upgraded system packages {:?} are not locally available, cannot create \
4617 ChangeEpochTx. validator binary must be upgraded to the correct version!",
4618 next_epoch_system_packages
4619 );
4620 return Err(anyhow!(
4630 "missing system packages: cannot form ChangeEpochTx"
4631 ));
4632 };
4633
4634 if config.protocol_defined_base_fee()
4637 && config.max_committee_members_count_as_option().is_some()
4638 {
4639 txns.push(EndOfEpochTransactionKind::new_change_epoch_v2(
4640 next_epoch,
4641 next_epoch_protocol_version,
4642 gas_cost_summary.storage_cost,
4643 gas_cost_summary.computation_cost,
4644 gas_cost_summary.computation_cost_burned,
4645 gas_cost_summary.storage_rebate,
4646 gas_cost_summary.non_refundable_storage_fee,
4647 epoch_start_timestamp_ms,
4648 next_epoch_system_package_bytes,
4649 ));
4650 } else {
4651 txns.push(EndOfEpochTransactionKind::new_change_epoch(
4652 next_epoch,
4653 next_epoch_protocol_version,
4654 gas_cost_summary.storage_cost,
4655 gas_cost_summary.computation_cost,
4656 gas_cost_summary.storage_rebate,
4657 gas_cost_summary.non_refundable_storage_fee,
4658 epoch_start_timestamp_ms,
4659 next_epoch_system_package_bytes,
4660 ));
4661 }
4662
4663 let tx = VerifiedTransaction::new_end_of_epoch_transaction(txns);
4664
4665 let executable_tx = VerifiedExecutableTransaction::new_from_checkpoint(
4666 tx.clone(),
4667 epoch_store.epoch(),
4668 checkpoint,
4669 );
4670
4671 let tx_digest = executable_tx.digest();
4672
4673 info!(
4674 ?next_epoch,
4675 ?next_epoch_protocol_version,
4676 ?next_epoch_system_packages,
4677 computation_cost=?gas_cost_summary.computation_cost,
4678 computation_cost_burned=?gas_cost_summary.computation_cost_burned,
4679 storage_cost=?gas_cost_summary.storage_cost,
4680 storage_rebate=?gas_cost_summary.storage_rebate,
4681 non_refundable_storage_fee=?gas_cost_summary.non_refundable_storage_fee,
4682 ?tx_digest,
4683 "Creating advance epoch transaction"
4684 );
4685
4686 fail_point_async!("change_epoch_tx_delay");
4687 let tx_lock = epoch_store.acquire_tx_lock(tx_digest).await;
4688
4689 if self
4693 .get_transaction_cache_reader()
4694 .is_tx_already_executed(tx_digest)
4695 .expect("read cannot fail")
4696 {
4697 warn!("change epoch tx has already been executed via state sync");
4698 return Err(anyhow::anyhow!(
4699 "change epoch tx has already been executed via state sync",
4700 ));
4701 }
4702
4703 let execution_guard = self
4704 .execution_lock_for_executable_transaction(&executable_tx)
4705 .await?;
4706
4707 epoch_store
4711 .assign_shared_object_versions_idempotent(
4712 self.get_object_cache_reader().as_ref(),
4713 &[executable_tx.clone()],
4714 )
4715 .await?;
4716
4717 let input_objects =
4718 self.read_objects_for_execution(&tx_lock, &executable_tx, epoch_store)?;
4719
4720 let (temporary_store, effects, _execution_error_opt) =
4721 self.prepare_certificate(&execution_guard, &executable_tx, input_objects, epoch_store)?;
4722 let system_obj = get_iota_system_state(&temporary_store.written)
4723 .expect("change epoch tx must write to system object");
4724 let system_epoch_info_event = temporary_store
4726 .events
4727 .data
4728 .into_iter()
4729 .find(|event| event.is_system_epoch_info_event())
4730 .map(SystemEpochInfoEvent::from);
4731 assert!(system_epoch_info_event.is_some() || system_obj.safe_mode());
4734
4735 self.get_state_sync_store()
4739 .insert_transaction_and_effects(&tx, &effects)
4740 .map_err(|err| {
4741 let err: anyhow::Error = err.into();
4742 err
4743 })?;
4744
4745 info!(
4746 "Effects summary of the change epoch transaction: {:?}",
4747 effects.summary_for_debug()
4748 );
4749 epoch_store.record_checkpoint_builder_is_safe_mode_metric(system_obj.safe_mode());
4750 assert!(effects.status().is_ok());
4752 Ok((system_obj, system_epoch_info_event, effects))
4753 }
4754
4755 #[instrument(level = "error", skip_all)]
4759 async fn revert_uncommitted_epoch_transactions(
4760 &self,
4761 epoch_store: &AuthorityPerEpochStore,
4762 ) -> IotaResult {
4763 {
4764 let state = epoch_store.get_reconfig_state_write_lock_guard();
4765 if state.should_accept_user_certs() {
4766 epoch_store.close_user_certs(state);
4775 }
4776 }
4778 let pending_certificates = epoch_store.pending_consensus_certificates();
4779 info!(
4780 "Reverting {} locally executed transactions that was not included in the epoch: {:?}",
4781 pending_certificates.len(),
4782 pending_certificates,
4783 );
4784 for digest in pending_certificates {
4785 if epoch_store.is_transaction_executed_in_checkpoint(&digest)? {
4786 info!(
4787 "Not reverting pending consensus transaction {:?} - it was included in checkpoint",
4788 digest
4789 );
4790 continue;
4791 }
4792 info!("Reverting {:?} at the end of epoch", digest);
4793 epoch_store.revert_executed_transaction(&digest)?;
4794 self.get_reconfig_api().revert_state_update(&digest)?;
4795 }
4796 info!("All uncommitted local transactions reverted");
4797 Ok(())
4798 }
4799
4800 #[instrument(level = "error", skip_all)]
4801 async fn reopen_epoch_db(
4802 &self,
4803 cur_epoch_store: &AuthorityPerEpochStore,
4804 new_committee: Committee,
4805 epoch_start_configuration: EpochStartConfiguration,
4806 expensive_safety_check_config: &ExpensiveSafetyCheckConfig,
4807 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
4808 let new_epoch = new_committee.epoch;
4809 info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch");
4810 assert_eq!(
4811 epoch_start_configuration.epoch_start_state().epoch(),
4812 new_committee.epoch
4813 );
4814 fail_point!("before-open-new-epoch-store");
4815 let new_epoch_store = cur_epoch_store.new_at_next_epoch(
4816 self.name,
4817 new_committee,
4818 epoch_start_configuration,
4819 self.get_backing_package_store().clone(),
4820 self.get_object_store().clone(),
4821 expensive_safety_check_config,
4822 cur_epoch_store.get_chain_identifier(),
4823 );
4824 self.epoch_store.store(new_epoch_store.clone());
4825 Ok(new_epoch_store)
4826 }
4827
4828 #[cfg(test)]
4829 pub(crate) fn iter_live_object_set_for_testing(
4830 &self,
4831 ) -> impl Iterator<Item = authority_store_tables::LiveObject> + '_ {
4832 self.get_accumulator_store().iter_live_object_set()
4833 }
4834
4835 #[cfg(test)]
4836 pub(crate) fn shutdown_execution_for_test(&self) {
4837 self.tx_execution_shutdown
4838 .lock()
4839 .take()
4840 .unwrap()
4841 .send(())
4842 .unwrap();
4843 }
4844
4845 pub async fn insert_objects_unsafe_for_testing_only(&self, objects: &[Object]) -> IotaResult {
4848 self.get_reconfig_api()
4849 .bulk_insert_genesis_objects(objects)?;
4850 self.get_object_cache_reader()
4851 .force_reload_system_packages(&BuiltInFramework::all_package_ids());
4852 Ok(())
4853 }
4854}
4855
4856pub struct RandomnessRoundReceiver {
4857 authority_state: Arc<AuthorityState>,
4858 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
4859}
4860
4861impl RandomnessRoundReceiver {
4862 pub fn spawn(
4863 authority_state: Arc<AuthorityState>,
4864 randomness_rx: mpsc::Receiver<(EpochId, RandomnessRound, Vec<u8>)>,
4865 ) -> JoinHandle<()> {
4866 let rrr = RandomnessRoundReceiver {
4867 authority_state,
4868 randomness_rx,
4869 };
4870 spawn_monitored_task!(rrr.run())
4871 }
4872
4873 async fn run(mut self) {
4874 info!("RandomnessRoundReceiver event loop started");
4875
4876 loop {
4877 tokio::select! {
4878 maybe_recv = self.randomness_rx.recv() => {
4879 if let Some((epoch, round, bytes)) = maybe_recv {
4880 self.handle_new_randomness(epoch, round, bytes);
4881 } else {
4882 break;
4883 }
4884 },
4885 }
4886 }
4887
4888 info!("RandomnessRoundReceiver event loop ended");
4889 }
4890
4891 #[instrument(level = "debug", skip_all, fields(?epoch, ?round))]
4892 fn handle_new_randomness(&self, epoch: EpochId, round: RandomnessRound, bytes: Vec<u8>) {
4893 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
4894 if epoch_store.epoch() != epoch {
4895 warn!(
4896 "dropping randomness for epoch {epoch}, round {round}, because we are in epoch {}",
4897 epoch_store.epoch()
4898 );
4899 return;
4900 }
4901 let transaction = VerifiedTransaction::new_randomness_state_update(
4902 epoch,
4903 round,
4904 bytes,
4905 epoch_store
4906 .epoch_start_config()
4907 .randomness_obj_initial_shared_version(),
4908 );
4909 debug!(
4910 "created randomness state update transaction with digest: {:?}",
4911 transaction.digest()
4912 );
4913 let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch);
4914 let digest = *transaction.digest();
4915
4916 self.authority_state
4918 .transaction_manager()
4919 .enqueue(vec![transaction], &epoch_store);
4920
4921 let authority_state = self.authority_state.clone();
4922 spawn_monitored_task!(async move {
4923 const RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT: Duration = Duration::from_secs(300);
4932 let result = tokio::time::timeout(
4933 RANDOMNESS_STATE_UPDATE_EXECUTION_TIMEOUT,
4934 authority_state
4935 .get_transaction_cache_reader()
4936 .notify_read_executed_effects(&[digest]),
4937 )
4938 .await;
4939 let result = match result {
4940 Ok(result) => result,
4941 Err(_) => {
4942 if cfg!(debug_assertions) {
4943 panic!(
4945 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
4946 );
4947 }
4948 warn!(
4949 "randomness state update transaction execution timed out at epoch {epoch}, round {round}"
4950 );
4951 authority_state
4953 .get_transaction_cache_reader()
4954 .notify_read_executed_effects(&[digest])
4955 .await
4956 }
4957 };
4958
4959 let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}"));
4960 let effects = effects.pop().expect("should return effects");
4961 if *effects.status() != ExecutionStatus::Success {
4962 panic!(
4963 "failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"
4964 );
4965 }
4966 debug!(
4967 "successfully executed randomness state update transaction at epoch {epoch}, round {round}"
4968 );
4969 });
4970 }
4971}
4972
4973#[async_trait]
4974impl TransactionKeyValueStoreTrait for AuthorityState {
4975 async fn multi_get(
4976 &self,
4977 transaction_keys: &[TransactionDigest],
4978 effects_keys: &[TransactionDigest],
4979 ) -> IotaResult<KVStoreTransactionData> {
4980 let txns = if !transaction_keys.is_empty() {
4981 self.get_transaction_cache_reader()
4982 .multi_get_transaction_blocks(transaction_keys)?
4983 .into_iter()
4984 .map(|t| t.map(|t| (*t).clone().into_inner()))
4985 .collect()
4986 } else {
4987 vec![]
4988 };
4989
4990 let fx = if !effects_keys.is_empty() {
4991 self.get_transaction_cache_reader()
4992 .multi_get_executed_effects(effects_keys)?
4993 } else {
4994 vec![]
4995 };
4996
4997 Ok((txns, fx))
4998 }
4999
5000 async fn multi_get_checkpoints(
5001 &self,
5002 checkpoint_summaries: &[CheckpointSequenceNumber],
5003 checkpoint_contents: &[CheckpointSequenceNumber],
5004 checkpoint_summaries_by_digest: &[CheckpointDigest],
5005 ) -> IotaResult<(
5006 Vec<Option<CertifiedCheckpointSummary>>,
5007 Vec<Option<CheckpointContents>>,
5008 Vec<Option<CertifiedCheckpointSummary>>,
5009 )> {
5010 let mut summaries = Vec::with_capacity(checkpoint_summaries.len());
5012 let store = self.get_checkpoint_store();
5013 for seq in checkpoint_summaries {
5014 let checkpoint = store
5015 .get_checkpoint_by_sequence_number(*seq)?
5016 .map(|c| c.into_inner());
5017
5018 summaries.push(checkpoint);
5019 }
5020
5021 let mut contents = Vec::with_capacity(checkpoint_contents.len());
5022 for seq in checkpoint_contents {
5023 let checkpoint = store
5024 .get_checkpoint_by_sequence_number(*seq)?
5025 .and_then(|summary| {
5026 store
5027 .get_checkpoint_contents(&summary.content_digest)
5028 .expect("db read cannot fail")
5029 });
5030 contents.push(checkpoint);
5031 }
5032
5033 let mut summaries_by_digest = Vec::with_capacity(checkpoint_summaries_by_digest.len());
5034 for digest in checkpoint_summaries_by_digest {
5035 let checkpoint = store
5036 .get_checkpoint_by_digest(digest)?
5037 .map(|c| c.into_inner());
5038 summaries_by_digest.push(checkpoint);
5039 }
5040
5041 Ok((summaries, contents, summaries_by_digest))
5042 }
5043
5044 async fn get_transaction_perpetual_checkpoint(
5045 &self,
5046 digest: TransactionDigest,
5047 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
5048 self.get_checkpoint_cache()
5049 .get_transaction_perpetual_checkpoint(&digest)
5050 .map(|res| res.map(|(_epoch, checkpoint)| checkpoint))
5051 }
5052
5053 async fn get_object(
5054 &self,
5055 object_id: ObjectID,
5056 version: VersionNumber,
5057 ) -> IotaResult<Option<Object>> {
5058 self.get_object_cache_reader()
5059 .get_object_by_key(&object_id, version)
5060 }
5061
5062 async fn multi_get_transactions_perpetual_checkpoints(
5063 &self,
5064 digests: &[TransactionDigest],
5065 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
5066 let res = self
5067 .get_checkpoint_cache()
5068 .multi_get_transactions_perpetual_checkpoints(digests)?;
5069
5070 Ok(res
5071 .into_iter()
5072 .map(|maybe| maybe.map(|(_epoch, checkpoint)| checkpoint))
5073 .collect())
5074 }
5075
5076 #[instrument(skip(self))]
5077 async fn multi_get_events_by_tx_digests(
5078 &self,
5079 digests: &[TransactionDigest],
5080 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
5081 if digests.is_empty() {
5082 return Ok(vec![]);
5083 }
5084 let events_digests: Vec<_> = self
5085 .get_transaction_cache_reader()
5086 .multi_get_executed_effects(digests)?
5087 .into_iter()
5088 .map(|t| t.and_then(|t| t.events_digest().cloned()))
5089 .collect();
5090 let non_empty_events: Vec<_> = events_digests.iter().filter_map(|e| *e).collect();
5091 let mut events = self
5092 .get_transaction_cache_reader()
5093 .multi_get_events(&non_empty_events)?
5094 .into_iter();
5095 Ok(events_digests
5096 .into_iter()
5097 .map(|ev| ev.and_then(|_| events.next()?))
5098 .collect())
5099 }
5100}
5101
5102#[cfg(msim)]
5103pub mod framework_injection {
5104 use std::{
5105 cell::RefCell,
5106 collections::{BTreeMap, BTreeSet},
5107 };
5108
5109 use iota_framework::{BuiltInFramework, SystemPackage};
5110 use iota_types::{
5111 base_types::{AuthorityName, ObjectID},
5112 is_system_package,
5113 };
5114 use move_binary_format::CompiledModule;
5115
5116 type FrameworkOverrideConfig = BTreeMap<ObjectID, PackageOverrideConfig>;
5117
5118 thread_local! {
5120 static OVERRIDE: RefCell<FrameworkOverrideConfig> = RefCell::new(FrameworkOverrideConfig::default());
5121 }
5122
5123 type Framework = Vec<CompiledModule>;
5124
5125 pub type PackageUpgradeCallback =
5126 Box<dyn Fn(AuthorityName) -> Option<Framework> + Send + Sync + 'static>;
5127
5128 enum PackageOverrideConfig {
5129 Global(Framework),
5130 PerValidator(PackageUpgradeCallback),
5131 }
5132
5133 fn compiled_modules_to_bytes(modules: &[CompiledModule]) -> Vec<Vec<u8>> {
5134 modules
5135 .iter()
5136 .map(|m| {
5137 let mut buf = Vec::new();
5138 m.serialize_with_version(m.version, &mut buf).unwrap();
5139 buf
5140 })
5141 .collect()
5142 }
5143
5144 pub fn set_override(package_id: ObjectID, modules: Vec<CompiledModule>) {
5145 OVERRIDE.with(|bs| {
5146 bs.borrow_mut()
5147 .insert(package_id, PackageOverrideConfig::Global(modules))
5148 });
5149 }
5150
5151 pub fn set_override_cb(package_id: ObjectID, func: PackageUpgradeCallback) {
5152 OVERRIDE.with(|bs| {
5153 bs.borrow_mut()
5154 .insert(package_id, PackageOverrideConfig::PerValidator(func))
5155 });
5156 }
5157
5158 pub fn get_override_bytes(package_id: &ObjectID, name: AuthorityName) -> Option<Vec<Vec<u8>>> {
5159 OVERRIDE.with(|cfg| {
5160 cfg.borrow().get(package_id).and_then(|entry| match entry {
5161 PackageOverrideConfig::Global(framework) => {
5162 Some(compiled_modules_to_bytes(framework))
5163 }
5164 PackageOverrideConfig::PerValidator(func) => {
5165 func(name).map(|fw| compiled_modules_to_bytes(&fw))
5166 }
5167 })
5168 })
5169 }
5170
5171 pub fn get_override_modules(
5172 package_id: &ObjectID,
5173 name: AuthorityName,
5174 ) -> Option<Vec<CompiledModule>> {
5175 OVERRIDE.with(|cfg| {
5176 cfg.borrow().get(package_id).and_then(|entry| match entry {
5177 PackageOverrideConfig::Global(framework) => Some(framework.clone()),
5178 PackageOverrideConfig::PerValidator(func) => func(name),
5179 })
5180 })
5181 }
5182
5183 pub fn get_override_system_package(
5184 package_id: &ObjectID,
5185 name: AuthorityName,
5186 ) -> Option<SystemPackage> {
5187 let bytes = get_override_bytes(package_id, name)?;
5188 let dependencies = if is_system_package(*package_id) {
5189 BuiltInFramework::get_package_by_id(package_id)
5190 .dependencies
5191 .to_vec()
5192 } else {
5193 BuiltInFramework::all_package_ids()
5196 };
5197 Some(SystemPackage {
5198 id: *package_id,
5199 bytes,
5200 dependencies,
5201 })
5202 }
5203
5204 pub fn get_extra_packages(name: AuthorityName) -> Vec<SystemPackage> {
5205 let built_in = BTreeSet::from_iter(BuiltInFramework::all_package_ids());
5206 let extra: Vec<ObjectID> = OVERRIDE.with(|cfg| {
5207 cfg.borrow()
5208 .keys()
5209 .filter_map(|package| (!built_in.contains(package)).then_some(*package))
5210 .collect()
5211 });
5212
5213 extra
5214 .into_iter()
5215 .map(|package| SystemPackage {
5216 id: package,
5217 bytes: get_override_bytes(&package, name).unwrap(),
5218 dependencies: BuiltInFramework::all_package_ids(),
5219 })
5220 .collect()
5221 }
5222}
5223
5224#[derive(Debug, Serialize, Deserialize, Clone)]
5225pub struct ObjDumpFormat {
5226 pub id: ObjectID,
5227 pub version: VersionNumber,
5228 pub digest: ObjectDigest,
5229 pub object: Object,
5230}
5231
5232impl ObjDumpFormat {
5233 fn new(object: Object) -> Self {
5234 let oref = object.compute_object_reference();
5235 Self {
5236 id: oref.0,
5237 version: oref.1,
5238 digest: oref.2,
5239 object,
5240 }
5241 }
5242}
5243
5244#[derive(Debug, Serialize, Deserialize, Clone)]
5245pub struct NodeStateDump {
5246 pub tx_digest: TransactionDigest,
5247 pub sender_signed_data: SenderSignedData,
5248 pub executed_epoch: u64,
5249 pub reference_gas_price: u64,
5250 pub protocol_version: u64,
5251 pub epoch_start_timestamp_ms: u64,
5252 pub computed_effects: TransactionEffects,
5253 pub expected_effects_digest: TransactionEffectsDigest,
5254 pub relevant_system_packages: Vec<ObjDumpFormat>,
5255 pub shared_objects: Vec<ObjDumpFormat>,
5256 pub loaded_child_objects: Vec<ObjDumpFormat>,
5257 pub modified_at_versions: Vec<ObjDumpFormat>,
5258 pub runtime_reads: Vec<ObjDumpFormat>,
5259 pub input_objects: Vec<ObjDumpFormat>,
5260}
5261
5262impl NodeStateDump {
5263 pub fn new(
5264 tx_digest: &TransactionDigest,
5265 effects: &TransactionEffects,
5266 expected_effects_digest: TransactionEffectsDigest,
5267 object_store: &dyn ObjectStore,
5268 epoch_store: &Arc<AuthorityPerEpochStore>,
5269 inner_temporary_store: &InnerTemporaryStore,
5270 certificate: &VerifiedExecutableTransaction,
5271 ) -> IotaResult<Self> {
5272 let executed_epoch = epoch_store.epoch();
5274 let reference_gas_price = epoch_store.reference_gas_price();
5275 let epoch_start_config = epoch_store.epoch_start_config();
5276 let protocol_version = epoch_store.protocol_version().as_u64();
5277 let epoch_start_timestamp_ms = epoch_start_config.epoch_data().epoch_start_timestamp();
5278
5279 let mut relevant_system_packages = Vec::new();
5281 for sys_package_id in BuiltInFramework::all_package_ids() {
5282 if let Some(w) = object_store.get_object(&sys_package_id)? {
5283 relevant_system_packages.push(ObjDumpFormat::new(w))
5284 }
5285 }
5286
5287 let mut shared_objects = Vec::new();
5289 for kind in effects.input_shared_objects() {
5290 match kind {
5291 InputSharedObject::Mutate(obj_ref) | InputSharedObject::ReadOnly(obj_ref) => {
5292 if let Some(w) = object_store.get_object_by_key(&obj_ref.0, obj_ref.1)? {
5293 shared_objects.push(ObjDumpFormat::new(w))
5294 }
5295 }
5296 InputSharedObject::ReadDeleted(..)
5297 | InputSharedObject::MutateDeleted(..)
5298 | InputSharedObject::Cancelled(..) => (), }
5301 }
5302
5303 let mut loaded_child_objects = Vec::new();
5306 for (id, meta) in &inner_temporary_store.loaded_runtime_objects {
5307 if let Some(w) = object_store.get_object_by_key(id, meta.version)? {
5308 loaded_child_objects.push(ObjDumpFormat::new(w))
5309 }
5310 }
5311
5312 let mut modified_at_versions = Vec::new();
5314 for (id, ver) in effects.modified_at_versions() {
5315 if let Some(w) = object_store.get_object_by_key(&id, ver)? {
5316 modified_at_versions.push(ObjDumpFormat::new(w))
5317 }
5318 }
5319
5320 let mut runtime_reads = Vec::new();
5324 for obj in inner_temporary_store
5325 .runtime_packages_loaded_from_db
5326 .values()
5327 {
5328 runtime_reads.push(ObjDumpFormat::new(obj.object().clone()));
5329 }
5330
5331 Ok(Self {
5334 tx_digest: *tx_digest,
5335 executed_epoch,
5336 reference_gas_price,
5337 epoch_start_timestamp_ms,
5338 protocol_version,
5339 relevant_system_packages,
5340 shared_objects,
5341 loaded_child_objects,
5342 modified_at_versions,
5343 runtime_reads,
5344 sender_signed_data: certificate.clone().into_message(),
5345 input_objects: inner_temporary_store
5346 .input_objects
5347 .values()
5348 .map(|o| ObjDumpFormat::new(o.clone()))
5349 .collect(),
5350 computed_effects: effects.clone(),
5351 expected_effects_digest,
5352 })
5353 }
5354
5355 pub fn all_objects(&self) -> Vec<ObjDumpFormat> {
5356 let mut objects = Vec::new();
5357 objects.extend(self.relevant_system_packages.clone());
5358 objects.extend(self.shared_objects.clone());
5359 objects.extend(self.loaded_child_objects.clone());
5360 objects.extend(self.modified_at_versions.clone());
5361 objects.extend(self.runtime_reads.clone());
5362 objects.extend(self.input_objects.clone());
5363 objects
5364 }
5365
5366 pub fn write_to_file(&self, path: &Path) -> Result<PathBuf, anyhow::Error> {
5367 let file_name = format!(
5368 "{}_{}_NODE_DUMP.json",
5369 self.tx_digest,
5370 AuthorityState::unixtime_now_ms()
5371 );
5372 let mut path = path.to_path_buf();
5373 path.push(&file_name);
5374 let mut file = File::create(path.clone())?;
5375 file.write_all(serde_json::to_string_pretty(self)?.as_bytes())?;
5376 Ok(path)
5377 }
5378
5379 pub fn read_from_file(path: &PathBuf) -> Result<Self, anyhow::Error> {
5380 let file = File::open(path)?;
5381 serde_json::from_reader(file).map_err(|e| anyhow::anyhow!(e))
5382 }
5383}