1#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8 collections::HashMap,
9 fmt,
10 future::Future,
11 path::PathBuf,
12 sync::{Arc, Weak},
13 time::Duration,
14};
15
16use anemo::Network;
17use anemo_tower::{
18 callback::CallbackLayer,
19 trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
20};
21use anyhow::{Result, anyhow};
22use arc_swap::ArcSwap;
23use futures::future::BoxFuture;
24pub use handle::IotaNodeHandle;
25use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
26use iota_common::debug_fatal;
27use iota_config::{
28 ConsensusConfig, NodeConfig,
29 node::{DBCheckpointConfig, RunWithRange},
30 node_config_metrics::NodeConfigMetrics,
31 object_storage_config::{ObjectStoreConfig, ObjectStoreType},
32};
33use iota_core::{
34 authority::{
35 AuthorityState, AuthorityStore, RandomnessRoundReceiver,
36 authority_per_epoch_store::AuthorityPerEpochStore,
37 authority_store_pruner::ObjectsCompactionFilter,
38 authority_store_tables::{
39 AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
40 },
41 backpressure::BackpressureManager,
42 epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
43 },
44 authority_aggregator::{
45 AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
46 },
47 authority_client::NetworkAuthorityClient,
48 authority_server::{
49 ValidatorService, ValidatorServiceMetrics, soft_lock::PreConsensusSoftLocks,
50 },
51 checkpoint_progress_tracker::CheckpointProgressTracker,
52 checkpoints::{
53 CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
54 SubmitCheckpointToConsensus,
55 checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
56 },
57 connection_monitor::ConnectionMonitor,
58 consensus_adapter::{
59 CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
60 ConsensusClient,
61 },
62 consensus_handler::ConsensusHandlerInitializer,
63 consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
64 consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
65 db_checkpoint_handler::DBCheckpointHandler,
66 epoch::{
67 committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
68 epoch_metrics::EpochMetrics, randomness::RandomnessManager,
69 reconfiguration::ReconfigurationInitiator,
70 },
71 execution_cache::build_execution_cache,
72 global_state_hasher::{GlobalStateHashMetrics, GlobalStateHasher},
73 grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
74 jsonrpc_index::IndexStore,
75 module_cache_metrics::ResolverMetrics,
76 overload_monitor::{consensus_queue_overload_monitor, overload_monitor},
77 safe_client::SafeClientMetricsBase,
78 signature_verifier::SignatureVerifierMetrics,
79 storage::{GrpcReadStore, RocksDbStore},
80 transaction_orchestrator::TransactionOrchestrator,
81 validator_tx_finalizer::ValidatorTxFinalizer,
82};
83use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
84use iota_json_rpc::{
85 JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
86 indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
87 transaction_builder_api::TransactionBuilderApi,
88 transaction_execution_api::TransactionExecutionApi,
89};
90use iota_json_rpc_api::JsonRpcMetrics;
91use iota_macros::{fail_point, fail_point_async, replay_log};
92use iota_metrics::{
93 RegistryID, RegistryService,
94 hardware_metrics::register_hardware_metrics,
95 metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
96 server_timing_middleware, spawn_monitored_task,
97};
98use iota_names::config::IotaNamesConfig;
99use iota_network::{
100 api::{ValidatorPeerServer, ValidatorServer, ValidatorV2Server},
101 discovery,
102 discovery::TrustedPeerChangeEvent,
103 randomness, state_sync,
104};
105use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
106use iota_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
107use iota_sdk_types::{
108 RandomnessRound,
109 crypto::{Intent, IntentMessage, IntentScope},
110};
111use iota_snapshot::{
112 reader::{StateSnapshotReaderV1, latest_available_epoch},
113 uploader::StateSnapshotUploader,
114};
115use iota_storage::{
116 FileCompression, StorageFormat,
117 http_key_value_store::HttpKVStore,
118 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
119 key_value_store_metrics::KeyValueStoreMetrics,
120};
121use iota_types::{
122 base_types::{AuthorityName, ConciseableName, EpochId},
123 committee::Committee,
124 crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits},
125 digests::{ChainIdentifier, get_devnet_chain_identifier},
126 error::{IotaError, IotaResult},
127 executable_transaction::VerifiedExecutableTransaction,
128 execution_config_utils::to_binary_config,
129 full_checkpoint_content::CheckpointData,
130 iota_system_state::{
131 IotaSystemState, IotaSystemStateTrait,
132 epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
133 },
134 messages_consensus::{
135 AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
136 SignedAuthorityCapabilitiesV1,
137 },
138 messages_grpc::HandleCapabilityNotificationRequestV1,
139 quorum_driver_types::QuorumDriverEffectsQueueResult,
140 supported_protocol_versions::SupportedProtocolVersions,
141 transaction::{Transaction, VerifiedCertificate},
142};
143use prometheus_filtered::Registry;
144#[cfg(msim)]
145use simulator::*;
146use tap::tap::TapFallible;
147use tokio::{
148 sync::{Mutex, broadcast, mpsc, watch},
149 task::{JoinHandle, JoinSet},
150};
151use tokio_util::sync::CancellationToken;
152use tower::ServiceBuilder;
153use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
154use typed_store::{
155 DBMetrics,
156 rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
157};
158
159use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
160
161pub mod admin;
162mod handle;
163pub mod metrics;
164
/// Services and task handles owned by a node only while it is a committee validator.
///
/// Produced by `IotaNode::construct_validator_components` and kept in
/// `IotaNode::validator_components` (`None` for non-validators).
pub struct ValidatorComponents {
    /// Validator server; `start_async` replaces it with the result of `.start().await` before
    /// storing the components.
    validator_server_handle: SpawnOnce,
    /// Optional validator overload-monitor task.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    /// Optional consensus-queue overload-monitor task.
    consensus_queue_overload_monitor_handle: Option<JoinHandle<()>>,
    /// Background task associated with `soft_locks` (assumed to sweep stale soft locks —
    /// confirm against `construct_validator_components`).
    soft_lock_sweep_handle: JoinHandle<()>,
    /// Overload-notifier task; presumably the handle returned by `start_overload_notifier`
    /// (`None` when the pcool flow is disabled) — confirm.
    overload_notifier_handle: Option<JoinHandle<()>>,
    /// Consensus manager for the current epoch.
    consensus_manager: Arc<ConsensusManager>,
    /// Pruner for the consensus store.
    consensus_store_pruner: ConsensusStorePruner,
    /// Adapter used to submit to consensus (e.g. `submit_recovered`, `close_epoch`).
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Pre-consensus soft locks.
    soft_locks: Arc<PreConsensusSoftLocks>,
    /// Tasks spawned by the checkpoint service.
    checkpoint_service_tasks: JoinSet<()>,
    /// Checkpoint metrics.
    checkpoint_metrics: Arc<CheckpointMetrics>,
    /// Metrics of the consensus transaction validator.
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    /// ID of the registry holding validator-only metrics (presumably removed from the
    /// `RegistryService` when the components are torn down — confirm).
    validator_registry_id: RegistryID,
}
187
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    /// Per-node state that only exists when the node runs under the `msim` simulator.
    pub(super) struct SimState {
        /// Simulator handle of the node that was current when this state was created.
        pub sim_node: iota_simulator::runtime::NodeHandle,
        /// Whether safe mode is expected; starts out `false`.
        pub sim_safe_mode_expected: AtomicBool,
        /// Held only for its lifetime; never read (hence the leading underscore).
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        /// Captures the current simulator node, clears the safe-mode flag and creates a
        /// fresh leak detector (in that order).
        fn default() -> Self {
            let sim_node = iota_simulator::runtime::NodeHandle::current();
            let sim_safe_mode_expected = AtomicBool::new(false);
            let _leak_detector = iota_simulator::NodeLeakDetector::new();
            Self {
                sim_node,
                sim_safe_mode_expected,
                _leak_detector,
            }
        }
    }
}
208
/// Name and version of the running binary, rendered by `Display` as `"<bin>/<version>"`.
#[derive(Clone, Debug)]
pub struct ServerVersion {
    /// Binary name, e.g. `"iota-node"`.
    pub bin: &'static str,
    /// Version string of the binary.
    pub version: &'static str,
}

impl ServerVersion {
    /// Creates a `ServerVersion` from a binary name and a version string.
    ///
    /// `const` so it can be used to build compile-time constants.
    pub const fn new(bin: &'static str, version: &'static str) -> Self {
        Self { bin, version }
    }
}

impl std::fmt::Display for ServerVersion {
    /// Formats as `bin/version`, e.g. `iota-node/unknown`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/{}", self.bin, self.version)
    }
}
228
/// A running IOTA node (validator or full node) together with the services it owns.
///
/// Built by [`IotaNode::start`] / [`IotaNode::start_async`], which return it inside an `Arc`.
pub struct IotaNode {
    /// Node configuration; `start_async` fills in `supported_protocol_versions` when unset.
    config: NodeConfig,
    /// Validator-only components; `None` when the node is not a committee validator.
    validator_components: Mutex<Option<ValidatorComponents>>,
    /// Handle returned by `build_http_server`; kept alive, not read here.
    _http_server: Option<iota_http::ServerHandle>,
    /// Core authority state shared with the node's services.
    state: Arc<AuthorityState>,
    /// Only created for full nodes that were started without a `run_with_range`.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    /// Registry service the node's Prometheus metrics are registered with.
    registry_service: RegistryService,
    /// Node-level metrics.
    metrics: Arc<IotaNodeMetrics>,

