1#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8 collections::HashMap,
9 fmt,
10 future::Future,
11 path::PathBuf,
12 sync::{Arc, Weak},
13 time::Duration,
14};
15
16use anemo::Network;
17use anemo_tower::{
18 callback::CallbackLayer,
19 trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
20};
21use anyhow::{Result, anyhow};
22use arc_swap::ArcSwap;
23use futures::future::BoxFuture;
24pub use handle::IotaNodeHandle;
25use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
26use iota_common::debug_fatal;
27use iota_config::{
28 ConsensusConfig, NodeConfig,
29 node::{DBCheckpointConfig, RunWithRange},
30 node_config_metrics::NodeConfigMetrics,
31 object_storage_config::{ObjectStoreConfig, ObjectStoreType},
32};
33use iota_core::{
34 authority::{
35 AuthorityState, AuthorityStore, RandomnessRoundReceiver,
36 authority_per_epoch_store::AuthorityPerEpochStore,
37 authority_store_pruner::ObjectsCompactionFilter,
38 authority_store_tables::{
39 AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
40 },
41 backpressure::BackpressureManager,
42 epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
43 },
44 authority_aggregator::{
45 AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
46 },
47 authority_client::NetworkAuthorityClient,
48 authority_server::{ValidatorService, ValidatorServiceMetrics},
49 checkpoint_progress_tracker::CheckpointProgressTracker,
50 checkpoints::{
51 CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
52 SubmitCheckpointToConsensus,
53 checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
54 },
55 connection_monitor::ConnectionMonitor,
56 consensus_adapter::{
57 CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
58 ConsensusClient,
59 },
60 consensus_handler::ConsensusHandlerInitializer,
61 consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
62 consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
63 db_checkpoint_handler::DBCheckpointHandler,
64 epoch::{
65 committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
66 epoch_metrics::EpochMetrics, randomness::RandomnessManager,
67 reconfiguration::ReconfigurationInitiator,
68 },
69 execution_cache::build_execution_cache,
70 global_state_hasher::{GlobalStateHashMetrics, GlobalStateHasher},
71 grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
72 jsonrpc_index::IndexStore,
73 module_cache_metrics::ResolverMetrics,
74 overload_monitor::overload_monitor,
75 safe_client::SafeClientMetricsBase,
76 signature_verifier::SignatureVerifierMetrics,
77 storage::{GrpcReadStore, RocksDbStore},
78 transaction_orchestrator::TransactionOrchestrator,
79 validator_tx_finalizer::ValidatorTxFinalizer,
80};
81use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
82use iota_json_rpc::{
83 JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
84 indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
85 transaction_builder_api::TransactionBuilderApi,
86 transaction_execution_api::TransactionExecutionApi,
87};
88use iota_json_rpc_api::JsonRpcMetrics;
89use iota_macros::{fail_point, fail_point_async, replay_log};
90use iota_metrics::{
91 RegistryID, RegistryService,
92 hardware_metrics::register_hardware_metrics,
93 metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
94 server_timing_middleware, spawn_monitored_task,
95};
96use iota_names::config::IotaNamesConfig;
97use iota_network::{
98 api::{ValidatorPeerServer, ValidatorServer, ValidatorV2Server},
99 discovery,
100 discovery::TrustedPeerChangeEvent,
101 randomness, state_sync,
102};
103use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
104use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
105use iota_sdk_types::{
106 RandomnessRound,
107 crypto::{Intent, IntentMessage, IntentScope},
108};
109use iota_snapshot::uploader::StateSnapshotUploader;
110use iota_storage::{
111 FileCompression, StorageFormat,
112 http_key_value_store::HttpKVStore,
113 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
114 key_value_store_metrics::KeyValueStoreMetrics,
115};
116use iota_types::{
117 base_types::{AuthorityName, ConciseableName, EpochId},
118 committee::Committee,
119 crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits},
120 digests::ChainIdentifier,
121 error::{IotaError, IotaResult},
122 executable_transaction::VerifiedExecutableTransaction,
123 execution_config_utils::to_binary_config,
124 full_checkpoint_content::CheckpointData,
125 iota_system_state::{
126 IotaSystemState, IotaSystemStateTrait,
127 epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
128 },
129 messages_consensus::{
130 AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
131 SignedAuthorityCapabilitiesV1,
132 },
133 messages_grpc::HandleCapabilityNotificationRequestV1,
134 quorum_driver_types::QuorumDriverEffectsQueueResult,
135 supported_protocol_versions::SupportedProtocolVersions,
136 transaction::{Transaction, VerifiedCertificate},
137};
138use prometheus::Registry;
139#[cfg(msim)]
140use simulator::*;
141use tap::tap::TapFallible;
142use tokio::{
143 sync::{Mutex, broadcast, mpsc, watch},
144 task::{JoinHandle, JoinSet},
145};
146use tokio_util::sync::CancellationToken;
147use tower::ServiceBuilder;
148use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
149use typed_store::{
150 DBMetrics,
151 rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
152};
153
154use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
155
156pub mod admin;
157mod handle;
158pub mod metrics;
159
/// Components that exist only while this node runs as a committee validator.
///
/// At startup `IotaNode::start_async` builds them (via `construct_validator_components`)
/// only when `state.is_committee_validator(..)` holds, and stores them in
/// `IotaNode::validator_components`.
pub struct ValidatorComponents {
    /// Validator server handle (a `SpawnOnce`); `start_async` replaces it with the result of
    /// `start().await` once the components have been constructed.
    validator_server_handle: SpawnOnce,
    /// Join handle of the validator overload monitor task, if one was spawned.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    /// `start_async` calls `submit_recovered` on it, and `IotaNode::close_epoch` calls
    /// `close_epoch` on it.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Background tasks belonging to the checkpoint service.
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    /// NOTE(review): presumably the id of a validator-specific registry added to the
    /// `RegistryService` — confirm in `construct_validator_components`.
    validator_registry_id: RegistryID,
}
172
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    /// Per-node state that only exists when the node runs under the `msim` simulator.
    pub(super) struct SimState {
        /// Simulator handle of the node on which this state was created.
        pub sim_node: iota_simulator::runtime::NodeHandle,
        /// Flag that starts out `false`; presumably set by tests that expect the node to
        /// enter safe mode — confirm at the call sites.
        pub sim_safe_mode_expected: AtomicBool,
        /// Never read; held so the leak detector lives exactly as long as this state.
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        /// Captures the current simulator node, clears the safe-mode flag and creates a
        /// fresh leak detector (in that order).
        fn default() -> Self {
            let sim_node = iota_simulator::runtime::NodeHandle::current();
            let sim_safe_mode_expected = AtomicBool::new(false);
            let _leak_detector = iota_simulator::NodeLeakDetector::new();
            Self {
                sim_node,
                sim_safe_mode_expected,
                _leak_detector,
            }
        }
    }
}
193
/// Name and version of the binary running the node.
///
/// Displayed as `"{bin}/{version}"`, e.g. `iota-node/unknown`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ServerVersion {
    /// Binary name, e.g. `"iota-node"`.
    pub bin: &'static str,
    /// Version string of the binary.
    pub version: &'static str,
}

impl ServerVersion {
    /// Creates a `ServerVersion` from a binary name and a version string.
    pub fn new(bin: &'static str, version: &'static str) -> Self {
        Self { bin, version }
    }
}

impl std::fmt::Display for ServerVersion {
    /// Writes `bin/version` (formatter width/fill flags are ignored).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/{}", self.bin, self.version)
    }
}
213
/// A running IOTA node (validator or full node) together with the handles of the services
/// started for it by `IotaNode::start_async`.
///
/// Several `_`-prefixed fields are never read in this part of the file; presumably they are
/// held so that the server/task they belong to is not dropped while the node is alive.
pub struct IotaNode {
    /// Configuration passed to `start_async`, with `supported_protocol_versions` filled in
    /// with `SupportedProtocolVersions::SYSTEM_DEFAULT` when it was unset.
    config: NodeConfig,
    /// Validator-only components. At startup they are built only if
    /// `state.is_committee_validator(..)` holds; otherwise `None`.
    validator_components: Mutex<Option<ValidatorComponents>>,
    /// Handle returned by `build_http_server`.
    _http_server: Option<iota_http::ServerHandle>,
    /// Authority state shared with the servers and background tasks.
    state: Arc<AuthorityState>,
    /// Created only on full nodes started without a `run_with_range`.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    /// Node-level metrics; the binary/configured max protocol version gauges are set at
    /// startup.
    metrics: Arc<IotaNodeMetrics>,

    /// Handle of the p2p discovery service.
    _discovery: discovery::Handle,
    /// Handle of the p2p state-sync service.
    state_sync_handle: state_sync::Handle,
    /// Handle of the p2p randomness service.
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    /// Validator components receive only a `Weak` reference to this hasher.
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    /// Peer connection statuses from `ConnectionMonitor` plus the authority-name to
    /// peer-id map of the current epoch.
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    /// Broadcasts `IotaSystemState`; receivers come from `subscribe_to_epoch_change`, and one
    /// receiver is handed to the transaction orchestrator at startup.
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    /// Sender side of the channel whose receiver was given to the discovery service.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    /// Shared with the authority state and the DB checkpoint handler; its logging task is
    /// spawned at the end of `start_async`.
    checkpoint_progress_tracker: Arc<CheckpointProgressTracker>,