    /// P2P discovery handle from `create_p2p_network`; kept alive, not read here.
    _discovery: discovery::Handle,
    /// P2P state-sync handle.
    state_sync_handle: state_sync::Handle,
    /// P2P randomness handle.
    randomness_handle: randomness::Handle,
    /// Checkpoint store opened under `<db_path>/checkpoints`.
    checkpoint_store: Arc<CheckpointStore>,
    /// Global state hasher. Wrapped in `Mutex<Option<..>>`, presumably so it can be replaced
    /// or taken later (e.g. on reconfiguration) — confirm.
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    /// Per-peer connection status, also handed to the validator components.
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    /// Sender of the end-of-epoch system state; receivers via `subscribe_to_epoch_change`.
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    /// Sender of trusted-peer updates; its receiver was handed to `create_p2p_network`.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    /// Backpressure manager built from the checkpoint store.
    backpressure_manager: Arc<BackpressureManager>,

    /// Checkpoint progress tracker; `start_async` also spawns its logging task.
    checkpoint_progress_tracker: Arc<CheckpointProgressTracker>,

    /// Handle returned by `start_db_checkpoint`; kept alive, not read here.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    /// Simulator-only state (msim builds only).
    #[cfg(msim)]
    sim_state: SimState,

    /// Handle returned by `start_state_archival`; `None` when archival is not configured.
    _state_archive_handle: Option<broadcast::Sender<()>>,

    /// Handle returned by `start_state_snapshot`; `None` when upload is not configured.
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    /// Sender side of the shutdown channel; receivers via `subscribe_to_shutdown_channel`.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// Handle of the gRPC server; `None` when `build_grpc_server` did not start one.
    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    /// Authority aggregator behind an `ArcSwap` (presumably swapped on epoch change — confirm).
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}
278
279impl fmt::Debug for IotaNode {
280 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
281 f.debug_struct("IotaNode")
282 .field("name", &self.state.name.concise())
283 .finish()
284 }
285}
286
287impl IotaNode {
288 pub async fn start(
289 config: NodeConfig,
290 registry_service: RegistryService,
291 ) -> Result<Arc<IotaNode>> {
292 Self::start_async(
293 config,
294 registry_service,
295 ServerVersion::new("iota-node", "unknown"),
296 )
297 .await
298 }
299
300 fn start_overload_notifier(
305 config: &NodeConfig,
306 state: Arc<AuthorityState>,
307 epoch_store: Arc<AuthorityPerEpochStore>,
308 consensus_adapter: Arc<ConsensusAdapter>,
309 ) -> Option<JoinHandle<()>> {
310 if !epoch_store.protocol_config().enable_pcool_flow() {
311 return None;
312 }
313
314 let poll_interval = config.authority_overload_config.overload_monitor_interval;
315 let authority_name = state.name;
316
317 Some(spawn_monitored_task!(async move {
318 let mut last_notified_percentage: u32 = epoch_store
320 .load_overload_notification(&authority_name)
321 .unwrap_or(0) as u32;
322 loop {
323 tokio::time::sleep(poll_interval).await;
324 let current = state
325 .overload_info
326 .local_load_shedding_percentage
327 .load(std::sync::atomic::Ordering::Relaxed);
328 if current != last_notified_percentage {
329 last_notified_percentage = current;
330 let transaction = ConsensusTransaction::new_overload_notification_v1(
331 authority_name,
332 current as u8,
333 );
334 if let Err(e) = consensus_adapter.submit(transaction, None, &epoch_store) {
335 tracing::warn!(
336 "Failed to submit overload notification to consensus: {:?}",
337 e
338 );
339 }
340 }
341 }
342 }))
343 }
344
    /// Opens all stores, builds the authority state and every node service, and starts the
    /// node.
    ///
    /// `server_version` is reported in the uptime metric and passed to the gRPC server. The
    /// node acts as a validator when `config.consensus_config()` is set, and as a full node
    /// otherwise. Validator components are only built if the state reports this node as a
    /// committee validator for the current epoch. On success, a reconfiguration-monitor task
    /// and a checkpoint-progress logging task are spawned before the node is returned.
    ///
    /// # Errors
    ///
    /// Propagates failures from loading genesis or migration data, opening stores, creating
    /// the archive readers, the P2P network or the HTTP/gRPC servers, and from starting
    /// archival, snapshot upload, DB checkpoints or the validator components.
    ///
    /// # Panics
    ///
    /// Panics when the DB corruption check fails, when metric registration fails, when
    /// required committee or epoch-start data is missing, when the genesis IOTA conservation
    /// check fails, when seeding the epoch_info chain fails, or when enabled secondary-index
    /// checks find inconsistent indexes.
    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        server_version: ServerVersion,
    ) -> Result<Arc<IotaNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        // Local mutable copy so defaults can be filled in below.
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(node =? config.authority_public_key(),
            "Initializing iota-node listening on {}", config.network_address
        );

        let genesis = config.genesis()?.clone();

        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
        info!("IOTA chain identifier: {chain_identifier}");

        // The DB corruption status marker is checked and set here, before any store is
        // opened. It is cleared by `unmark_db_corruption` further below, after the genesis
        // checkpoint has been inserted.
        let db_corrupted_path = &config.db_path().join("status");
        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
            panic!("Failed to check database corruption: {err}");
        }

        DBMetrics::init(&prometheus_registry);

        iota_metrics::init_metrics(&prometheus_registry);
        // The thread stall monitor is not started under the msim simulator.
        #[cfg(not(msim))]
        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();

        register_hardware_metrics(&registry_service, &config.db_path)
            .expect("Failed registering hardware metrics");
        prometheus_registry
            .register(iota_metrics::uptime_metric(
                if is_validator {
                    "validator"
                } else {
                    "fullnode"
                },
                server_version.version,
                &chain_identifier.to_string(),
            ))
            .expect("Failed registering uptime metric");

        let migration_tx_data = if genesis.contains_migrations() {
            Some(config.load_migration_tx_data()?)
        } else {
            None
        };

        let secret = Arc::pin(config.authority_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        // The pruner tables are only opened when the compaction filter is enabled. They feed
        // the compaction filter and are also passed to `AuthorityState::new` below.
        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        // Write stalls default to on for validators and off for full nodes unless configured.
        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
        ));
        // Checked before `AuthorityStore::open` runs; an empty database is treated as a
        // genesis start (used for the conservation check below).
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");
        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        // Extra handle kept for the checkpoint progress logging task spawned at the end.
        let perpetual_tables_for_progress = perpetual_tables.clone();
        let store = AuthorityStore::open(
            perpetual_tables,
            &genesis,
            &config,
            &prometheus_registry,
            migration_tx_data.as_ref(),
        )
        .await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache_config,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                    auth_agg_metrics,
                ),
            )))
        };

        // A testing override takes precedence over the chain derived from the genesis digest.
        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => chain_identifier.chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.authority_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_identifier, chain),
            // Highest executed checkpoint, or 0 when nothing has been executed yet.
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        if is_genesis {
            info!("checking IOTA conservation at genesis");
            cache_traits
                .reconfig_api
                .try_expensive_check_iota_conservation(&epoch_store, None)
                .expect("IOTA conservation check cannot fail at genesis");
        }

        // Surface any override of the protocol-upgrade buffer stake.
        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        // Stores are open and genesis is in place: clear the corruption marker set above.
        unmark_db_corruption(db_corrupted_path)?;

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        // JSON-RPC index store: full nodes only, and only when index processing is enabled.
        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
            )))
        } else {
            None
        };

        // gRPC index store: full nodes only, and only when the gRPC API is enabled.
        let grpc_indexes_store = if is_full_node && config.enable_grpc_api {
            Some(Arc::new(
                GrpcIndexesStore::new(
                    config.db_path().join(GRPC_INDEXES_DIR),
                    Arc::clone(&store),
                    &checkpoint_store,
                )
                .await,
            ))
        } else {
            None
        };

        Self::seed_epoch_info(&checkpoint_store, &store, &genesis, chain_identifier)
            .await
            .expect("failed to seed the epoch_info chain");

        info!("creating archive reader");
        let archive_readers =
            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
            Self::create_p2p_network(
                &config,
                state_sync_store.clone(),
                chain_identifier,
                trusted_peer_change_rx,
                archive_readers.clone(),
                randomness_tx,
                &prometheus_registry,
            )?;

        // Publish the trusted peers of the current epoch's start state.
        send_trusted_peer_change(
            &config,
            &trusted_peer_change_tx,
            epoch_store.epoch_start_state(),
        );

        info!("start state archival");
        let state_archive_handle =
            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
                .await?;

        info!("start snapshot upload");
        let state_snapshot_handle = Self::start_state_snapshot(
            &config,
            &prometheus_registry,
            checkpoint_store.clone(),
            is_full_node,
        )?;

        let checkpoint_progress_tracker = Arc::new(CheckpointProgressTracker::new());

        info!("start db checkpoint");
        // Whether snapshot upload is active influences the DB checkpoint configuration.
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
            Some(checkpoint_progress_tracker.clone()),
        )?;

        let mut genesis_objects = genesis.objects().to_vec();
        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
            genesis_objects.extend(migration_tx_data.get_objects());
        }

        let authority_name = config.authority_public_key();
        let validator_tx_finalizer =
            config
                .enable_validator_tx_finalizer
                .then_some(Arc::new(ValidatorTxFinalizer::new(
                    auth_agg.clone(),
                    authority_name,
                    &prometheus_registry,
                )));

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            grpc_indexes_store,
            checkpoint_store.clone(),
            &prometheus_registry,
            &genesis_objects,
            &db_checkpoint_config,
            config.clone(),
            archive_readers,
            validator_tx_finalizer,
            chain_identifier,
            pruner_db,
            Some(checkpoint_progress_tracker.clone()),
            config.policy_config.clone(),
            config.firewall_config.clone(),
        )
        .await;

        // In epoch 0, the genesis transaction and any migration transactions are executed
        // directly here.
        if epoch_store.epoch() == 0 {
            let genesis_tx = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
            Self::execute_transaction_immediately_at_zero_epoch(
                &state,
                &epoch_store,
                genesis_tx,
                span,
            )
            .await;

            if let Some(migration_tx_data) = migration_tx_data {
                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
                    Self::execute_transaction_immediately_at_zero_epoch(
                        &state,
                        &epoch_store,
                        tx,
                        span,
                    )
                    .await;
                }
            }
        }

        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
        {
            if let Some(indexes) = state.indexes.clone() {
                iota_core::verify_indexes::verify_indexes(
                    state.get_global_state_hash_store().as_ref(),
                    indexes,
                )
                .expect("secondary indexes are inconsistent");
            }
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        // Only full nodes without a configured run range get a transaction orchestrator.
        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
                Some(&config),
            )))
        } else {
            None
        };

        let http_server = build_http_server(
            state.clone(),
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
        )
        .await?;

        let global_state_hasher = Arc::new(GlobalStateHasher::new(
            cache_traits.global_state_hash_store.clone(),
            GlobalStateHashMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics =
            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        // The handle is bound to a `_`-prefixed name, so it lives until this function returns.
        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
            p2p_network.downgrade(),
            network_connection_metrics,
            HashMap::new(),
            None,
        );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses,
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let iota_node_metrics =
            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));

        iota_node_metrics
            .binary_max_protocol_version
            .set(ProtocolVersion::MAX.as_u64() as i64);
        iota_node_metrics
            .configured_max_protocol_version
            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);

        // The gRPC server executes transactions through the orchestrator, when there is one.
        let executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>> =
            transaction_orchestrator
                .clone()
                .map(|o| o as Arc<dyn iota_types::transaction_executor::TransactionExecutor>);

        let grpc_server_handle = build_grpc_server(
            &config,
            state.clone(),
            state_sync_store.clone(),
            executor,
            &prometheus_registry,
            server_version,
        )
        .await?;

        // Validator components are built concurrently with re-executing pending consensus
        // certificates. Recovered consensus transactions are then resubmitted, and the
        // validator server is started.
        let validator_components = if state.is_committee_validator(&epoch_store) {
            let (components, _) = futures::join!(
                Self::construct_validator_components(
                    config.clone(),
                    state.clone(),
                    committee,
                    epoch_store.clone(),
                    checkpoint_store.clone(),
                    state_sync_handle.clone(),
                    randomness_handle.clone(),
                    Arc::downgrade(&global_state_hasher),
                    backpressure_manager.clone(),
                    connection_monitor_status.clone(),
                    &registry_service,
                ),
                Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
            );
            let mut components = components?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            components.validator_server_handle = components.validator_server_handle.start().await;

            Some(components)
        } else {
            None
        };

        // The initial receiver is dropped; receivers come from `subscribe_to_shutdown_channel`.
        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            _http_server: http_server,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: iota_node_metrics,

            _discovery: discovery_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            global_state_hasher: Mutex::new(Some(global_state_hasher)),
            end_of_epoch_channel,
            connection_monitor_status,
            trusted_peer_change_tx,
            backpressure_manager,
            checkpoint_progress_tracker: checkpoint_progress_tracker.clone(),

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_archive_handle: state_archive_handle,
            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            grpc_server_handle: Mutex::new(grpc_server_handle),

            auth_agg,
        };

        info!("IotaNode started!");
        let node = Arc::new(node);
        // Reconfiguration is monitored in a background task that owns a clone of the node and
        // the current epoch store; an error is only logged.
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        node.checkpoint_progress_tracker
            .spawn_logging_task(node.checkpoint_store.clone(), perpetual_tables_for_progress);

        Ok(node)
    }