    /// Sender returned when the DB checkpoint handler is started; `None` when not started.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    /// Sender returned by the archive writer; `None` when state archival is not configured.
    _state_archive_handle: Option<broadcast::Sender<()>>,

    /// Sender returned by the snapshot uploader; `None` when snapshots are not configured.
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    /// Capacity-1 broadcast channel; receivers come from `subscribe_to_shutdown_channel`.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// Handle returned by `build_grpc_server`.
    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    /// Authority aggregator behind an `ArcSwap`, initialized from the epoch start state.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}
263
264impl fmt::Debug for IotaNode {
265 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
266 f.debug_struct("IotaNode")
267 .field("name", &self.state.name.concise())
268 .finish()
269 }
270}
271
272impl IotaNode {
273 pub async fn start(
274 config: NodeConfig,
275 registry_service: RegistryService,
276 ) -> Result<Arc<IotaNode>> {
277 Self::start_async(
278 config,
279 registry_service,
280 ServerVersion::new("iota-node", "unknown"),
281 )
282 .await
283 }
284
285 pub async fn start_async(
286 config: NodeConfig,
287 registry_service: RegistryService,
288 server_version: ServerVersion,
289 ) -> Result<Arc<IotaNode>> {
290 NodeConfigMetrics::new(®istry_service.default_registry()).record_metrics(&config);
291 let mut config = config.clone();
292 if config.supported_protocol_versions.is_none() {
293 info!(
294 "populating config.supported_protocol_versions with default {:?}",
295 SupportedProtocolVersions::SYSTEM_DEFAULT
296 );
297 config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
298 }
299
300 let run_with_range = config.run_with_range;
301 let is_validator = config.consensus_config().is_some();
302 let is_full_node = !is_validator;
303 let prometheus_registry = registry_service.default_registry();
304
305 info!(node =? config.authority_public_key(),
306 "Initializing iota-node listening on {}", config.network_address
307 );
308
309 let genesis = config.genesis()?.clone();
310
311 let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
312 info!("IOTA chain identifier: {chain_identifier}");
313
314 let db_corrupted_path = &config.db_path().join("status");
316 if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
317 panic!("Failed to check database corruption: {err}");
318 }
319
320 DBMetrics::init(&prometheus_registry);
322
323 iota_metrics::init_metrics(&prometheus_registry);
325 #[cfg(not(msim))]
328 iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
329
330 register_hardware_metrics(®istry_service, &config.db_path)
332 .expect("Failed registering hardware metrics");
333 prometheus_registry
335 .register(iota_metrics::uptime_metric(
336 if is_validator {
337 "validator"
338 } else {
339 "fullnode"
340 },
341 server_version.version,
342 &chain_identifier.to_string(),
343 ))
344 .expect("Failed registering uptime metric");
345
346 let migration_tx_data = if genesis.contains_migrations() {
349 Some(config.load_migration_tx_data()?)
352 } else {
353 None
354 };
355
356 let secret = Arc::pin(config.authority_key_pair().copy());
357 let genesis_committee = genesis.committee()?;
358 let committee_store = Arc::new(CommitteeStore::new(
359 config.db_path().join("epochs"),
360 &genesis_committee,
361 None,
362 ));
363
364 let mut pruner_db = None;
365 if config
366 .authority_store_pruning_config
367 .enable_compaction_filter
368 {
369 pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
370 &config.db_path().join("store"),
371 )));
372 }
373 let compaction_filter = pruner_db
374 .clone()
375 .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));
376
377 let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
379 let perpetual_tables_options = AuthorityPerpetualTablesOptions {
380 enable_write_stall,
381 compaction_filter,
382 };
383 let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
384 &config.db_path().join("store"),
385 Some(perpetual_tables_options),
386 ));
387 let is_genesis = perpetual_tables
388 .database_is_empty()
389 .expect("Database read should not fail at init.");
390 let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
391 let backpressure_manager =
392 BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
393
394 let perpetual_tables_for_progress = perpetual_tables.clone();
395 let store = AuthorityStore::open(
396 perpetual_tables,
397 &genesis,
398 &config,
399 &prometheus_registry,
400 migration_tx_data.as_ref(),
401 )
402 .await?;
403
404 let cur_epoch = store.get_recovery_epoch_at_restart()?;
405 let committee = committee_store
406 .get_committee(&cur_epoch)?
407 .expect("Committee of the current epoch must exist");
408 let epoch_start_configuration = store
409 .get_epoch_start_configuration()?
410 .expect("EpochStartConfiguration of the current epoch must exist");
411 let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
412 let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
413
414 let cache_traits = build_execution_cache(
415 &config.execution_cache_config,
416 &prometheus_registry,
417 &store,
418 backpressure_manager.clone(),
419 );
420
421 let auth_agg = {
422 let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
423 let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
424 Arc::new(ArcSwap::new(Arc::new(
425 AuthorityAggregator::new_from_epoch_start_state(
426 epoch_start_configuration.epoch_start_state(),
427 &committee_store,
428 safe_client_metrics_base,
429 auth_agg_metrics,
430 ),
431 )))
432 };
433
434 let chain = match config.chain_override_for_testing {
435 Some(chain) => chain,
436 None => chain_identifier.chain(),
437 };
438
439 let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
440 let epoch_store = AuthorityPerEpochStore::new(
441 config.authority_public_key(),
442 committee.clone(),
443 &config.db_path().join("store"),
444 Some(epoch_options.options),
445 EpochMetrics::new(®istry_service.default_registry()),
446 epoch_start_configuration,
447 cache_traits.backing_package_store.clone(),
448 cache_metrics,
449 signature_verifier_metrics,
450 &config.expensive_safety_check_config,
451 (chain_identifier, chain),
452 checkpoint_store
453 .get_highest_executed_checkpoint_seq_number()
454 .expect("checkpoint store read cannot fail")
455 .unwrap_or(0),
456 )?;
457
458 info!("created epoch store");
459
460 replay_log!(
461 "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
462 epoch_store.epoch(),
463 epoch_store.protocol_config()
464 );
465
466 if is_genesis {
468 info!("checking IOTA conservation at genesis");
469 cache_traits
474 .reconfig_api
475 .try_expensive_check_iota_conservation(&epoch_store, None)
476 .expect("IOTA conservation check cannot fail at genesis");
477 }
478
479 let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
480 let default_buffer_stake = epoch_store
481 .protocol_config()
482 .buffer_stake_for_protocol_upgrade_bps();
483 if effective_buffer_stake != default_buffer_stake {
484 warn!(
485 ?effective_buffer_stake,
486 ?default_buffer_stake,
487 "buffer_stake_for_protocol_upgrade_bps is currently overridden"
488 );
489 }
490
491 checkpoint_store.insert_genesis_checkpoint(
492 genesis.checkpoint(),
493 genesis.checkpoint_contents().clone(),
494 &epoch_store,
495 );
496
497 unmark_db_corruption(db_corrupted_path)?;
499
500 info!("creating state sync store");
501 let state_sync_store = RocksDbStore::new(
502 cache_traits.clone(),
503 committee_store.clone(),
504 checkpoint_store.clone(),
505 );
506
507 let index_store = if is_full_node && config.enable_index_processing {
508 info!("creating index store");
509 Some(Arc::new(IndexStore::new(
510 config.db_path().join("indexes"),
511 &prometheus_registry,
512 epoch_store
513 .protocol_config()
514 .max_move_identifier_len_as_option(),
515 )))
516 } else {
517 None
518 };
519
520 let grpc_indexes_store = if is_full_node && config.enable_grpc_api {
521 Some(Arc::new(
522 GrpcIndexesStore::new(
523 config.db_path().join(GRPC_INDEXES_DIR),
524 Arc::clone(&store),
525 &checkpoint_store,
526 )
527 .await,
528 ))
529 } else {
530 None
531 };
532
533 info!("creating archive reader");
534 let archive_readers =
539 ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
540 let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
541 let (randomness_tx, randomness_rx) = mpsc::channel(
542 config
543 .p2p_config
544 .randomness
545 .clone()
546 .unwrap_or_default()
547 .mailbox_capacity(),
548 );
549 let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
550 Self::create_p2p_network(
551 &config,
552 state_sync_store.clone(),
553 chain_identifier,
554 trusted_peer_change_rx,
555 archive_readers.clone(),
556 randomness_tx,
557 &prometheus_registry,
558 )?;
559
560 send_trusted_peer_change(
563 &config,
564 &trusted_peer_change_tx,
565 epoch_store.epoch_start_state(),
566 );
567
568 info!("start state archival");
569 let state_archive_handle =
571 Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
572 .await?;
573
574 info!("start snapshot upload");
575 let state_snapshot_handle =
577 Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
578
579 let checkpoint_progress_tracker = Arc::new(CheckpointProgressTracker::new());
580
581 info!("start db checkpoint");
583 let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
584 &config,
585 &prometheus_registry,
586 state_snapshot_handle.is_some(),
587 Some(checkpoint_progress_tracker.clone()),
588 )?;
589
590 let mut genesis_objects = genesis.objects().to_vec();
591 if let Some(migration_tx_data) = migration_tx_data.as_ref() {
592 genesis_objects.extend(migration_tx_data.get_objects());
593 }
594
595 let authority_name = config.authority_public_key();
596 let validator_tx_finalizer =
597 config
598 .enable_validator_tx_finalizer
599 .then_some(Arc::new(ValidatorTxFinalizer::new(
600 auth_agg.clone(),
601 authority_name,
602 &prometheus_registry,
603 )));
604
605 info!("create authority state");
606 let state = AuthorityState::new(
607 authority_name,
608 secret,
609 config.supported_protocol_versions.unwrap(),
610 store.clone(),
611 cache_traits.clone(),
612 epoch_store.clone(),
613 committee_store.clone(),
614 index_store.clone(),
615 grpc_indexes_store,
616 checkpoint_store.clone(),
617 &prometheus_registry,
618 &genesis_objects,
619 &db_checkpoint_config,
620 config.clone(),
621 archive_readers,
622 validator_tx_finalizer,
623 chain_identifier,
624 pruner_db,
625 Some(checkpoint_progress_tracker.clone()),
626 config.policy_config.clone(),
627 config.firewall_config.clone(),
628 )
629 .await;
630
631 if epoch_store.epoch() == 0 {
633 let genesis_tx = &genesis.transaction();
634 let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
635 Self::execute_transaction_immediately_at_zero_epoch(
637 &state,
638 &epoch_store,
639 genesis_tx,
640 span,
641 )
642 .await;
643
644 if let Some(migration_tx_data) = migration_tx_data {
646 for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
647 let span = error_span!("migration_txn", tx_digest = ?tx_digest);
648 Self::execute_transaction_immediately_at_zero_epoch(
649 &state,
650 &epoch_store,
651 tx,
652 span,
653 )
654 .await;
655 }
656 }
657 }
658
659 RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
662
663 if config
664 .expensive_safety_check_config
665 .enable_secondary_index_checks()
666 {
667 if let Some(indexes) = state.indexes.clone() {
668 iota_core::verify_indexes::verify_indexes(
669 state.get_global_state_hash_store().as_ref(),
670 indexes,
671 )
672 .expect("secondary indexes are inconsistent");
673 }
674 }
675
676 let (end_of_epoch_channel, end_of_epoch_receiver) =
677 broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
678
679 let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
680 Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
681 auth_agg.load_full(),
682 state.clone(),
683 end_of_epoch_receiver,
684 &config.db_path(),
685 &prometheus_registry,
686 Some(&config),
687 )))
688 } else {
689 None
690 };
691
692 let http_server = build_http_server(
693 state.clone(),
694 &transaction_orchestrator.clone(),
695 &config,
696 &prometheus_registry,
697 )
698 .await?;
699
700 let global_state_hasher = Arc::new(GlobalStateHasher::new(
701 cache_traits.global_state_hash_store.clone(),
702 GlobalStateHashMetrics::new(&prometheus_registry),
703 ));
704
705 let authority_names_to_peer_ids = epoch_store
706 .epoch_start_state()
707 .get_authority_names_to_peer_ids();
708
709 let network_connection_metrics =
710 NetworkConnectionMetrics::new("iota", ®istry_service.default_registry());
711
712 let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
713
714 let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
715 p2p_network.downgrade(),
716 network_connection_metrics,
717 HashMap::new(),
718 None,
719 );
720
721 let connection_monitor_status = ConnectionMonitorStatus {
722 connection_statuses,
723 authority_names_to_peer_ids,
724 };
725
726 let connection_monitor_status = Arc::new(connection_monitor_status);
727 let iota_node_metrics =
728 Arc::new(IotaNodeMetrics::new(®istry_service.default_registry()));
729
730 iota_node_metrics
731 .binary_max_protocol_version
732 .set(ProtocolVersion::MAX.as_u64() as i64);
733 iota_node_metrics
734 .configured_max_protocol_version
735 .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
736
737 let executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>> =
741 transaction_orchestrator
742 .clone()
743 .map(|o| o as Arc<dyn iota_types::transaction_executor::TransactionExecutor>);
744
745 let grpc_server_handle = build_grpc_server(
746 &config,
747 state.clone(),
748 state_sync_store.clone(),
749 executor,
750 &prometheus_registry,
751 server_version,
752 )
753 .await?;
754
755 let validator_components = if state.is_committee_validator(&epoch_store) {
756 let (components, _) = futures::join!(
757 Self::construct_validator_components(
758 config.clone(),
759 state.clone(),
760 committee,
761 epoch_store.clone(),
762 checkpoint_store.clone(),
763 state_sync_handle.clone(),
764 randomness_handle.clone(),
765 Arc::downgrade(&global_state_hasher),
766 backpressure_manager.clone(),
767 connection_monitor_status.clone(),
768 ®istry_service,
769 ),
770 Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
771 );
772 let mut components = components?;
773
774 components.consensus_adapter.submit_recovered(&epoch_store);
775
776 components.validator_server_handle = components.validator_server_handle.start().await;
778
779 Some(components)
780 } else {
781 None
782 };
783
784 let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
786
787 let node = Self {
788 config,
789 validator_components: Mutex::new(validator_components),
790 _http_server: http_server,
791 state,
792 transaction_orchestrator,
793 registry_service,
794 metrics: iota_node_metrics,
795
796 _discovery: discovery_handle,
797 state_sync_handle,
798 randomness_handle,
799 checkpoint_store,
800 global_state_hasher: Mutex::new(Some(global_state_hasher)),
801 end_of_epoch_channel,
802 connection_monitor_status,
803 trusted_peer_change_tx,
804 backpressure_manager,
805 checkpoint_progress_tracker: checkpoint_progress_tracker.clone(),
806
807 _db_checkpoint_handle: db_checkpoint_handle,
808
809 #[cfg(msim)]
810 sim_state: Default::default(),
811
812 _state_archive_handle: state_archive_handle,
813 _state_snapshot_uploader_handle: state_snapshot_handle,
814 shutdown_channel_tx: shutdown_channel,
815
816 grpc_server_handle: Mutex::new(grpc_server_handle),
817
818 auth_agg,
819 };
820
821 info!("IotaNode started!");
822 let node = Arc::new(node);
823 let node_copy = node.clone();
824 spawn_monitored_task!(async move {
825 let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
826 if let Err(error) = result {
827 warn!("Reconfiguration finished with error {:?}", error);
828 }
829 });
830
831 node.checkpoint_progress_tracker
832 .spawn_logging_task(node.checkpoint_store.clone(), perpetual_tables_for_progress);
833
834 Ok(node)
835 }
836
837 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
838 self.end_of_epoch_channel.subscribe()
839 }
840
841 pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
842 self.shutdown_channel_tx.subscribe()
843 }
844
845 pub fn current_epoch_for_testing(&self) -> EpochId {
846 self.state.current_epoch_for_testing()
847 }
848
849 pub fn db_checkpoint_path(&self) -> PathBuf {
850 self.config.db_checkpoint_path()
851 }
852
853 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
855 info!("close_epoch (current epoch = {})", epoch_store.epoch());
856 self.validator_components
857 .lock()
858 .await
859 .as_ref()
860 .ok_or_else(|| IotaError::from("Node is not a validator"))?
861 .consensus_adapter
862 .close_epoch(epoch_store);
863 Ok(())
864 }
865
866 pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
867 self.state
868 .clear_override_protocol_upgrade_buffer_stake(epoch)
869 }
870
871 pub fn set_override_protocol_upgrade_buffer_stake(
872 &self,
873 epoch: EpochId,
874 buffer_stake_bps: u64,
875 ) -> IotaResult {
876 self.state
877 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
878 }
879
880 pub async fn close_epoch_for_testing(&self) -> IotaResult {
883 let epoch_store = self.state.epoch_store_for_testing();
884 self.close_epoch(&epoch_store).await
885 }
886
887 async fn start_state_archival(
888 config: &NodeConfig,
889 prometheus_registry: &Registry,
890 state_sync_store: RocksDbStore,
891 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
892 if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
893 let local_store_config = ObjectStoreConfig {
894 object_store: Some(ObjectStoreType::File),
895 directory: Some(config.archive_path()),
896 ..Default::default()
897 };
898 let archive_writer = ArchiveWriter::new(
899 local_store_config,
900 remote_store_config.clone(),
901 FileCompression::Zstd,
902 StorageFormat::Blob,
903 Duration::from_secs(600),
904 256 * 1024 * 1024,
905 prometheus_registry,
906 )
907 .await?;
908 Ok(Some(archive_writer.start(state_sync_store).await?))
909 } else {
910 Ok(None)
911 }
912 }
913
914 fn start_state_snapshot(
917 config: &NodeConfig,
918 prometheus_registry: &Registry,
919 checkpoint_store: Arc<CheckpointStore>,
920 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
921 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
922 let snapshot_uploader = StateSnapshotUploader::new(
923 &config.db_checkpoint_path(),
924 &config.snapshot_path(),
925 remote_store_config.clone(),
926 60,
927 prometheus_registry,
928 checkpoint_store,
929 )?;
930 Ok(Some(snapshot_uploader.start()))
931 } else {
932 Ok(None)
933 }
934 }
935
936 fn start_db_checkpoint(
937 config: &NodeConfig,
938 prometheus_registry: &Registry,
939 state_snapshot_enabled: bool,
940 checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
941 ) -> Result<(
942 DBCheckpointConfig,
943 Option<tokio::sync::broadcast::Sender<()>>,
944 )> {
945 let checkpoint_path = Some(
946 config
947 .db_checkpoint_config
948 .checkpoint_path
949 .clone()
950 .unwrap_or_else(|| config.db_checkpoint_path()),
951 );
952 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
953 DBCheckpointConfig {
954 checkpoint_path,
955 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
956 true
957 } else {
958 config
959 .db_checkpoint_config
960 .perform_db_checkpoints_at_epoch_end
961 },
962 ..config.db_checkpoint_config.clone()
963 }
964 } else {
965 config.db_checkpoint_config.clone()
966 };
967
968 match (
969 db_checkpoint_config.object_store_config.as_ref(),
970 state_snapshot_enabled,
971 ) {
972 (None, false) => Ok((db_checkpoint_config, None)),
977 (_, _) => {
978 let handler = DBCheckpointHandler::new(
979 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
980 db_checkpoint_config.object_store_config.as_ref(),
981 60,
982 db_checkpoint_config
983 .prune_and_compact_before_upload
984 .unwrap_or(true),
985 config.authority_store_pruning_config.clone(),
986 prometheus_registry,
987 state_snapshot_enabled,
988 checkpoint_progress_tracker,
989 )?;
990 Ok((
991 db_checkpoint_config,
992 Some(DBCheckpointHandler::start(handler)),
993 ))
994 }
995 }
996 }
997
998 fn create_p2p_network(
999 config: &NodeConfig,
1000 state_sync_store: RocksDbStore,
1001 chain_identifier: ChainIdentifier,
1002 trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
1003 archive_readers: ArchiveReaderBalancer,
1004 randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1005 prometheus_registry: &Registry,
1006 ) -> Result<(
1007 Network,
1008 discovery::Handle,
1009 state_sync::Handle,
1010 randomness::Handle,
1011 )> {
1012 let (state_sync, state_sync_server) = state_sync::Builder::new()
1013 .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1014 .store(state_sync_store)
1015 .archive_readers(archive_readers)
1016 .with_metrics(prometheus_registry)
1017 .build();
1018
1019 let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
1020 .config(config.p2p_config.clone())
1021 .build();
1022
1023 let (randomness, randomness_router) =
1024 randomness::Builder::new(config.authority_public_key(), randomness_tx)
1025 .config(config.p2p_config.randomness.clone().unwrap_or_default())
1026 .with_metrics(prometheus_registry)
1027 .build();
1028
1029 let p2p_network = {
1030 let routes = anemo::Router::new()
1031 .add_rpc_service(discovery_server)
1032 .add_rpc_service(state_sync_server);
1033 let routes = routes.merge(randomness_router);
1034
1035 let inbound_network_metrics =
1036 NetworkMetrics::new("iota", "inbound", prometheus_registry);
1037 let outbound_network_metrics =
1038 NetworkMetrics::new("iota", "outbound", prometheus_registry);
1039
1040 let service = ServiceBuilder::new()
1041 .layer(
1042 TraceLayer::new_for_server_errors()
1043 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1044 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1045 )
1046 .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1047 Arc::new(inbound_network_metrics),
1048 config.p2p_config.excessive_message_size(),
1049 )))
1050 .service(routes);
1051
1052 let outbound_layer = ServiceBuilder::new()
1053 .layer(
1054 TraceLayer::new_for_client_and_server_errors()
1055 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1056 .on_failure(DefaultOnFailure::new().level(tracing::Level::DEBUG)),
1057 )
1058 .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1059 Arc::new(outbound_network_metrics),
1060 config.p2p_config.excessive_message_size(),
1061 )))
1062 .into_inner();
1063
1064 let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1065 anemo_config.max_frame_size = Some(1 << 30);
1068
1069 let mut quic_config = anemo_config.quic.unwrap_or_default();
1072 if quic_config.socket_send_buffer_size.is_none() {
1073 quic_config.socket_send_buffer_size = Some(20 << 20);
1074 }
1075 if quic_config.socket_receive_buffer_size.is_none() {
1076 quic_config.socket_receive_buffer_size = Some(20 << 20);
1077 }
1078 quic_config.allow_failed_socket_buffer_size_setting = true;
1079
1080 if quic_config.max_concurrent_bidi_streams.is_none() {
1083 quic_config.max_concurrent_bidi_streams = Some(500);
1084 }
1085 if quic_config.max_concurrent_uni_streams.is_none() {
1086 quic_config.max_concurrent_uni_streams = Some(500);
1087 }
1088 if quic_config.stream_receive_window.is_none() {
1089 quic_config.stream_receive_window = Some(100 << 20);
1090 }
1091 if quic_config.receive_window.is_none() {
1092 quic_config.receive_window = Some(200 << 20);
1093 }
1094 if quic_config.send_window.is_none() {
1095 quic_config.send_window = Some(200 << 20);
1096 }
1097 if quic_config.crypto_buffer_size.is_none() {
1098 quic_config.crypto_buffer_size = Some(1 << 20);
1099 }
1100 if quic_config.max_idle_timeout_ms.is_none() {
1101 quic_config.max_idle_timeout_ms = Some(30_000);
1102 }
1103 if quic_config.keep_alive_interval_ms.is_none() {
1104 quic_config.keep_alive_interval_ms = Some(5_000);
1105 }
1106 anemo_config.quic = Some(quic_config);
1107
1108 let server_name = format!("iota-{chain_identifier}");
1109 let network = Network::bind(config.p2p_config.listen_address)
1110 .server_name(&server_name)
1111 .private_key(config.network_key_pair().copy().private().0.to_bytes())
1112 .config(anemo_config)
1113 .outbound_request_layer(outbound_layer)
1114 .start(service)?;
1115 info!(
1116 server_name = server_name,
1117 "P2p network started on {}",
1118 network.local_addr()
1119 );
1120
1121 network
1122 };
1123
1124 let discovery_handle =
1125 discovery.start(p2p_network.clone(), config.network_key_pair().copy());
1126 let state_sync_handle = state_sync.start(p2p_network.clone());
1127 let randomness_handle = randomness.start(p2p_network.clone());
1128
1129 Ok((
1130 p2p_network,
1131 discovery_handle,
1132 state_sync_handle,
1133 randomness_handle,
1134 ))
1135 }
1136
1137 async fn construct_validator_components(
1140 config: NodeConfig,
1141 state: Arc<AuthorityState>,
1142 committee: Arc<Committee>,
1143 epoch_store: Arc<AuthorityPerEpochStore>,
1144 checkpoint_store: Arc<CheckpointStore>,
1145 state_sync_handle: state_sync::Handle,
1146 randomness_handle: randomness::Handle,
1147 global_state_hasher: Weak<GlobalStateHasher>,
1148 backpressure_manager: Arc<BackpressureManager>,
1149 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1150 registry_service: &RegistryService,
1151 ) -> Result<ValidatorComponents> {
1152 let mut config_clone = config.clone();
1153 let consensus_config = config_clone
1154 .consensus_config
1155 .as_mut()
1156 .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
1157 let validator_registry = Registry::new();
1158 let validator_registry_id = registry_service.add(validator_registry.clone());
1159
1160 let client = Arc::new(UpdatableConsensusClient::new());
1161 let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1162 &committee,
1163 consensus_config,
1164 state.name,
1165 connection_monitor_status.clone(),
1166 &validator_registry,
1167 client.clone(),
1168 checkpoint_store.clone(),
1169 ));
1170 let consensus_manager = ConsensusManager::new(
1171 &config,
1172 consensus_config,
1173 registry_service,
1174 &validator_registry,
1175 client,
1176 );
1177
1178 let consensus_store_pruner = ConsensusStorePruner::new(
1181 consensus_manager.get_storage_base_path(),
1182 consensus_config.db_retention_epochs(),
1183 consensus_config.db_pruner_period(),
1184 &validator_registry,
1185 );
1186
1187 let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
1188 let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);
1189
1190 let validator_server_handle = Self::start_grpc_validator_service(
1191 &config,
1192 state.clone(),
1193 consensus_adapter.clone(),
1194 &validator_registry,
1195 )
1196 .await?;
1197
1198 let validator_overload_monitor_handle = if config
1201 .authority_overload_config
1202 .max_load_shedding_percentage
1203 > 0
1204 {
1205 let authority_state = Arc::downgrade(&state);
1206 let overload_config = config.authority_overload_config.clone();
1207 fail_point!("starting_overload_monitor");
1208 Some(spawn_monitored_task!(overload_monitor(
1209 authority_state,
1210 overload_config,
1211 )))
1212 } else {
1213 None
1214 };
1215
1216 Self::start_epoch_specific_validator_components(
1217 &config,
1218 state.clone(),
1219 consensus_adapter,
1220 checkpoint_store,
1221 epoch_store,
1222 state_sync_handle,
1223 randomness_handle,
1224 consensus_manager,
1225 consensus_store_pruner,
1226 global_state_hasher,
1227 backpressure_manager,
1228 validator_server_handle,
1229 validator_overload_monitor_handle,
1230 checkpoint_metrics,
1231 iota_tx_validator_metrics,
1232 validator_registry_id,
1233 )
1234 .await
1235 }
1236
1237 async fn start_epoch_specific_validator_components(
1240 config: &NodeConfig,
1241 state: Arc<AuthorityState>,
1242 consensus_adapter: Arc<ConsensusAdapter>,
1243 checkpoint_store: Arc<CheckpointStore>,
1244 epoch_store: Arc<AuthorityPerEpochStore>,
1245 state_sync_handle: state_sync::Handle,
1246 randomness_handle: randomness::Handle,
1247 consensus_manager: ConsensusManager,
1248 consensus_store_pruner: ConsensusStorePruner,
1249 global_state_hasher: Weak<GlobalStateHasher>,
1250 backpressure_manager: Arc<BackpressureManager>,
1251 validator_server_handle: SpawnOnce,
1252 validator_overload_monitor_handle: Option<JoinHandle<()>>,
1253 checkpoint_metrics: Arc<CheckpointMetrics>,
1254 iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
1255 validator_registry_id: RegistryID,
1256 ) -> Result<ValidatorComponents> {
1257 let checkpoint_service = Self::build_checkpoint_service(
1258 config,
1259 consensus_adapter.clone(),
1260 checkpoint_store.clone(),
1261 epoch_store.clone(),
1262 state.clone(),
1263 state_sync_handle,
1264 global_state_hasher,
1265 checkpoint_metrics.clone(),
1266 );
1267
1268 let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1273
1274 consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1275
1276 let randomness_manager = RandomnessManager::try_new(
1277 Arc::downgrade(&epoch_store),
1278 Box::new(consensus_adapter.clone()),
1279 randomness_handle,
1280 config.authority_key_pair(),
1281 )
1282 .await;
1283 if let Some(randomness_manager) = randomness_manager {
1284 epoch_store
1285 .set_randomness_manager(randomness_manager)
1286 .await?;
1287 }
1288
1289 let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1290 state.clone(),
1291 checkpoint_service.clone(),
1292 epoch_store.clone(),
1293 low_scoring_authorities,
1294 backpressure_manager,
1295 );
1296
1297 info!("Starting consensus manager");
1298
1299 consensus_manager
1300 .start(
1301 config,
1302 epoch_store.clone(),
1303 consensus_handler_initializer,
1304 IotaTxValidator::new(
1305 epoch_store.clone(),
1306 checkpoint_service.clone(),
1307 state.transaction_manager().clone(),
1308 iota_tx_validator_metrics.clone(),
1309 ),
1310 )
1311 .await;
1312 let consensus_replay_waiter = consensus_manager.replay_waiter();
1313
1314 info!("Spawning checkpoint service");
1315 let checkpoint_service_tasks = checkpoint_service.spawn(consensus_replay_waiter).await;
1316
1317 Ok(ValidatorComponents {
1318 validator_server_handle,
1319 validator_overload_monitor_handle,
1320 consensus_manager,
1321 consensus_store_pruner,
1322 consensus_adapter,
1323 checkpoint_service_tasks,
1324 checkpoint_metrics,
1325 iota_tx_validator_metrics,
1326 validator_registry_id,
1327 })
1328 }
1329
1330 fn build_checkpoint_service(
1337 config: &NodeConfig,
1338 consensus_adapter: Arc<ConsensusAdapter>,
1339 checkpoint_store: Arc<CheckpointStore>,
1340 epoch_store: Arc<AuthorityPerEpochStore>,
1341 state: Arc<AuthorityState>,
1342 state_sync_handle: state_sync::Handle,
1343 global_state_hasher: Weak<GlobalStateHasher>,
1344 checkpoint_metrics: Arc<CheckpointMetrics>,
1345 ) -> Arc<CheckpointService> {
1346 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1347 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1348
1349 debug!(
1350 "Starting checkpoint service with epoch start timestamp {}
1351 and epoch duration {}",
1352 epoch_start_timestamp_ms, epoch_duration_ms
1353 );
1354
1355 let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1356 sender: consensus_adapter,
1357 signer: state.secret.clone(),
1358 authority: config.authority_public_key(),
1359 next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1360 .checked_add(epoch_duration_ms)
1361 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1362 metrics: checkpoint_metrics.clone(),
1363 });
1364
1365 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1366 let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1367 let max_checkpoint_size_bytes =
1368 epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1369
1370 CheckpointService::build(
1371 state.clone(),
1372 checkpoint_store,
1373 epoch_store,
1374 state.get_transaction_cache_reader().clone(),
1375 global_state_hasher,
1376 checkpoint_output,
1377 Box::new(certified_checkpoint_output),
1378 checkpoint_metrics,
1379 max_tx_per_checkpoint,
1380 max_checkpoint_size_bytes,
1381 )
1382 }
1383
1384 fn construct_consensus_adapter(
1385 committee: &Committee,
1386 consensus_config: &ConsensusConfig,
1387 authority: AuthorityName,
1388 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1389 prometheus_registry: &Registry,
1390 consensus_client: Arc<dyn ConsensusClient>,
1391 checkpoint_store: Arc<CheckpointStore>,
1392 ) -> ConsensusAdapter {
1393 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1394 ConsensusAdapter::new(
1398 consensus_client,
1399 checkpoint_store,
1400 authority,
1401 connection_monitor_status,
1402 consensus_config.max_pending_transactions(),
1403 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1404 consensus_config.max_submit_position,
1405 consensus_config.submit_delay_step_override(),
1406 ca_metrics,
1407 )
1408 }
1409
1410 async fn start_grpc_validator_service(
1411 config: &NodeConfig,
1412 state: Arc<AuthorityState>,
1413 consensus_adapter: Arc<ConsensusAdapter>,
1414 prometheus_registry: &Registry,
1415 ) -> Result<SpawnOnce> {
1416 let validator_service = ValidatorService::new(
1417 state,
1418 consensus_adapter,
1419 Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1420 config.policy_config.clone().map(|p| p.client_id_source),
1421 );
1422
1423 let mut server_conf = iota_network_stack::config::Config::new();
1424 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1425 server_conf.load_shed = config.grpc_load_shed;
1426 let server_builder =
1427 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
1428 .add_service(ValidatorServer::new(validator_service.clone()))
1429 .add_service(ValidatorV2Server::new(validator_service.clone()))
1430 .add_service(ValidatorPeerServer::new(validator_service));
1431
1432 let tls_config = iota_tls::create_rustls_server_config(
1433 config.network_key_pair().copy().private(),
1434 IOTA_TLS_SERVER_NAME.to_string(),
1435 );
1436
1437 let network_address = config.network_address().clone();
1438
1439 let bind_future = async move {
1440 let server = server_builder
1441 .bind(&network_address, Some(tls_config))
1442 .await
1443 .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;
1444
1445 let local_addr = server.local_addr();
1446 info!("Listening to traffic on {local_addr}");
1447
1448 Ok(server)
1449 };
1450
1451 Ok(SpawnOnce::new(bind_future))
1452 }
1453
1454 async fn reexecute_pending_consensus_certs(
1472 epoch_store: &Arc<AuthorityPerEpochStore>,
1473 state: &Arc<AuthorityState>,
1474 ) {
1475 let mut pending_consensus_certificates = Vec::new();
1476 let mut additional_certs = Vec::new();
1477
1478 for tx in epoch_store.get_all_pending_consensus_transactions() {
1479 match tx.kind {
1480 ConsensusTransactionKind::CertifiedTransaction(tx)
1490 if !tx.contains_shared_object() =>
1491 {
1492 let tx = *tx;
1493 let tx = VerifiedExecutableTransaction::new_from_certificate(
1496 VerifiedCertificate::new_unchecked(tx),
1497 );
1498 if let Some(fx_digest) = epoch_store
1501 .get_signed_effects_digest(tx.digest())
1502 .expect("db error")
1503 {
1504 pending_consensus_certificates.push((tx, fx_digest));
1505 } else {
1506 additional_certs.push(tx);
1507 }
1508 }
1509 _ => (),
1510 }
1511 }
1512
1513 let digests = pending_consensus_certificates
1514 .iter()
1515 .map(|(tx, _)| *tx.digest())
1516 .collect::<Vec<_>>();
1517
1518 info!(
1519 "reexecuting {} pending consensus certificates: {:?}",
1520 digests.len(),
1521 digests
1522 );
1523
1524 state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
1525 state.enqueue_transactions_for_execution(additional_certs, epoch_store);
1526
1527 let timeout = if cfg!(msim) { 120 } else { 60 };
1533 if tokio::time::timeout(
1534 std::time::Duration::from_secs(timeout),
1535 state
1536 .get_transaction_cache_reader()
1537 .try_notify_read_executed_effects_digests(&digests),
1538 )
1539 .await
1540 .is_err()
1541 {
1542 if let Ok(executed_effects_digests) = state
1544 .get_transaction_cache_reader()
1545 .try_multi_get_executed_effects_digests(&digests)
1546 {
1547 let pending_digests = digests
1548 .iter()
1549 .zip(executed_effects_digests.iter())
1550 .filter_map(|(digest, executed_effects_digest)| {
1551 if executed_effects_digest.is_none() {
1552 Some(digest)
1553 } else {
1554 None
1555 }
1556 })
1557 .collect::<Vec<_>>();
1558 debug_fatal!(
1559 "Timed out waiting for effects digests to be executed: {:?}",
1560 pending_digests
1561 );
1562 } else {
1563 debug_fatal!(
1564 "Timed out waiting for effects digests to be executed, digests not found"
1565 );
1566 }
1567 }
1568 }
1569
1570 pub fn state(&self) -> Arc<AuthorityState> {
1571 self.state.clone()
1572 }
1573
1574 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1576 self.state.reference_gas_price_for_testing()
1577 }
1578
1579 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1580 self.state.committee_store().clone()
1581 }
1582
1583 pub fn clone_authority_aggregator(
1593 &self,
1594 ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1595 self.transaction_orchestrator
1596 .as_ref()
1597 .map(|to| to.clone_authority_aggregator())
1598 }
1599
1600 pub fn transaction_orchestrator(
1601 &self,
1602 ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1603 self.transaction_orchestrator.clone()
1604 }
1605
1606 pub fn subscribe_to_transaction_orchestrator_effects(
1607 &self,
1608 ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1609 self.transaction_orchestrator
1610 .as_ref()
1611 .map(|to| to.subscribe_to_effects_queue())
1612 .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1613 }
1614
/// Drives the node through epochs: each loop iteration executes one epoch's
/// checkpoints and then reconfigures the node for the next epoch.
///
/// Per iteration this:
/// - creates a `CheckpointExecutor` for `epoch_store`;
/// - announces this node's capabilities, either by submitting them to
///   consensus (when validator components exist) or by sending signed
///   capabilities to the committee (active but non-committee validator, when
///   the protocol config enables it);
/// - runs the epoch;
/// - reads the latest system state and notifies end-of-epoch subscribers;
/// - reconfigures authority state, and starts, keeps, or tears down the
///   validator components depending on committee membership.
///
/// Returns `Ok(())` only when the executor stops because the configured
/// `run_with_range` condition was met (after shutting the node down).
/// Otherwise it loops indefinitely.
///
/// # Errors
///
/// Propagates errors from submitting capabilities to consensus, from
/// `reconfigure_state`, from (re)starting validator components, and (under
/// `msim` with checkpoint pruning configured) from checkpoint pruning.
///
/// # Panics
///
/// Panics if the global state hasher slot is empty at the start of an
/// iteration, if the system state object cannot be read, if the next epoch is
/// not `current + 1`, or if the hasher still has other strong references when
/// it is recreated.
pub async fn monitor_reconfiguration(
    self: Arc<Self>,
    mut epoch_store: Arc<AuthorityPerEpochStore>,
) -> Result<()> {
    let checkpoint_executor_metrics =
        CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

    loop {
        // The hasher slot stays locked for the whole iteration; a replacement
        // hasher is stored back into it during reconfiguration below.
        let mut hasher_guard = self.global_state_hasher.lock().await;
        let hasher = hasher_guard.take().unwrap();
        info!(
            "Creating checkpoint executor for epoch {}",
            epoch_store.epoch()
        );

        // Forward executed checkpoint data to the gRPC API broadcaster, if a
        // gRPC server is running. `try_lock` means that if the handle mutex is
        // currently held elsewhere, this epoch's executor gets no sender.
        let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
            guard.as_ref().map(|handle| {
                let tx = handle.checkpoint_data_broadcaster().clone();
                Box::new(move |data: &CheckpointData| {
                    tx.send_traced(data);
                }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
            })
        } else {
            None
        };

        let checkpoint_executor = CheckpointExecutor::new(
            epoch_store.clone(),
            self.checkpoint_store.clone(),
            self.state.clone(),
            hasher.clone(),
            self.backpressure_manager.clone(),
            self.config.checkpoint_executor_config.clone(),
            checkpoint_executor_metrics.clone(),
            data_sender,
            Some(self.checkpoint_progress_tracker.clone()),
        );

        let run_with_range = self.config.run_with_range;

        let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

        self.metrics
            .current_protocol_version
            .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

        // Committee validator: submit this node's capabilities through consensus.
        // The validator-components lock is held across the awaits in this branch.
        if let Some(components) = &*self.validator_components.lock().await {
            // NOTE(review): the purpose of this 1ms sleep is not evident from this file.
            tokio::time::sleep(Duration::from_millis(1)).await;

            let config = cur_epoch_store.protocol_config();
            let binary_config = to_binary_config(config);
            let transaction = ConsensusTransaction::new_capability_notification_v1(
                AuthorityCapabilitiesV1::new(
                    self.state.name,
                    cur_epoch_store.get_chain(),
                    self.config
                        .supported_protocol_versions
                        .expect("Supported versions should be populated")
                        .truncate_below(config.version),
                    self.state
                        .get_available_system_packages(&binary_config)
                        .await,
                ),
            );
            info!(?transaction, "submitting capabilities to consensus");
            components
                .consensus_adapter
                .submit(transaction, None, &cur_epoch_store)?;
        } else if self.state.is_active_validator(&cur_epoch_store)
            && cur_epoch_store
                .protocol_config()
                .track_non_committee_eligible_validators()
        {
            // Active validator outside the committee: send signed capabilities
            // directly to the committee, retrying in a background task that is
            // scoped to this epoch via `within_alive_epoch`.
            let epoch_store = cur_epoch_store.clone();
            let node_clone = self.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
                node_clone
                    .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
                    .instrument(trace_span!(
                        "send_signed_capability_notification_to_committee_with_retry"
                    ))
                    .await;
            }));
        }

        let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

        // The configured run range was reached: shut down and notify the owner
        // of the shutdown channel instead of reconfiguring.
        if stop_condition == StopReason::RunWithRangeCondition {
            IotaNode::shutdown(&self).await;
            self.shutdown_channel_tx
                .send(run_with_range)
                .expect("RunWithRangeCondition met but failed to send shutdown message");
            return Ok(());
        }

        let latest_system_state = self
            .state
            .get_object_cache_reader()
            .try_get_iota_system_state_object_unsafe()
            .expect("Read IOTA System State object cannot fail");

        // Safe mode is not expected unless a simulation test explicitly opted in.
        #[cfg(msim)]
        if !self
            .sim_state
            .sim_safe_mode_expected
            .load(Ordering::Relaxed)
        {
            debug_assert!(!latest_system_state.safe_mode());
        }

        #[cfg(not(msim))]
        debug_assert!(!latest_system_state.safe_mode());

        // A send failure is only logged on fullnodes.
        if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
            if self.state.is_fullnode(&cur_epoch_store) {
                warn!(
                    "Failed to send end of epoch notification to subscriber: {:?}",
                    err
                );
            }
        }

        cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
        let new_epoch_start_state = latest_system_state.into_epoch_start_state();

        // Replace the shared authority aggregator with one built for the new epoch.
        self.auth_agg.store(Arc::new(
            self.auth_agg
                .load()
                .recreate_with_new_epoch_start_state(&new_epoch_start_state),
        ));

        let next_epoch_committee = new_epoch_start_state.get_iota_committee();
        let next_epoch = next_epoch_committee.epoch();
        assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

        info!(
            next_epoch,
            "Finished executing all checkpoints in epoch. About to reconfigure the system."
        );

        fail_point_async!("reconfig_delay");

        // Update the authority-name -> peer-id mapping used by connection monitoring.
        let authority_names_to_peer_ids =
            new_epoch_start_state.get_authority_names_to_peer_ids();
        self.connection_monitor_status
            .update_mapping_for_epoch(authority_names_to_peer_ids);

        cur_epoch_store.record_epoch_reconfig_start_time_metric();

        send_trusted_peer_change(
            &self.config,
            &self.trusted_peer_change_tx,
            &new_epoch_start_state,
        );

        // Held until the new validator components (if any) have been stored.
        let mut validator_components_lock_guard = self.validator_components.lock().await;

        let new_epoch_store = self
            .reconfigure_state(
                &self.state,
                &cur_epoch_store,
                next_epoch_committee.clone(),
                new_epoch_start_state,
                hasher.clone(),
            )
            .await?;

        let new_validator_components = if let Some(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            mut checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        }) = validator_components_lock_guard.take()
        {
            // This node was a validator in the finished epoch: stop the
            // epoch-specific parts first.
            info!("Reconfiguring the validator.");
            checkpoint_service_tasks.abort_all();
            // Drain the aborted tasks. Panics are re-raised here; other join
            // errors (e.g. cancellation) are only logged.
            while let Some(result) = checkpoint_service_tasks.join_next().await {
                if let Err(err) = result {
                    if err.is_panic() {
                        std::panic::resume_unwind(err.into_panic());
                    }
                    warn!("Error in checkpoint service task: {:?}", err);
                }
            }
            info!("Checkpoint service has shut down.");

            consensus_manager.shutdown().await;
            info!("Consensus has shut down.");

            info!("Epoch store finished reconfiguration.");

            // Recreate the hasher for the new epoch, carrying over its metrics.
            // The clones given to the executor and `reconfigure_state` must
            // already be dropped for `into_inner` to succeed.
            let global_state_hasher_metrics = Arc::into_inner(hasher)
                .expect("Object state hasher should have no other references at this point")
                .metrics();
            let new_hasher = Arc::new(GlobalStateHasher::new(
                self.state.get_global_state_hash_store().clone(),
                global_state_hasher_metrics,
            ));
            let weak_hasher = Arc::downgrade(&new_hasher);
            *hasher_guard = Some(new_hasher);

            consensus_store_pruner.prune(next_epoch).await;

            if self.state.is_committee_validator(&new_epoch_store) {
                // Still in the committee: reuse the long-lived components and
                // restart only the epoch-specific ones.
                Some(
                    Self::start_epoch_specific_validator_components(
                        &self.config,
                        self.state.clone(),
                        consensus_adapter,
                        self.checkpoint_store.clone(),
                        new_epoch_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        consensus_manager,
                        consensus_store_pruner,
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        validator_server_handle,
                        validator_overload_monitor_handle,
                        checkpoint_metrics,
                        iota_tx_validator_metrics,
                        validator_registry_id,
                    )
                    .await?,
                )
            } else {
                // Left the committee: drop validator metrics and stop the
                // validator gRPC server.
                // NOTE(review): `validator_overload_monitor_handle` is dropped
                // here without being aborted; dropping a tokio `JoinHandle`
                // detaches the task rather than cancelling it — confirm intended.
                info!("This node is no longer a validator after reconfiguration");
                if self.registry_service.remove(validator_registry_id) {
                    debug!("Removed validator metrics registry");
                } else {
                    warn!("Failed to remove validator metrics registry");
                }
                validator_server_handle.shutdown();
                debug!("Validator grpc server shutdown triggered");

                None
            }
        } else {
            // This node had no validator components in the finished epoch.
            let global_state_hasher_metrics = Arc::into_inner(hasher)
                .expect("Object state hasher should have no other references at this point")
                .metrics();
            let new_hasher = Arc::new(GlobalStateHasher::new(
                self.state.get_global_state_hash_store().clone(),
                global_state_hasher_metrics,
            ));
            let weak_hasher = Arc::downgrade(&new_hasher);
            *hasher_guard = Some(new_hasher);

            if self.state.is_committee_validator(&new_epoch_store) {
                // Joined the committee: build all validator components and
                // start the validator gRPC server.
                info!("Promoting the node from fullnode to validator, starting grpc server");

                let mut components = Self::construct_validator_components(
                    self.config.clone(),
                    self.state.clone(),
                    Arc::new(next_epoch_committee.clone()),
                    new_epoch_store.clone(),
                    self.checkpoint_store.clone(),
                    self.state_sync_handle.clone(),
                    self.randomness_handle.clone(),
                    weak_hasher,
                    self.backpressure_manager.clone(),
                    self.connection_monitor_status.clone(),
                    &self.registry_service,
                )
                .await?;

                components.validator_server_handle =
                    components.validator_server_handle.start().await;

                Some(components)
            } else {
                None
            }
        };
        *validator_components_lock_guard = new_validator_components;

        // Release the previous epoch's store before pruning old epoch databases.
        cur_epoch_store.release_db_handles();

        drop(cur_epoch_store);

        self.state.epoch_db_pruner().prune_old_epoch_dbs().await;

        // Simulation tests only: prune checkpoints when a finite, non-zero
        // checkpoint retention is configured.
        if cfg!(msim)
            && !matches!(
                self.config
                    .authority_store_pruning_config
                    .num_epochs_to_retain_for_checkpoints(),
                None | Some(u64::MAX) | Some(0)
            )
        {
            self.state
                .prune_checkpoints_for_eligible_epochs_for_testing(
                    self.config.clone(),
                    iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                )
                .await?;
        }

        epoch_store = new_epoch_store;
        info!("Reconfiguration finished");
    }
}
1960
1961 async fn shutdown(&self) {
1962 if let Some(validator_components) = &*self.validator_components.lock().await {
1963 validator_components.consensus_manager.shutdown().await;
1964 }
1965
1966 if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
1968 info!("Shutting down gRPC server");
1969 if let Err(e) = grpc_handle.shutdown().await {
1970 warn!("Failed to gracefully shutdown gRPC server: {e}");
1971 }
1972 }
1973 }
1974
1975 async fn reconfigure_state(
1978 &self,
1979 state: &Arc<AuthorityState>,
1980 cur_epoch_store: &AuthorityPerEpochStore,
1981 next_epoch_committee: Committee,
1982 next_epoch_start_system_state: EpochStartSystemState,
1983 global_state_hasher: Arc<GlobalStateHasher>,
1984 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
1985 let next_epoch = next_epoch_committee.epoch();
1986
1987 let last_checkpoint = self
1988 .checkpoint_store
1989 .get_epoch_last_checkpoint(cur_epoch_store.epoch())
1990 .expect("Error loading last checkpoint for current epoch")
1991 .expect("Could not load last checkpoint for current epoch");
1992 let epoch_supply_change = last_checkpoint
1993 .end_of_epoch_data
1994 .as_ref()
1995 .ok_or_else(|| {
1996 IotaError::from("last checkpoint in epoch should contain end of epoch data")
1997 })?
1998 .epoch_supply_change;
1999
2000 let last_checkpoint_seq = *last_checkpoint.sequence_number();
2001
2002 assert_eq!(
2003 Some(last_checkpoint_seq),
2004 self.checkpoint_store
2005 .get_highest_executed_checkpoint_seq_number()
2006 .expect("Error loading highest executed checkpoint sequence number")
2007 );
2008
2009 let epoch_start_configuration = EpochStartConfiguration::new(
2010 next_epoch_start_system_state,
2011 *last_checkpoint.digest(),
2012 state.get_object_store().as_ref(),
2013 EpochFlag::default_flags_for_new_epoch(&state.config),
2014 )
2015 .expect("EpochStartConfiguration construction cannot fail");
2016
2017 let new_epoch_store = self
2018 .state
2019 .reconfigure(
2020 cur_epoch_store,
2021 self.config.supported_protocol_versions.unwrap(),
2022 next_epoch_committee,
2023 epoch_start_configuration,
2024 global_state_hasher,
2025 &self.config.expensive_safety_check_config,
2026 epoch_supply_change,
2027 last_checkpoint_seq,
2028 )
2029 .await
2030 .expect("Reconfigure authority state cannot fail");
2031 info!(next_epoch, "Node State has been reconfigured");
2032 assert_eq!(next_epoch, new_epoch_store.epoch());
2033 self.state.get_reconfig_api().update_epoch_flags_metrics(
2034 cur_epoch_store.epoch_start_config().flags(),
2035 new_epoch_store.epoch_start_config().flags(),
2036 );
2037
2038 Ok(new_epoch_store)
2039 }
2040
2041 pub fn get_config(&self) -> &NodeConfig {
2042 &self.config
2043 }
2044
2045 async fn execute_transaction_immediately_at_zero_epoch(
2046 state: &Arc<AuthorityState>,
2047 epoch_store: &Arc<AuthorityPerEpochStore>,
2048 tx: &Transaction,
2049 span: tracing::Span,
2050 ) {
2051 let _guard = span.enter();
2052 let transaction =
2053 iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2054 iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2055 tx.data().clone(),
2056 iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2057 ),
2058 );
2059 state
2060 .try_execute_immediately(&transaction, None, epoch_store)
2061 .unwrap();
2062 }
2063
2064 pub fn randomness_handle(&self) -> randomness::Handle {
2065 self.randomness_handle.clone()
2066 }
2067
2068 async fn send_signed_capability_notification_to_committee_with_retry(
2074 &self,
2075 epoch_store: &Arc<AuthorityPerEpochStore>,
2076 ) {
2077 const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
2078 const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
2079 const MAX_RETRY_INTERVAL_SECS: u64 = 300; let config = epoch_store.protocol_config();
2083 let binary_config = to_binary_config(config);
2084
2085 let capabilities = AuthorityCapabilitiesV1::new(
2087 self.state.name,
2088 epoch_store.get_chain(),
2089 self.config
2090 .supported_protocol_versions
2091 .expect("Supported versions should be populated")
2092 .truncate_below(config.version),
2093 self.state
2094 .get_available_system_packages(&binary_config)
2095 .await,
2096 );
2097
2098 let signature = AuthoritySignature::new_secure(
2100 &IntentMessage::new(
2101 Intent::iota_app(IntentScope::AuthorityCapabilities),
2102 &capabilities,
2103 ),
2104 &epoch_store.epoch(),
2105 self.config.authority_key_pair(),
2106 );
2107
2108 let request = HandleCapabilityNotificationRequestV1 {
2109 message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
2110 };
2111
2112 let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);
2113
2114 loop {
2115 let auth_agg = self.auth_agg.load();
2116 match auth_agg
2117 .send_capability_notification_to_quorum(request.clone())
2118 .await
2119 {
2120 Ok(_) => {
2121 info!("Successfully sent capability notification to committee");
2122 break;
2123 }
2124 Err(err) => {
2125 match &err {
2126 AggregatorSendCapabilityNotificationError::RetryableNotification {
2127 errors,
2128 } => {
2129 warn!(
2130 "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
2131 retry_interval, errors
2132 );
2133 }
2134 AggregatorSendCapabilityNotificationError::NonRetryableNotification {
2135 errors,
2136 } => {
2137 error!(
2138 "Failed to send capability notification to committee (non-retryable error): {:?}",
2139 errors
2140 );
2141 break;
2142 }
2143 };
2144
2145 tokio::time::sleep(retry_interval).await;
2147
2148 retry_interval = std::cmp::min(
2150 retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
2151 Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
2152 );
2153 }
2154 }
2155 }
2156 }
2157}
2158
#[cfg(msim)]
impl IotaNode {
    /// Returns the id of the simulator node this `IotaNode` runs on.
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        let sim_node = &self.sim_state.sim_node;
        sim_node.id()
    }

    /// Sets whether the simulation test expects the system to enter safe mode.
    /// While the flag is `true`, the end-of-epoch `debug_assert!` against safe
    /// mode is skipped.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let safe_mode_flag = &self.sim_state.sim_safe_mode_expected;
        safe_mode_flag.store(new_value, Ordering::Relaxed);
    }
}
2172
/// The validator gRPC server, either prepared but not yet started, or running.
enum SpawnOnce {
    /// Holds the not-yet-polled future that binds the server. It is taken out
    /// with `Mutex::into_inner` when the server is started.
    /// NOTE(review): the `Mutex` is presumably there so `SpawnOnce` is `Sync`
    /// even though `BoxFuture` is not — confirm.
    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
    /// Holds the handle of the spawned, serving server. `SpawnOnce::shutdown`
    /// uses it to trigger shutdown.
    /// NOTE(review): the field is read in `shutdown`, so this `allow` may be stale.
    #[allow(unused)]
    Started(iota_http::ServerHandle),
}
2179
2180impl SpawnOnce {
2181 pub fn new(
2182 future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
2183 ) -> Self {
2184 Self::Unstarted(Mutex::new(Box::pin(future)))
2185 }
2186
2187 pub async fn start(self) -> Self {
2188 match self {
2189 Self::Unstarted(future) => {
2190 let server = future
2191 .into_inner()
2192 .await
2193 .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
2194 let handle = server.handle().clone();
2195 tokio::spawn(async move {
2196 if let Err(err) = server.serve().await {
2197 info!("Server stopped: {err}");
2198 }
2199 info!("Server stopped");
2200 });
2201 Self::Started(handle)
2202 }
2203 Self::Started(_) => self,
2204 }
2205 }
2206
2207 pub fn shutdown(self) {
2208 if let SpawnOnce::Started(handle) = self {
2209 handle.trigger_shutdown();
2210 }
2211 }
2212}
2213
2214fn send_trusted_peer_change(
2217 config: &NodeConfig,
2218 sender: &watch::Sender<TrustedPeerChangeEvent>,
2219 new_epoch_start_state: &EpochStartSystemState,
2220) {
2221 let new_committee =
2222 new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2223
2224 sender.send_modify(|event| {
2225 core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2226 event.new_committee = new_committee;
2227 })
2228}
2229
2230fn build_kv_store(
2231 state: &Arc<AuthorityState>,
2232 config: &NodeConfig,
2233 registry: &Registry,
2234) -> Result<Arc<TransactionKeyValueStore>> {
2235 let metrics = KeyValueStoreMetrics::new(registry);
2236 let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2237
2238 let base_url = &config.transaction_kv_store_read_config.base_url;
2239
2240 if base_url.is_empty() {
2241 info!("no http kv store url provided, using local db only");
2242 return Ok(Arc::new(db_store));
2243 }
2244
2245 base_url.parse::<url::Url>().tap_err(|e| {
2246 error!(
2247 "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2248 base_url, e
2249 )
2250 })?;
2251
2252 let http_store = HttpKVStore::new_kv(
2253 base_url,
2254 config.transaction_kv_store_read_config.cache_size,
2255 metrics.clone(),
2256 )?;
2257 info!("using local key-value store with fallback to http key-value store");
2258 Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2259 db_store,
2260 http_store,
2261 metrics,
2262 "json_rpc_fallback",
2263 )))
2264}
2265
2266async fn build_grpc_server(
2281 config: &NodeConfig,
2282 state: Arc<AuthorityState>,
2283 state_sync_store: RocksDbStore,
2284 executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>>,
2285 prometheus_registry: &Registry,
2286 server_version: ServerVersion,
2287) -> Result<Option<GrpcServerHandle>> {
2288 if config.consensus_config().is_some() || !config.enable_grpc_api {
2290 return Ok(None);
2291 }
2292
2293 let Some(grpc_config) = &config.grpc_api_config else {
2294 return Err(anyhow!("gRPC API is enabled but no configuration provided"));
2295 };
2296
2297 let chain_id = state.get_chain_identifier();
2299
2300 let grpc_read_store = Arc::new(GrpcReadStore::new(state.clone(), state_sync_store));
2301
2302 let shutdown_token = CancellationToken::new();
2304
2305 let grpc_reader = Arc::new(GrpcReader::new(
2307 grpc_read_store,
2308 Some(server_version.to_string()),
2309 ));
2310
2311 let grpc_server_metrics = iota_grpc_server::GrpcServerMetrics::new(prometheus_registry);
2313 let client_id_source = config
2314 .policy_config
2315 .as_ref()
2316 .map(|p| p.client_id_source.clone());
2317
2318 let handle = start_grpc_server(
2319 grpc_reader,
2320 executor,
2321 grpc_config.clone(),
2322 shutdown_token,
2323 chain_id,
2324 Some(grpc_server_metrics),
2325 state.traffic_controller.clone(),
2326 client_id_source,
2327 )
2328 .await?;
2329
2330 Ok(Some(handle))
2331}
2332
/// Builds and starts the JSON-RPC HTTP server, which also serves a `/health`
/// route.
///
/// Returns `Ok(None)` without starting anything when the node has a consensus
/// configuration (presumably a validator). Otherwise it registers the JSON-RPC
/// modules, serves them on `config.json_rpc_address` and returns the server
/// handle.
///
/// # Errors
///
/// Returns an error if building the key-value store, registering a JSON-RPC
/// module, building the router, or binding the HTTP server fails.
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
) -> Result<Option<iota_http::ServerHandle>> {
    // Nodes with a consensus config do not serve JSON-RPC.
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let traffic_controller = state.traffic_controller.clone();
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            traffic_controller,
            config.policy_config.clone(),
        );

        // Local store, optionally with an HTTP fallback (see `build_kv_store`).
        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        // The transaction builder API is only registered when no
        // `run_with_range` is configured.
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        // Transaction execution is only exposed when an orchestrator was provided.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Use the configured IOTA-Names config, or derive one from the chain.
        let iota_names_config = config
            .iota_names_config
            .clone()
            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));

        // The indexer API receives its own `ReadApi` instance, separate from the
        // one registered above.
        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // `health_check_handler` extracts `Arc<AuthorityState>` from this `Extension`.
    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    let layers = ServiceBuilder::new()
        // Copy the remote address recorded by `iota_http` into axum's
        // `ConnectInfo` extension, so axum-based code can read it.
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware));

    router = router.layer(layers);

    let handle = iota_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());

    Ok(Some(handle))
}
2443
/// Query parameters accepted by the `/health` endpoint.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    /// Maximum allowed lag, in seconds, between the current system time and
    /// the timestamp of the highest executed checkpoint. When absent, the
    /// health check does not inspect checkpoints and always reports "up".
    pub threshold_seconds: Option<u32>,
}
2448
2449async fn health_check_handler(
2450 axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2451 axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2452) -> impl axum::response::IntoResponse {
2453 if let Some(threshold_seconds) = threshold_seconds {
2454 let summary = match state
2456 .get_checkpoint_store()
2457 .get_highest_executed_checkpoint()
2458 {
2459 Ok(Some(summary)) => summary,
2460 Ok(None) => {
2461 warn!("Highest executed checkpoint not found");
2462 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2463 }
2464 Err(err) => {
2465 warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2466 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2467 }
2468 };
2469
2470 let latest_chain_time = summary.timestamp();
2472 let threshold =
2473 std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2474
2475 if latest_chain_time < threshold {
2477 warn!(
2478 ?latest_chain_time,
2479 ?threshold,
2480 "failing health check due to checkpoint lag"
2481 );
2482 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2483 }
2484 }
2485 (axum::http::StatusCode::OK, "up")
2487}
2488
2489#[cfg(not(test))]
2490fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2491 protocol_config.max_transactions_per_checkpoint() as usize
2492}
2493
2494#[cfg(test)]
2495fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
2496 2
2497}