907
908 async fn seed_epoch_info(
923 checkpoint_store: &CheckpointStore,
924 authority_store: &AuthorityStore,
925 genesis: &iota_config::genesis::Genesis,
926 expected_chain_id: ChainIdentifier,
927 ) -> anyhow::Result<()> {
928 let chain = expected_chain_id.chain();
929 let recognized_source = formal_snapshot_read_config(expected_chain_id);
930
931 if checkpoint_store
934 .epoch_info_gap()
935 .map_err(|e| anyhow::anyhow!("checking epoch_info completeness: {e}"))?
936 .is_some()
937 {
938 if let Some(remote_store_config) = &recognized_source {
940 if let Err(e) = Self::backfill_epoch_info_from_snapshot(
941 checkpoint_store,
942 remote_store_config,
943 genesis.committee()?,
944 genesis.iota_system_object(),
945 expected_chain_id,
946 )
947 .await
948 {
949 warn!(
950 "epoch_info snapshot backfill failed ({e:#}); falling back to \
951 rebuilding from local checkpoint history"
952 );
953 }
954 }
955
956 checkpoint_store
958 .backfill_epoch_info_from_local_history(authority_store)
959 .map_err(|e| anyhow::anyhow!("rebuilding epoch_info from local history: {e}"))?;
960 }
961
962 checkpoint_store
965 .ensure_current_epoch_info(authority_store)
966 .map_err(|e| anyhow::anyhow!("seeding the current epoch_info row: {e}"))?;
967
968 if let Some((highest_indexed, last_executed)) = checkpoint_store
971 .epoch_info_gap()
972 .map_err(|e| anyhow::anyhow!("re-checking epoch_info completeness: {e}"))?
973 {
974 let detail = format!(
975 "the epoch_info chain is incomplete after backfilling (finalized through \
976 {highest_indexed:?}, executed through epoch {last_executed})"
977 );
978 if recognized_source.is_some() {
979 anyhow::bail!(
980 "{detail}: the latest published snapshot is older than this node's history \
981 and the missing epochs' checkpoint data is already pruned locally; retry \
982 once a newer snapshot is available"
983 );
984 }
985 warn!(
986 "{detail} and {chain:?} has no public formal-snapshot source to backfill from; \
987 the missing epochs are left unfilled (live indexing only extends the chain \
988 forward), so this node cannot produce snapshots or serve the epoch gRPC API for \
989 those epochs"
990 );
991 }
992 Ok(())
993 }
994
995 async fn backfill_epoch_info_from_snapshot(
1005 checkpoint_store: &CheckpointStore,
1006 remote_store_config: &ObjectStoreConfig,
1007 genesis_committee: Committee,
1008 genesis_system_state: IotaSystemState,
1009 expected_chain_id: ChainIdentifier,
1010 ) -> anyhow::Result<u64> {
1011 let epoch = latest_available_epoch(remote_store_config).await?;
1012 info!("restoring epoch_info from snapshot EPOCH_INFO up to epoch {epoch}");
1013 let (snapshot_chain_id, epoch_info) =
1014 StateSnapshotReaderV1::read_epoch_info_only(epoch, remote_store_config).await?;
1015 let verified = iota_snapshot::verify_epoch_info_chain(
1016 epoch_info,
1017 genesis_committee,
1018 genesis_system_state,
1019 snapshot_chain_id,
1020 expected_chain_id,
1021 )?;
1022 verified.restore_epoch_info(checkpoint_store).await?;
1023 info!("restored epoch_info from snapshot up to epoch {epoch}");
1024 Ok(epoch)
1025 }
1026
1027 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
1028 self.end_of_epoch_channel.subscribe()
1029 }
1030
1031 pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
1032 self.shutdown_channel_tx.subscribe()
1033 }
1034
1035 pub fn current_epoch_for_testing(&self) -> EpochId {
1036 self.state.current_epoch_for_testing()
1037 }
1038
1039 pub fn db_checkpoint_path(&self) -> PathBuf {
1040 self.config.db_checkpoint_path()
1041 }
1042
1043 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
1045 info!("close_epoch (current epoch = {})", epoch_store.epoch());
1046 self.validator_components
1047 .lock()
1048 .await
1049 .as_ref()
1050 .ok_or_else(|| IotaError::from("Node is not a validator"))?
1051 .consensus_adapter
1052 .close_epoch(epoch_store);
1053 Ok(())
1054 }
1055
1056 pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
1057 self.state
1058 .clear_override_protocol_upgrade_buffer_stake(epoch)
1059 }
1060
1061 pub fn set_override_protocol_upgrade_buffer_stake(
1062 &self,
1063 epoch: EpochId,
1064 buffer_stake_bps: u64,
1065 ) -> IotaResult {
1066 self.state
1067 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
1068 }
1069
1070 pub async fn close_epoch_for_testing(&self) -> IotaResult {
1073 let epoch_store = self.state.epoch_store_for_testing();
1074 self.close_epoch(&epoch_store).await
1075 }
1076
1077 async fn start_state_archival(
1078 config: &NodeConfig,
1079 prometheus_registry: &Registry,
1080 state_sync_store: RocksDbStore,
1081 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1082 if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
1083 let local_store_config = ObjectStoreConfig {
1084 object_store: Some(ObjectStoreType::File),
1085 directory: Some(config.archive_path()),
1086 ..Default::default()
1087 };
1088 let archive_writer = ArchiveWriter::new(
1089 local_store_config,
1090 remote_store_config.clone(),
1091 FileCompression::Zstd,
1092 StorageFormat::Blob,
1093 Duration::from_secs(600),
1094 256 * 1024 * 1024,
1095 prometheus_registry,
1096 )
1097 .await?;
1098 Ok(Some(archive_writer.start(state_sync_store).await?))
1099 } else {
1100 Ok(None)
1101 }
1102 }
1103
1104 fn start_state_snapshot(
1107 config: &NodeConfig,
1108 prometheus_registry: &Registry,
1109 checkpoint_store: Arc<CheckpointStore>,
1110 is_full_node: bool,
1111 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1112 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1113 anyhow::ensure!(
1115 is_full_node,
1116 "Snapshot upload is configured, but this node is a validator. \
1117 Snapshot publication is only supported on fullnodes. Remove the \
1118 `state_snapshot_write_config.object_store_config` setting, or move \
1119 the upload to a fullnode."
1120 );
1121 let snapshot_uploader = StateSnapshotUploader::new(
1122 &config.db_checkpoint_path(),
1123 &config.snapshot_path(),
1124 remote_store_config.clone(),
1125 config.state_snapshot_write_config.concurrency,
1126 60,
1127 prometheus_registry,
1128 checkpoint_store,
1129 )?;
1130 Ok(Some(snapshot_uploader.start()))
1131 } else {
1132 Ok(None)
1133 }
1134 }
1135
1136 fn start_db_checkpoint(
1137 config: &NodeConfig,
1138 prometheus_registry: &Registry,
1139 state_snapshot_enabled: bool,
1140 checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
1141 ) -> Result<(
1142 DBCheckpointConfig,
1143 Option<tokio::sync::broadcast::Sender<()>>,
1144 )> {
1145 let checkpoint_path = Some(
1146 config
1147 .db_checkpoint_config
1148 .checkpoint_path
1149 .clone()
1150 .unwrap_or_else(|| config.db_checkpoint_path()),
1151 );
1152 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1153 DBCheckpointConfig {
1154 checkpoint_path,
1155 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1156 true
1157 } else {
1158 config
1159 .db_checkpoint_config
1160 .perform_db_checkpoints_at_epoch_end
1161 },
1162 ..config.db_checkpoint_config.clone()
1163 }
1164 } else {
1165 config.db_checkpoint_config.clone()
1166 };
1167
1168 match (
1169 db_checkpoint_config.object_store_config.as_ref(),
1170 state_snapshot_enabled,
1171 ) {
1172 (None, false) => Ok((db_checkpoint_config, None)),
1177 (_, _) => {
1178 let handler = DBCheckpointHandler::new(
1179 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1180 db_checkpoint_config.object_store_config.as_ref(),
1181 60,
1182 db_checkpoint_config
1183 .prune_and_compact_before_upload
1184 .unwrap_or(true),
1185 config.authority_store_pruning_config.clone(),
1186 prometheus_registry,
1187 state_snapshot_enabled,
1188 checkpoint_progress_tracker,
1189 )?;
1190 Ok((
1191 db_checkpoint_config,
1192 Some(DBCheckpointHandler::start(handler)),
1193 ))
1194 }
1195 }
1196 }
1197
1198 fn create_p2p_network(
1199 config: &NodeConfig,
1200 state_sync_store: RocksDbStore,
1201 chain_identifier: ChainIdentifier,
1202 trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
1203 archive_readers: ArchiveReaderBalancer,
1204 randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1205 prometheus_registry: &Registry,
1206 ) -> Result<(
1207 Network,
1208 discovery::Handle,
1209 state_sync::Handle,
1210 randomness::Handle,
1211 )> {
1212 let (state_sync, state_sync_server) = state_sync::Builder::new()
1213 .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1214 .store(state_sync_store)
1215 .archive_readers(archive_readers)
1216 .with_metrics(prometheus_registry)
1217 .build();
1218
1219 let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
1220 .config(config.p2p_config.clone())
1221 .build();
1222
1223 let (randomness, randomness_router) =
1224 randomness::Builder::new(config.authority_public_key(), randomness_tx)
1225 .config(config.p2p_config.randomness.clone().unwrap_or_default())
1226 .with_metrics(prometheus_registry)
1227 .build();
1228
1229 let p2p_network = {
1230 let routes = anemo::Router::new()
1231 .add_rpc_service(discovery_server)
1232 .add_rpc_service(state_sync_server);
1233 let routes = routes.merge(randomness_router);
1234
1235 let inbound_network_metrics =
1236 NetworkMetrics::new("iota", "inbound", prometheus_registry);
1237 let outbound_network_metrics =
1238 NetworkMetrics::new("iota", "outbound", prometheus_registry);
1239
1240 let service = ServiceBuilder::new()
1241 .layer(
1242 TraceLayer::new_for_server_errors()
1243 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1244 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1245 )
1246 .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1247 Arc::new(inbound_network_metrics),
1248 config.p2p_config.excessive_message_size(),
1249 )))
1250 .service(routes);
1251
1252 let outbound_layer = ServiceBuilder::new()
1253 .layer(
1254 TraceLayer::new_for_client_and_server_errors()
1255 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1256 .on_failure(DefaultOnFailure::new().level(tracing::Level::DEBUG)),
1257 )
1258 .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1259 Arc::new(outbound_network_metrics),
1260 config.p2p_config.excessive_message_size(),
1261 )))
1262 .into_inner();
1263
1264 let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1265 anemo_config.max_frame_size = Some(1 << 30);
1268
1269 let mut quic_config = anemo_config.quic.unwrap_or_default();
1272 if quic_config.socket_send_buffer_size.is_none() {
1273 quic_config.socket_send_buffer_size = Some(20 << 20);
1274 }
1275 if quic_config.socket_receive_buffer_size.is_none() {
1276 quic_config.socket_receive_buffer_size = Some(20 << 20);
1277 }
1278 quic_config.allow_failed_socket_buffer_size_setting = true;
1279
1280 if quic_config.max_concurrent_bidi_streams.is_none() {
1283 quic_config.max_concurrent_bidi_streams = Some(500);
1284 }
1285 if quic_config.max_concurrent_uni_streams.is_none() {
1286 quic_config.max_concurrent_uni_streams = Some(500);
1287 }
1288 if quic_config.stream_receive_window.is_none() {
1289 quic_config.stream_receive_window = Some(100 << 20);
1290 }
1291 if quic_config.receive_window.is_none() {
1292 quic_config.receive_window = Some(200 << 20);
1293 }
1294 if quic_config.send_window.is_none() {
1295 quic_config.send_window = Some(200 << 20);
1296 }
1297 if quic_config.crypto_buffer_size.is_none() {
1298 quic_config.crypto_buffer_size = Some(1 << 20);
1299 }
1300 if quic_config.max_idle_timeout_ms.is_none() {
1301 quic_config.max_idle_timeout_ms = Some(30_000);
1302 }
1303 if quic_config.keep_alive_interval_ms.is_none() {
1304 quic_config.keep_alive_interval_ms = Some(5_000);
1305 }
1306 anemo_config.quic = Some(quic_config);
1307
1308 let server_name = format!("iota-{chain_identifier}");
1309 let network = Network::bind(config.p2p_config.listen_address)
1310 .server_name(&server_name)
1311 .private_key(config.network_key_pair().copy().private().0.to_bytes())
1312 .config(anemo_config)
1313 .outbound_request_layer(outbound_layer)
1314 .start(service)?;
1315 info!(
1316 server_name = server_name,
1317 "P2p network started on {}",
1318 network.local_addr()
1319 );
1320
1321 network
1322 };
1323
1324 let discovery_handle =
1325 discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1326 let state_sync_handle = state_sync.start(p2p_network.clone());
1327 let randomness_handle = randomness.start(p2p_network.clone());
1328
1329 Ok((
1330 p2p_network,
1331 discovery_handle,
1332 state_sync_handle,
1333 randomness_handle,
1334 ))
1335 }
1336
1337 async fn construct_validator_components(
1340 config: NodeConfig,
1341 state: Arc<AuthorityState>,
1342 committee: Arc<Committee>,
1343 epoch_store: Arc<AuthorityPerEpochStore>,
1344 checkpoint_store: Arc<CheckpointStore>,
1345 state_sync_handle: state_sync::Handle,
1346 randomness_handle: randomness::Handle,
1347 global_state_hasher: Weak<GlobalStateHasher>,
1348 backpressure_manager: Arc<BackpressureManager>,
1349 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1350 registry_service: &RegistryService,
1351 ) -> Result<ValidatorComponents> {
1352 let mut config_clone = config.clone();
1353 let consensus_config = config_clone
1354 .consensus_config
1355 .as_mut()
1356 .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
1357 let validator_registry = Registry::new();
1358 let validator_registry_id = registry_service.add(validator_registry.clone());
1359
1360 let client = Arc::new(UpdatableConsensusClient::new());
1361 let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1362 &committee,
1363 consensus_config,
1364 state.name,
1365 connection_monitor_status.clone(),
1366 &validator_registry,
1367 client.clone(),
1368 checkpoint_store.clone(),
1369 ));
1370 let consensus_manager = Arc::new(ConsensusManager::new(
1371 &config,
1372 consensus_config,
1373 registry_service,
1374 &validator_registry,
1375 client,
1376 ));
1377
1378 let consensus_store_pruner = ConsensusStorePruner::new(
1381 consensus_manager.get_storage_base_path(),
1382 consensus_config.db_retention_epochs(),
1383 consensus_config.db_pruner_period(),
1384 &validator_registry,
1385 );
1386
1387 let soft_locks = Arc::new(if config.enable_soft_locking {
1388 PreConsensusSoftLocks::new()
1389 } else {
1390 info!("pre-consensus soft-locking disabled via node config");
1391 PreConsensusSoftLocks::disabled()
1392 });
1393
1394 let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
1395 let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);
1396 let validator_service_metrics = Arc::new(ValidatorServiceMetrics::new(&validator_registry));
1397
1398 let soft_lock_sweep_handle = PreConsensusSoftLocks::spawn_sweep(
1409 Arc::downgrade(&soft_locks),
1410 validator_service_metrics.clone(),
1411 );
1412
1413 let validator_server_handle = Self::start_grpc_validator_service(
1414 &config,
1415 state.clone(),
1416 consensus_adapter.clone(),
1417 &validator_registry,
1418 soft_locks.clone(),
1419 validator_service_metrics.clone(),
1420 )
1421 .await?;
1422
1423 let validator_overload_monitor_handle = if config
1426 .authority_overload_config
1427 .max_load_shedding_percentage
1428 > 0
1429 {
1430 let authority_state = Arc::downgrade(&state);
1431 let overload_config = config.authority_overload_config.clone();
1432 fail_point!("starting_overload_monitor");
1433 Some(spawn_monitored_task!(overload_monitor(
1434 authority_state,
1435 overload_config,
1436 )))
1437 } else {
1438 None
1439 };
1440
1441 let consensus_queue_overload_monitor_handle =
1448 if epoch_store.protocol_config().enable_pcool_flow() {
1449 let consensus_queue_monitor_authority_state = Arc::downgrade(&state);
1450 let consensus_queue_monitor_consensus_adapter = Arc::downgrade(&consensus_adapter);
1451 let consensus_queue_monitor_interval =
1452 config.authority_overload_config.overload_monitor_interval;
1453 Some(spawn_monitored_task!(consensus_queue_overload_monitor(
1454 consensus_queue_monitor_authority_state,
1455 consensus_queue_monitor_consensus_adapter,
1456 consensus_queue_monitor_interval,
1457 )))
1458 } else {
1459 None
1460 };
1461
1462 Self::start_epoch_specific_validator_components(
1463 &config,
1464 state.clone(),
1465 consensus_adapter,
1466 checkpoint_store,
1467 epoch_store,
1468 state_sync_handle,
1469 randomness_handle,
1470 consensus_manager,
1471 consensus_store_pruner,
1472 global_state_hasher,
1473 backpressure_manager,
1474 soft_locks,
1475 validator_server_handle,
1476 validator_overload_monitor_handle,
1477 consensus_queue_overload_monitor_handle,
1478 soft_lock_sweep_handle,
1479 checkpoint_metrics,
1480 iota_tx_validator_metrics,
1481 validator_registry_id,
1482 )
1483 .await
1484 }
1485
    /// Starts the validator components that must be rebuilt for every epoch and
    /// bundles them, together with the long-lived handles passed in, into a
    /// `ValidatorComponents`.
    ///
    /// In order, this:
    /// 1. builds the checkpoint service for `epoch_store`;
    /// 2. installs a fresh, empty low-scoring-authorities map on the consensus
    ///    adapter (the same map goes to the consensus handler initializer);
    /// 3. clears the soft locks and attaches them to the epoch store;
    /// 4. installs a randomness manager on the epoch store if
    ///    `RandomnessManager::try_new` yields one;
    /// 5. starts the consensus manager on a detached tokio task;
    /// 6. spawns the checkpoint service tasks and starts the overload notifier.
    ///
    /// # Errors
    ///
    /// Propagates failures from `set_randomness_manager`.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: Arc<ConsensusManager>,
        consensus_store_pruner: ConsensusStorePruner,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        soft_locks: Arc<PreConsensusSoftLocks>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        consensus_queue_overload_monitor_handle: Option<JoinHandle<()>>,
        soft_lock_sweep_handle: JoinHandle<()>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            global_state_hasher,
            checkpoint_metrics.clone(),
        );

        // Fresh, empty map for this epoch. It is shared between the consensus
        // adapter and the consensus handler initializer created below.
        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        // Drop any soft locks left over from the previous epoch before attaching
        // them to the new epoch store.
        soft_locks.clear();
        epoch_store.set_soft_locks(soft_locks.clone());

        // The randomness manager is only installed when `try_new` produces one.
        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
            backpressure_manager,
        );

        info!("Starting consensus manager asynchronously");

        // The JoinHandle is dropped, so the task is detached. A panic inside
        // `consensus_manager.start` is not observed here.
        tokio::spawn({
            let config = config.clone();
            let epoch_store = epoch_store.clone();
            let iota_tx_validator = IotaTxValidator::new(
                epoch_store.clone(),
                checkpoint_service.clone(),
                state.transaction_manager().clone(),
                iota_tx_validator_metrics.clone(),
            );
            let consensus_manager = consensus_manager.clone();
            async move {
                consensus_manager
                    .start(
                        &config,
                        epoch_store,
                        consensus_handler_initializer,
                        iota_tx_validator,
                    )
                    .await;
            }
        });
        let replay_waiter = consensus_manager.replay_waiter();

        info!("Spawning checkpoint service");
        // Setting DISABLE_REPLAY_WAITER (to any valid-Unicode value) makes the
        // checkpoint service start without a replay waiter.
        let replay_waiter = if std::env::var("DISABLE_REPLAY_WAITER").is_ok() {
            None
        } else {
            Some(replay_waiter)
        };
        let checkpoint_service_tasks = checkpoint_service.spawn(replay_waiter).await;

        let overload_notifier_handle = Self::start_overload_notifier(
            config,
            state.clone(),
            epoch_store.clone(),
            consensus_adapter.clone(),
        );

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_queue_overload_monitor_handle,
            soft_lock_sweep_handle,
            overload_notifier_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            soft_locks,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }
1613
1614 fn build_checkpoint_service(
1621 config: &NodeConfig,
1622 consensus_adapter: Arc<ConsensusAdapter>,
1623 checkpoint_store: Arc<CheckpointStore>,
1624 epoch_store: Arc<AuthorityPerEpochStore>,
1625 state: Arc<AuthorityState>,
1626 state_sync_handle: state_sync::Handle,
1627 global_state_hasher: Weak<GlobalStateHasher>,
1628 checkpoint_metrics: Arc<CheckpointMetrics>,
1629 ) -> Arc<CheckpointService> {
1630 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1631 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1632
1633 debug!(
1634 "Starting checkpoint service with epoch start timestamp {}
1635 and epoch duration {}",
1636 epoch_start_timestamp_ms, epoch_duration_ms
1637 );
1638
1639 let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1640 sender: consensus_adapter,
1641 signer: state.secret.clone(),
1642 authority: config.authority_public_key(),
1643 next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1644 .checked_add(epoch_duration_ms)
1645 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1646 metrics: checkpoint_metrics.clone(),
1647 });
1648
1649 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1650 let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1651 let max_checkpoint_size_bytes =
1652 epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1653
1654 CheckpointService::build(
1655 state.clone(),
1656 checkpoint_store,
1657 epoch_store,
1658 state.get_transaction_cache_reader().clone(),
1659 global_state_hasher,
1660 checkpoint_output,
1661 Box::new(certified_checkpoint_output),
1662 checkpoint_metrics,
1663 max_tx_per_checkpoint,
1664 max_checkpoint_size_bytes,
1665 )
1666 }
1667
1668 fn construct_consensus_adapter(
1669 committee: &Committee,
1670 consensus_config: &ConsensusConfig,
1671 authority: AuthorityName,
1672 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1673 prometheus_registry: &Registry,
1674 consensus_client: Arc<dyn ConsensusClient>,
1675 checkpoint_store: Arc<CheckpointStore>,
1676 ) -> ConsensusAdapter {
1677 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1678 ConsensusAdapter::new(
1682 consensus_client,
1683 checkpoint_store,
1684 authority,
1685 connection_monitor_status,
1686 consensus_config.max_pending_transactions(),
1687 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1688 consensus_config.max_submit_position,
1689 consensus_config.submit_delay_step_override(),
1690 ca_metrics,
1691 consensus_config.graduated_load_shedding_soft_limit_pct(),
1692 )
1693 }
1694
1695 async fn start_grpc_validator_service(
1696 config: &NodeConfig,
1697 state: Arc<AuthorityState>,
1698 consensus_adapter: Arc<ConsensusAdapter>,
1699 prometheus_registry: &Registry,
1700 soft_locks: Arc<PreConsensusSoftLocks>,
1701 validator_service_metrics: Arc<ValidatorServiceMetrics>,
1702 ) -> Result<SpawnOnce> {
1703 let validator_service = ValidatorService::new(
1704 state,
1705 consensus_adapter,
1706 validator_service_metrics,
1707 config.policy_config.clone().map(|p| p.client_id_source),
1708 soft_locks,
1709 );
1710
1711 let mut server_conf = iota_network_stack::config::Config::new();
1712 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1713 server_conf.load_shed = config.grpc_load_shed;
1714 let server_builder =
1715 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
1716 .add_service(ValidatorServer::new(validator_service.clone()))
1717 .add_service(ValidatorV2Server::new(validator_service.clone()))
1718 .add_service(ValidatorPeerServer::new(validator_service));
1719
1720 let tls_config = iota_tls::create_rustls_server_config(
1721 config.network_key_pair().copy().private(),
1722 IOTA_TLS_SERVER_NAME.to_string(),
1723 );
1724
1725 let network_address = config.network_address().clone();
1726
1727 let bind_future = async move {
1728 let server = server_builder
1729 .bind(&network_address, Some(tls_config))
1730 .await
1731 .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;
1732
1733 let local_addr = server.local_addr();
1734 info!("Listening to traffic on {local_addr}");
1735
1736 Ok(server)
1737 };
1738
1739 Ok(SpawnOnce::new(bind_future))
1740 }
1741
1742 async fn reexecute_pending_consensus_certs(
1760 epoch_store: &Arc<AuthorityPerEpochStore>,
1761 state: &Arc<AuthorityState>,
1762 ) {
1763 let mut pending_consensus_certificates = Vec::new();
1764 let mut additional_certs = Vec::new();
1765
1766 for tx in epoch_store.get_all_pending_consensus_transactions() {
1767 match tx.kind {
1768 ConsensusTransactionKind::CertifiedTransaction(tx)
1778 if !tx.contains_shared_object() =>
1779 {
1780 let tx = *tx;
1781 let tx = VerifiedExecutableTransaction::new_from_certificate(
1784 VerifiedCertificate::new_unchecked(tx),
1785 );
1786 if let Some(fx_digest) = epoch_store
1789 .get_signed_effects_digest(tx.digest())
1790 .expect("db error")
1791 {
1792 pending_consensus_certificates.push((tx, fx_digest));
1793 } else {
1794 additional_certs.push(tx);
1795 }
1796 }
1797 _ => (),
1798 }
1799 }
1800
1801 let digests = pending_consensus_certificates
1802 .iter()
1803 .map(|(tx, _)| *tx.digest())
1804 .collect::<Vec<_>>();
1805
1806 info!(
1807 "reexecuting {} pending consensus certificates: {:?}",
1808 digests.len(),
1809 digests
1810 );
1811
1812 state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
1813 state.enqueue_transactions_for_execution(additional_certs, epoch_store);
1814
1815 let timeout = if cfg!(msim) { 120 } else { 60 };
1821 if tokio::time::timeout(
1822 std::time::Duration::from_secs(timeout),
1823 state
1824 .get_transaction_cache_reader()
1825 .try_notify_read_executed_effects_digests(&digests),
1826 )
1827 .await
1828 .is_err()
1829 {
1830 if let Ok(executed_effects_digests) = state
1832 .get_transaction_cache_reader()
1833 .try_multi_get_executed_effects_digests(&digests)
1834 {
1835 let pending_digests = digests
1836 .iter()
1837 .zip(executed_effects_digests.iter())
1838 .filter_map(|(digest, executed_effects_digest)| {
1839 if executed_effects_digest.is_none() {
1840 Some(digest)
1841 } else {
1842 None
1843 }
1844 })
1845 .collect::<Vec<_>>();
1846 debug_fatal!(
1847 "Timed out waiting for effects digests to be executed: {:?}",
1848 pending_digests
1849 );
1850 } else {
1851 debug_fatal!(
1852 "Timed out waiting for effects digests to be executed, digests not found"
1853 );
1854 }
1855 }
1856 }
1857
1858 pub fn state(&self) -> Arc<AuthorityState> {
1859 self.state.clone()
1860 }
1861
1862 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1864 self.state.reference_gas_price_for_testing()
1865 }
1866
1867 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1868 self.state.committee_store().clone()
1869 }
1870
1871 pub fn clone_authority_aggregator(
1881 &self,
1882 ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1883 self.transaction_orchestrator
1884 .as_ref()
1885 .map(|to| to.clone_authority_aggregator())
1886 }
1887
1888 pub fn transaction_orchestrator(
1889 &self,
1890 ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1891 self.transaction_orchestrator.clone()
1892 }
1893
1894 pub fn subscribe_to_transaction_orchestrator_effects(
1895 &self,
1896 ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1897 self.transaction_orchestrator
1898 .as_ref()
1899 .map(|to| to.subscribe_to_effects_queue())
1900 .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1901 }
1902
    /// Main epoch loop. Each iteration:
    ///
    /// 1. runs a `CheckpointExecutor` for the current epoch;
    /// 2. announces this node's capabilities:
    ///    - committee validators submit them through consensus;
    ///    - other active validators send them directly to the committee when the
    ///      protocol flag allows it;
    /// 3. when the epoch's checkpoints are done, reconfigures into the next epoch;
    /// 4. tears down, restarts, creates or drops validator components according
    ///    to the node's committee membership in the new epoch.
    ///
    /// Returns `Ok(())` only when `run_with_range` stops execution. Otherwise it
    /// loops until an error is propagated from capability submission,
    /// `reconfigure_state`, component startup or (msim only) checkpoint pruning.
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - the global state hasher slot is empty at the start of an iteration;
    /// - the system state object cannot be read;
    /// - the next epoch is not exactly `current + 1`;
    /// - the old hasher is still referenced when it is replaced;
    /// - a checkpoint service task panicked (the panic is resumed).
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // The hasher is taken out of the shared slot for this epoch. The guard
            // is held for the whole iteration, and the slot is refilled with a new
            // hasher during reconfiguration below.
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );

            // Forward executed checkpoint data to the gRPC broadcaster. This is
            // only set up if the handle lock is free right now (`try_lock`) and a
            // gRPC server is running; otherwise no data is forwarded this epoch.
            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
                guard.as_ref().map(|handle| {
                    let tx = handle.checkpoint_data_broadcaster().clone();
                    Box::new(move |data: &CheckpointData| {
                        tx.send_traced(data);
                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
                })
            } else {
                None
            };

            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                data_sender,
                Some(self.checkpoint_progress_tracker.clone()),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Capability announcement. The validator-components lock is held
            // through this whole `if let` body.
            if let Some(components) = &*self.validator_components.lock().await {
                // NOTE(review): the purpose of this 1 ms sleep before submitting is
                // not evident from this code. Confirm why it is needed.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let binary_config = to_binary_config(config);
                let transaction = ConsensusTransaction::new_capability_notification_v1(
                    AuthorityCapabilitiesV1::new(
                        self.state.name,
                        cur_epoch_store.get_chain(),
                        self.config
                            .supported_protocol_versions
                            .expect("Supported versions should be populated")
                            .truncate_below(config.version),
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components
                    .consensus_adapter
                    .submit(transaction, None, &cur_epoch_store)?;
            } else if self.state.is_active_validator(&cur_epoch_store)
                && cur_epoch_store
                    .protocol_config()
                    .track_non_committee_eligible_validators()
            {
                // Active validator without committee components: send a signed
                // capability notification to the committee in the background. The
                // task is scoped to the lifetime of this epoch via
                // `within_alive_epoch`.
                let epoch_store = cur_epoch_store.clone();
                let node_clone = self.clone();
                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
                    node_clone
                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
                        .instrument(trace_span!(
                            "send_signed_capability_notification_to_committee_with_retry"
                        ))
                        .await;
                }));
            }

            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            // Reached the configured run range: shut the node down, notify the
            // shutdown channel, and exit the loop.
            if stop_condition == StopReason::RunWithRangeCondition {
                IotaNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .try_get_iota_system_state_object_unsafe()
                .expect("Read IOTA System State object cannot fail");

            // Safe mode is unexpected. Under msim the check is skipped when the
            // simulation explicitly expects safe mode.
            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // A send failure is only logged on fullnodes.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
                if self.state.is_fullnode(&cur_epoch_store) {
                    warn!(
                        "Failed to send end of epoch notification to subscriber: {:?}",
                        err
                    );
                }
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Replace the authority aggregator with one built for the new epoch.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            send_trusted_peer_change(
                &self.config,
                &self.trusted_peer_change_tx,
                &new_epoch_start_state,
            );

            // Held until the new components (if any) are stored back below.
            let mut validator_components_lock_guard = self.validator_components.lock().await;

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await?;

            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_queue_overload_monitor_handle,
                soft_lock_sweep_handle,
                overload_notifier_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                soft_locks,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = validator_components_lock_guard.take()
            {
                // Case 1: the node was a validator in the previous epoch.
                info!("Reconfiguring the validator.");
                if let Some(handle) = overload_notifier_handle {
                    handle.abort();
                }
                // Abort and drain the checkpoint service tasks. Panics are
                // re-raised. Any other join error is logged.
                // NOTE(review): tasks cancelled by `abort_all` also surface as
                // non-panic join errors, so they are logged at warn level here.
                // Possibly noisy; consider skipping `err.is_cancelled()`.
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // Replace the hasher with a fresh instance that reuses the old
                // instance's metrics. `Arc::into_inner` panics via `expect` if
                // any other strong reference remains; the clones handed to the
                // checkpoint executor and `reconfigure_state` must be gone by now.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_committee_validator(&new_epoch_store) {
                    // Still in the committee: restart the per-epoch components and
                    // reuse the long-lived ones.
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            soft_locks,
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            consensus_queue_overload_monitor_handle,
                            soft_lock_sweep_handle,
                            checkpoint_metrics,
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    // Left the committee: drop the validator metrics registry and
                    // stop the validator gRPC server.
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    validator_server_handle.shutdown();
                    debug!("Validator grpc server shutdown triggered");

                    None
                }
            } else {
                // Case 2: the node had no validator components last epoch.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if self.state.is_committee_validator(&new_epoch_store) {
                    // Joined the committee: build all validator components from
                    // scratch and start the gRPC server.
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            // Release the previous epoch's DB handles and drop its store before
            // pruning old epoch databases.
            cur_epoch_store.release_db_handles();

            drop(cur_epoch_store);

            self.state.epoch_db_pruner().prune_old_epoch_dbs().await;

            // msim only: prune checkpoints when a finite, non-zero retention is
            // configured.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
2260
2261 async fn shutdown(&self) {
2262 if let Some(validator_components) = &*self.validator_components.lock().await {
2263 validator_components.consensus_manager.shutdown().await;
2264 }
2265
2266 if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
2268 info!("Shutting down gRPC server");
2269 if let Err(e) = grpc_handle.shutdown().await {
2270 warn!("Failed to gracefully shutdown gRPC server: {e}");
2271 }
2272 }
2273 }
2274
    /// Reconfigures the authority state for the next epoch and returns the new
    /// per-epoch store.
    ///
    /// Steps:
    /// - Loads the last checkpoint of `cur_epoch_store`'s epoch.
    /// - Takes the epoch supply change from that checkpoint's end-of-epoch data.
    /// - Builds an `EpochStartConfiguration` from `next_epoch_start_system_state`
    ///   and the checkpoint digest.
    /// - Delegates to `AuthorityState::reconfigure`.
    ///
    /// Epoch-flag metrics are then updated from the old and new start
    /// configurations.
    ///
    /// # Errors
    /// Returns an error if the last checkpoint of the current epoch carries no
    /// end-of-epoch data.
    ///
    /// # Panics
    /// Panics in any of these cases:
    /// - the last checkpoint of the current epoch cannot be loaded;
    /// - that checkpoint is not the highest executed checkpoint;
    /// - `supported_protocol_versions` is unset;
    /// - building the epoch start configuration fails;
    /// - the reconfiguration itself fails;
    /// - the returned store's epoch differs from `next_epoch_committee.epoch()`.
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        // The last checkpoint of the ending epoch must already be stored; a
        // storage error and a missing checkpoint are both treated as fatal.
        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        // Reconfiguration requires that execution has caught up exactly to the
        // last checkpoint of the ending epoch.
        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        // NOTE(review): the object store and epoch flags come from the `state`
        // parameter, while the reconfiguration below runs on `self.state`.
        // Presumably callers pass `&self.state`; confirm the two are always the
        // same instance.
        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        // Sanity check: the new store must belong to the committee's epoch.
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }
2340
2341 pub fn get_config(&self) -> &NodeConfig {
2342 &self.config
2343 }
2344
2345 async fn execute_transaction_immediately_at_zero_epoch(
2346 state: &Arc<AuthorityState>,
2347 epoch_store: &Arc<AuthorityPerEpochStore>,
2348 tx: &Transaction,
2349 span: tracing::Span,
2350 ) {
2351 let _guard = span.enter();
2352 let transaction =
2353 iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2354 iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2355 tx.data().clone(),
2356 iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2357 ),
2358 );
2359 state
2360 .try_execute_immediately(&transaction, None, epoch_store)
2361 .unwrap();
2362 }
2363
2364 pub fn randomness_handle(&self) -> randomness::Handle {
2365 self.randomness_handle.clone()
2366 }
2367
2368 async fn send_signed_capability_notification_to_committee_with_retry(
2374 &self,
2375 epoch_store: &Arc<AuthorityPerEpochStore>,
2376 ) {
2377 const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
2378 const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
2379 const MAX_RETRY_INTERVAL_SECS: u64 = 300; let config = epoch_store.protocol_config();
2383 let binary_config = to_binary_config(config);
2384
2385 let capabilities = AuthorityCapabilitiesV1::new(
2387 self.state.name,
2388 epoch_store.get_chain(),
2389 self.config
2390 .supported_protocol_versions
2391 .expect("Supported versions should be populated")
2392 .truncate_below(config.version),
2393 self.state
2394 .get_available_system_packages(&binary_config)
2395 .await,
2396 );
2397
2398 let signature = AuthoritySignature::new_secure(
2400 &IntentMessage::new(
2401 Intent::iota_app(IntentScope::AuthorityCapabilities),
2402 &capabilities,
2403 ),
2404 &epoch_store.epoch(),
2405 self.config.authority_key_pair(),
2406 );
2407
2408 let request = HandleCapabilityNotificationRequestV1 {
2409 message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
2410 };
2411
2412 let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);
2413
2414 loop {
2415 let auth_agg = self.auth_agg.load();
2416 match auth_agg
2417 .send_capability_notification_to_quorum(request.clone())
2418 .await
2419 {
2420 Ok(_) => {
2421 info!("Successfully sent capability notification to committee");
2422 break;
2423 }
2424 Err(err) => {
2425 match &err {
2426 AggregatorSendCapabilityNotificationError::RetryableNotification {
2427 errors,
2428 } => {
2429 warn!(
2430 "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
2431 retry_interval, errors
2432 );
2433 }
2434 AggregatorSendCapabilityNotificationError::NonRetryableNotification {
2435 errors,
2436 } => {
2437 error!(
2438 "Failed to send capability notification to committee (non-retryable error): {:?}",
2439 errors
2440 );
2441 break;
2442 }
2443 };
2444
2445 tokio::time::sleep(retry_interval).await;
2447
2448 retry_interval = std::cmp::min(
2450 retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
2451 Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
2452 );
2453 }
2454 }
2455 }
2456 }
2457}
2458
#[cfg(msim)]
impl IotaNode {
    /// Returns the id of the simulator node this IOTA node runs on.
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        let sim_node = &self.sim_state.sim_node;
        sim_node.id()
    }

    /// Records whether safe mode is expected for this node in the simulation.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let expected_flag = &self.sim_state.sim_safe_mode_expected;
        expected_flag.store(new_value, Ordering::Relaxed);
    }
}
2472
2473enum SpawnOnce {
2474 Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
2476 #[allow(unused)]
2477 Started(iota_http::ServerHandle),
2478}
2479
2480impl SpawnOnce {
2481 pub fn new(
2482 future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
2483 ) -> Self {
2484 Self::Unstarted(Mutex::new(Box::pin(future)))
2485 }
2486
2487 pub async fn start(self) -> Self {
2488 match self {
2489 Self::Unstarted(future) => {
2490 let server = future
2491 .into_inner()
2492 .await
2493 .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
2494 let handle = server.handle().clone();
2495 tokio::spawn(async move {
2496 if let Err(err) = server.serve().await {
2497 info!("Server stopped: {err}");
2498 }
2499 info!("Server stopped");
2500 });
2501 Self::Started(handle)
2502 }
2503 Self::Started(_) => self,
2504 }
2505 }
2506
2507 pub fn shutdown(self) {
2508 if let SpawnOnce::Started(handle) = self {
2509 handle.trigger_shutdown();
2510 }
2511 }
2512}
2513
2514fn formal_snapshot_read_config(chain_id: ChainIdentifier) -> Option<ObjectStoreConfig> {
2528 let (bucket, endpoint) = match chain_id.chain() {
2529 Chain::Mainnet => (
2530 "iota-mainnet-formal",
2531 "https://formal-snapshot.mainnet.iota.cafe",
2532 ),
2533 Chain::Testnet => (
2534 "iota-testnet-formal",
2535 "https://formal-snapshot.testnet.iota.cafe",
2536 ),
2537 Chain::Unknown if chain_id == get_devnet_chain_identifier() => (
2543 "iota-devnet-formal",
2544 "https://formal-snapshot.devnet.iota.cafe",
2545 ),
2546 Chain::Unknown => return None,
2547 };
2548 Some(ObjectStoreConfig {
2549 object_store: Some(ObjectStoreType::S3),
2550 bucket: Some(bucket.to_owned()),
2551 aws_endpoint: Some(endpoint.to_owned()),
2552 aws_virtual_hosted_style_request: true,
2553 object_store_connection_limit: 200,
2554 no_sign_request: true,
2555 ..Default::default()
2556 })
2557}
2558
2559fn send_trusted_peer_change(
2562 config: &NodeConfig,
2563 sender: &watch::Sender<TrustedPeerChangeEvent>,
2564 new_epoch_start_state: &EpochStartSystemState,
2565) {
2566 let new_committee =
2567 new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2568
2569 sender.send_modify(|event| {
2570 core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2571 event.new_committee = new_committee;
2572 })
2573}
2574
2575fn build_kv_store(
2576 state: &Arc<AuthorityState>,
2577 config: &NodeConfig,
2578 registry: &Registry,
2579) -> Result<Arc<TransactionKeyValueStore>> {
2580 let metrics = KeyValueStoreMetrics::new(registry);
2581 let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2582
2583 let base_url = &config.transaction_kv_store_read_config.base_url;
2584
2585 if base_url.is_empty() {
2586 info!("no http kv store url provided, using local db only");
2587 return Ok(Arc::new(db_store));
2588 }
2589
2590 base_url.parse::<url::Url>().tap_err(|e| {
2591 error!(
2592 "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2593 base_url, e
2594 )
2595 })?;
2596
2597 let http_store = HttpKVStore::new_kv(
2598 base_url,
2599 config.transaction_kv_store_read_config.cache_size,
2600 metrics.clone(),
2601 )?;
2602 info!("using local key-value store with fallback to http key-value store");
2603 Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2604 db_store,
2605 http_store,
2606 metrics,
2607 "json_rpc_fallback",
2608 )))
2609}
2610
2611async fn build_grpc_server(
2626 config: &NodeConfig,
2627 state: Arc<AuthorityState>,
2628 state_sync_store: RocksDbStore,
2629 executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>>,
2630 prometheus_registry: &Registry,
2631 server_version: ServerVersion,
2632) -> Result<Option<GrpcServerHandle>> {
2633 if config.consensus_config().is_some() || !config.enable_grpc_api {
2635 return Ok(None);
2636 }
2637
2638 let Some(grpc_config) = &config.grpc_api_config else {
2639 return Err(anyhow!("gRPC API is enabled but no configuration provided"));
2640 };
2641
2642 let chain_id = state.get_chain_identifier();
2644
2645 let grpc_read_store = Arc::new(GrpcReadStore::new(state.clone(), state_sync_store));
2646
2647 let shutdown_token = CancellationToken::new();
2649
2650 let grpc_reader = Arc::new(GrpcReader::new(
2652 grpc_read_store,
2653 Some(server_version.to_string()),
2654 ));
2655
2656 let grpc_server_metrics = iota_grpc_server::GrpcServerMetrics::new(prometheus_registry);
2658 let client_id_source = config
2659 .policy_config
2660 .as_ref()
2661 .map(|p| p.client_id_source.clone());
2662
2663 let handle = start_grpc_server(
2664 grpc_reader,
2665 executor,
2666 grpc_config.clone(),
2667 shutdown_token,
2668 chain_id,
2669 Some(grpc_server_metrics),
2670 state.traffic_controller.clone(),
2671 client_id_source,
2672 )
2673 .await?;
2674
2675 Ok(Some(handle))
2676}
2677
2678pub async fn build_http_server(
2693 state: Arc<AuthorityState>,
2694 transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2695 config: &NodeConfig,
2696 prometheus_registry: &Registry,
2697) -> Result<Option<iota_http::ServerHandle>> {
2698 if config.consensus_config().is_some() {
2700 return Ok(None);
2701 }
2702
2703 let mut router = axum::Router::new();
2704
2705 let json_rpc_router = {
2706 let traffic_controller = state.traffic_controller.clone();
2707 let mut server = JsonRpcServerBuilder::new(
2708 env!("CARGO_PKG_VERSION"),
2709 prometheus_registry,
2710 traffic_controller,
2711 config.policy_config.clone(),
2712 );
2713
2714 let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2715
2716 let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2717 server.register_module(ReadApi::new(
2718 state.clone(),
2719 kv_store.clone(),
2720 metrics.clone(),
2721 ))?;
2722 server.register_module(CoinReadApi::new(
2723 state.clone(),
2724 kv_store.clone(),
2725 metrics.clone(),
2726 )?)?;
2727
2728 if config.run_with_range.is_none() {
2731 server.register_module(TransactionBuilderApi::new(state.clone()))?;
2732 }
2733 server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2734
2735 if let Some(transaction_orchestrator) = transaction_orchestrator {
2736 server.register_module(TransactionExecutionApi::new(
2737 state.clone(),
2738 transaction_orchestrator.clone(),
2739 metrics.clone(),
2740 ))?;
2741 }
2742
2743 let iota_names_config = config
2744 .iota_names_config
2745 .clone()
2746 .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));
2747
2748 server.register_module(IndexerApi::new(
2749 state.clone(),
2750 ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2751 kv_store,
2752 metrics,
2753 iota_names_config,
2754 config.indexer_max_subscriptions,
2755 ))?;
2756 server.register_module(MoveUtils::new(state.clone()))?;
2757
2758 let server_type = config.jsonrpc_server_type();
2759
2760 server.to_router(server_type).await?
2761 };
2762
2763 router = router.merge(json_rpc_router);
2764
2765 router = router
2766 .route("/health", axum::routing::get(health_check_handler))
2767 .route_layer(axum::Extension(state));
2768
2769 let layers = ServiceBuilder::new()
2770 .map_request(|mut request: axum::http::Request<_>| {
2771 if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
2772 let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
2773 request.extensions_mut().insert(axum_connect_info);
2774 }
2775 request
2776 })
2777 .layer(axum::middleware::from_fn(server_timing_middleware));
2778
2779 router = router.layer(layers);
2780
2781 let handle = iota_http::Builder::new()
2782 .serve(&config.json_rpc_address, router)
2783 .map_err(|e| anyhow::anyhow!("{e}"))?;
2784 info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());
2785
2786 Ok(Some(handle))
2787}
2788
2789#[derive(Debug, serde::Serialize, serde::Deserialize)]
2790pub struct Threshold {
2791 pub threshold_seconds: Option<u32>,
2792}
2793
2794async fn health_check_handler(
2795 axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2796 axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2797) -> impl axum::response::IntoResponse {
2798 if let Some(threshold_seconds) = threshold_seconds {
2799 let summary = match state
2801 .get_checkpoint_store()
2802 .get_highest_executed_checkpoint()
2803 {
2804 Ok(Some(summary)) => summary,
2805 Ok(None) => {
2806 warn!("Highest executed checkpoint not found");
2807 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2808 }
2809 Err(err) => {
2810 warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2811 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2812 }
2813 };
2814
2815 let latest_chain_time = summary.timestamp();
2817 let threshold =
2818 std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2819
2820 if latest_chain_time < threshold {
2822 warn!(
2823 ?latest_chain_time,
2824 ?threshold,
2825 "failing health check due to checkpoint lag"
2826 );
2827 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2828 }
2829 }
2830 (axum::http::StatusCode::OK, "up")
2832}
2833
2834#[cfg(not(test))]
2835fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2836 protocol_config.max_transactions_per_checkpoint() as usize
2837}
2838
/// Test builds use a tiny fixed limit and ignore the protocol config.
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    const TEST_MAX_TX_PER_CHECKPOINT: usize = 2;
    TEST_MAX_TX_PER_CHECKPOINT
}