1#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8 collections::HashMap,
9 fmt,
10 future::Future,
11 path::PathBuf,
12 sync::{Arc, Weak},
13 time::Duration,
14};
15
16use anemo::Network;
17use anemo_tower::{
18 callback::CallbackLayer,
19 trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
20};
21use anyhow::{Result, anyhow};
22use arc_swap::ArcSwap;
23use futures::future::BoxFuture;
24pub use handle::IotaNodeHandle;
25use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
26use iota_common::debug_fatal;
27use iota_config::{
28 ConsensusConfig, NodeConfig,
29 node::{DBCheckpointConfig, RunWithRange},
30 node_config_metrics::NodeConfigMetrics,
31 object_storage_config::{ObjectStoreConfig, ObjectStoreType},
32};
33use iota_core::{
34 authority::{
35 AuthorityState, AuthorityStore, RandomnessRoundReceiver,
36 authority_per_epoch_store::AuthorityPerEpochStore,
37 authority_store_pruner::ObjectsCompactionFilter,
38 authority_store_tables::{
39 AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
40 },
41 backpressure::BackpressureManager,
42 epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
43 },
44 authority_aggregator::{
45 AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
46 },
47 authority_client::NetworkAuthorityClient,
48 authority_server::{ValidatorService, ValidatorServiceMetrics},
49 checkpoint_progress_tracker::CheckpointProgressTracker,
50 checkpoints::{
51 CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
52 SubmitCheckpointToConsensus,
53 checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
54 },
55 connection_monitor::ConnectionMonitor,
56 consensus_adapter::{
57 CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
58 ConsensusClient,
59 },
60 consensus_handler::ConsensusHandlerInitializer,
61 consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
62 consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
63 db_checkpoint_handler::DBCheckpointHandler,
64 epoch::{
65 committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
66 epoch_metrics::EpochMetrics, randomness::RandomnessManager,
67 reconfiguration::ReconfigurationInitiator,
68 },
69 execution_cache::build_execution_cache,
70 global_state_hasher::{GlobalStateHashMetrics, GlobalStateHasher},
71 grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
72 jsonrpc_index::IndexStore,
73 module_cache_metrics::ResolverMetrics,
74 overload_monitor::overload_monitor,
75 safe_client::SafeClientMetricsBase,
76 signature_verifier::SignatureVerifierMetrics,
77 storage::{GrpcReadStore, RocksDbStore},
78 traffic_controller::metrics::TrafficControllerMetrics,
79 transaction_orchestrator::TransactionOrchestrator,
80 validator_tx_finalizer::ValidatorTxFinalizer,
81};
82use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
83use iota_json_rpc::{
84 JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
85 indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
86 transaction_builder_api::TransactionBuilderApi,
87 transaction_execution_api::TransactionExecutionApi,
88};
89use iota_json_rpc_api::JsonRpcMetrics;
90use iota_macros::{fail_point, fail_point_async, replay_log};
91use iota_metrics::{
92 RegistryID, RegistryService,
93 hardware_metrics::register_hardware_metrics,
94 metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
95 server_timing_middleware, spawn_monitored_task,
96};
97use iota_names::config::IotaNamesConfig;
98use iota_network::{
99 api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
100};
101use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
102use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
103use iota_sdk_types::crypto::{Intent, IntentMessage, IntentScope};
104use iota_snapshot::uploader::StateSnapshotUploader;
105use iota_storage::{
106 FileCompression, StorageFormat,
107 http_key_value_store::HttpKVStore,
108 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
109 key_value_store_metrics::KeyValueStoreMetrics,
110};
111use iota_types::{
112 base_types::{AuthorityName, ConciseableName, EpochId},
113 committee::Committee,
114 crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
115 digests::ChainIdentifier,
116 error::{IotaError, IotaResult},
117 executable_transaction::VerifiedExecutableTransaction,
118 execution_config_utils::to_binary_config,
119 full_checkpoint_content::CheckpointData,
120 iota_system_state::{
121 IotaSystemState, IotaSystemStateTrait,
122 epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
123 },
124 messages_consensus::{
125 AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
126 SignedAuthorityCapabilitiesV1,
127 },
128 messages_grpc::HandleCapabilityNotificationRequestV1,
129 quorum_driver_types::QuorumDriverEffectsQueueResult,
130 supported_protocol_versions::SupportedProtocolVersions,
131 transaction::{Transaction, VerifiedCertificate},
132};
133use prometheus::Registry;
134#[cfg(msim)]
135use simulator::*;
136use tap::tap::TapFallible;
137use tokio::{
138 sync::{Mutex, broadcast, mpsc, watch},
139 task::{JoinHandle, JoinSet},
140};
141use tokio_util::sync::CancellationToken;
142use tower::ServiceBuilder;
143use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
144use typed_store::{
145 DBMetrics,
146 rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
147};
148
149use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
150
151pub mod admin;
152mod handle;
153pub mod metrics;
154
/// Sub-services that only run while this node is an active committee
/// validator. Built during startup (see `construct_validator_components`) and
/// stored in `IotaNode::validator_components`.
pub struct ValidatorComponents {
    // Validator gRPC server, held as a deferred `SpawnOnce` (defined elsewhere
    // in this file) and replaced by its started form via `.start().await`
    // once startup completes.
    validator_server_handle: SpawnOnce,
    // Background task watching for validator overload, when enabled.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    // Prunes old consensus store data in the background.
    consensus_store_pruner: ConsensusStorePruner,
    // Entry point for submitting transactions to consensus; also used to
    // resubmit recovered transactions at startup.
    consensus_adapter: Arc<ConsensusAdapter>,
    // Tasks driving the checkpoint service.
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    // ID of the validator-specific metrics registry — presumably used to
    // deregister it from the RegistryService on teardown; confirm against
    // the reconfiguration path.
    validator_registry_id: RegistryID,
}
167
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    /// Extra per-node state that only exists in `msim` simulator builds.
    pub(super) struct SimState {
        // Handle to the simulator node this IotaNode runs on.
        pub sim_node: iota_simulator::runtime::NodeHandle,
        // NOTE(review): presumably toggled by tests that expect the node to
        // enter safe mode — confirm against the msim test helpers.
        pub sim_safe_mode_expected: AtomicBool,
        // Held only for its Drop behavior: detects leaked node resources.
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                // Binds to whichever simulator node is current at
                // construction time.
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }
}
190
/// A binary-name / version pair identifying this server build; rendered as
/// `"<bin>/<version>"` (e.g. `iota-node/unknown`).
#[derive(Clone)]
pub struct ServerVersion {
    pub bin: &'static str,
    pub version: &'static str,
}

impl ServerVersion {
    /// Creates a `ServerVersion` from a binary name and a version string.
    pub fn new(bin: &'static str, version: &'static str) -> Self {
        ServerVersion { bin, version }
    }
}

impl std::fmt::Display for ServerVersion {
    /// Formats as `"<bin>/<version>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/{}", self.bin, self.version)
    }
}
210
/// The running node: owns the authority state, storage handles, p2p service
/// handles, optional validator components, and RPC server handles.
///
/// Fields prefixed with `_` are held only to keep their resource alive.
pub struct IotaNode {
    config: NodeConfig,
    // Present only while this node is an active committee validator.
    validator_components: Mutex<Option<ValidatorComponents>>,
    _http_server: Option<iota_http::ServerHandle>,
    state: Arc<AuthorityState>,
    // Created only for fullnodes running without a `run_with_range` limit
    // (see `start_async`); None on validators.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    // Broadcasts the new system state to subscribers at epoch change
    // (see `subscribe_to_epoch_change`).
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    // Feeds trusted-peer updates to the discovery service
    // (see `send_trusted_peer_change` in `start_async`).
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    checkpoint_progress_tracker: Arc<CheckpointProgressTracker>,

    // Kill switch for the db-checkpoint handler task, if one is running.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    // Kill switch for the state archival task, if one is running.
    _state_archive_handle: Option<broadcast::Sender<()>>,

    // Kill switch for the state snapshot uploader, if one is running.
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Broadcasts the shutdown signal, carrying the optional RunWithRange
    // (see `subscribe_to_shutdown_channel`).
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    // Atomically swappable authority aggregator — presumably replaced on
    // reconfiguration; confirm against `monitor_reconfiguration`.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}
260
impl fmt::Debug for IotaNode {
    /// Renders as `IotaNode { name: <concise authority name> }`, avoiding a
    /// dump of the node's large internal state.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("IotaNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}
268
269impl IotaNode {
270 pub async fn start(
271 config: NodeConfig,
272 registry_service: RegistryService,
273 ) -> Result<Arc<IotaNode>> {
274 Self::start_async(
275 config,
276 registry_service,
277 ServerVersion::new("iota-node", "unknown"),
278 )
279 .await
280 }
281
282 pub async fn start_async(
283 config: NodeConfig,
284 registry_service: RegistryService,
285 server_version: ServerVersion,
286 ) -> Result<Arc<IotaNode>> {
287 NodeConfigMetrics::new(®istry_service.default_registry()).record_metrics(&config);
288 let mut config = config.clone();
289 if config.supported_protocol_versions.is_none() {
290 info!(
291 "populating config.supported_protocol_versions with default {:?}",
292 SupportedProtocolVersions::SYSTEM_DEFAULT
293 );
294 config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
295 }
296
297 let run_with_range = config.run_with_range;
298 let is_validator = config.consensus_config().is_some();
299 let is_full_node = !is_validator;
300 let prometheus_registry = registry_service.default_registry();
301
302 info!(node =? config.authority_public_key(),
303 "Initializing iota-node listening on {}", config.network_address
304 );
305
306 let genesis = config.genesis()?.clone();
307
308 let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
309 info!("IOTA chain identifier: {chain_identifier}");
310
311 let db_corrupted_path = &config.db_path().join("status");
313 if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
314 panic!("Failed to check database corruption: {err}");
315 }
316
317 DBMetrics::init(&prometheus_registry);
319
320 iota_metrics::init_metrics(&prometheus_registry);
322 #[cfg(not(msim))]
325 iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
326
327 register_hardware_metrics(®istry_service, &config.db_path)
329 .expect("Failed registering hardware metrics");
330 prometheus_registry
332 .register(iota_metrics::uptime_metric(
333 if is_validator {
334 "validator"
335 } else {
336 "fullnode"
337 },
338 server_version.version,
339 &chain_identifier.to_string(),
340 ))
341 .expect("Failed registering uptime metric");
342
343 let migration_tx_data = if genesis.contains_migrations() {
346 Some(config.load_migration_tx_data()?)
349 } else {
350 None
351 };
352
353 let secret = Arc::pin(config.authority_key_pair().copy());
354 let genesis_committee = genesis.committee()?;
355 let committee_store = Arc::new(CommitteeStore::new(
356 config.db_path().join("epochs"),
357 &genesis_committee,
358 None,
359 ));
360
361 let mut pruner_db = None;
362 if config
363 .authority_store_pruning_config
364 .enable_compaction_filter
365 {
366 pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
367 &config.db_path().join("store"),
368 )));
369 }
370 let compaction_filter = pruner_db
371 .clone()
372 .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));
373
374 let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
376 let perpetual_tables_options = AuthorityPerpetualTablesOptions {
377 enable_write_stall,
378 compaction_filter,
379 };
380 let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
381 &config.db_path().join("store"),
382 Some(perpetual_tables_options),
383 ));
384 let is_genesis = perpetual_tables
385 .database_is_empty()
386 .expect("Database read should not fail at init.");
387 let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
388 let backpressure_manager =
389 BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
390
391 let perpetual_tables_for_progress = perpetual_tables.clone();
392 let store = AuthorityStore::open(
393 perpetual_tables,
394 &genesis,
395 &config,
396 &prometheus_registry,
397 migration_tx_data.as_ref(),
398 )
399 .await?;
400
401 let cur_epoch = store.get_recovery_epoch_at_restart()?;
402 let committee = committee_store
403 .get_committee(&cur_epoch)?
404 .expect("Committee of the current epoch must exist");
405 let epoch_start_configuration = store
406 .get_epoch_start_configuration()?
407 .expect("EpochStartConfiguration of the current epoch must exist");
408 let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
409 let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
410
411 let cache_traits = build_execution_cache(
412 &config.execution_cache_config,
413 &epoch_start_configuration,
414 &prometheus_registry,
415 &store,
416 backpressure_manager.clone(),
417 );
418
419 let auth_agg = {
420 let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
421 let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
422 Arc::new(ArcSwap::new(Arc::new(
423 AuthorityAggregator::new_from_epoch_start_state(
424 epoch_start_configuration.epoch_start_state(),
425 &committee_store,
426 safe_client_metrics_base,
427 auth_agg_metrics,
428 ),
429 )))
430 };
431
432 let chain = match config.chain_override_for_testing {
433 Some(chain) => chain,
434 None => chain_identifier.chain(),
435 };
436
437 let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
438 let epoch_store = AuthorityPerEpochStore::new(
439 config.authority_public_key(),
440 committee.clone(),
441 &config.db_path().join("store"),
442 Some(epoch_options.options),
443 EpochMetrics::new(®istry_service.default_registry()),
444 epoch_start_configuration,
445 cache_traits.backing_package_store.clone(),
446 cache_metrics,
447 signature_verifier_metrics,
448 &config.expensive_safety_check_config,
449 (chain_identifier, chain),
450 checkpoint_store
451 .get_highest_executed_checkpoint_seq_number()
452 .expect("checkpoint store read cannot fail")
453 .unwrap_or(0),
454 )?;
455
456 info!("created epoch store");
457
458 replay_log!(
459 "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
460 epoch_store.epoch(),
461 epoch_store.protocol_config()
462 );
463
464 if is_genesis {
466 info!("checking IOTA conservation at genesis");
467 cache_traits
472 .reconfig_api
473 .try_expensive_check_iota_conservation(&epoch_store, None)
474 .expect("IOTA conservation check cannot fail at genesis");
475 }
476
477 let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
478 let default_buffer_stake = epoch_store
479 .protocol_config()
480 .buffer_stake_for_protocol_upgrade_bps();
481 if effective_buffer_stake != default_buffer_stake {
482 warn!(
483 ?effective_buffer_stake,
484 ?default_buffer_stake,
485 "buffer_stake_for_protocol_upgrade_bps is currently overridden"
486 );
487 }
488
489 checkpoint_store.insert_genesis_checkpoint(
490 genesis.checkpoint(),
491 genesis.checkpoint_contents().clone(),
492 &epoch_store,
493 );
494
495 unmark_db_corruption(db_corrupted_path)?;
497
498 info!("creating state sync store");
499 let state_sync_store = RocksDbStore::new(
500 cache_traits.clone(),
501 committee_store.clone(),
502 checkpoint_store.clone(),
503 );
504
505 let index_store = if is_full_node && config.enable_index_processing {
506 info!("creating index store");
507 Some(Arc::new(IndexStore::new(
508 config.db_path().join("indexes"),
509 &prometheus_registry,
510 epoch_store
511 .protocol_config()
512 .max_move_identifier_len_as_option(),
513 )))
514 } else {
515 None
516 };
517
518 let grpc_indexes_store = if is_full_node && config.enable_grpc_api {
519 Some(Arc::new(
520 GrpcIndexesStore::new(
521 config.db_path().join(GRPC_INDEXES_DIR),
522 Arc::clone(&store),
523 &checkpoint_store,
524 )
525 .await,
526 ))
527 } else {
528 None
529 };
530
531 info!("creating archive reader");
532 let archive_readers =
537 ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
538 let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
539 let (randomness_tx, randomness_rx) = mpsc::channel(
540 config
541 .p2p_config
542 .randomness
543 .clone()
544 .unwrap_or_default()
545 .mailbox_capacity(),
546 );
547 let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
548 Self::create_p2p_network(
549 &config,
550 state_sync_store.clone(),
551 chain_identifier,
552 trusted_peer_change_rx,
553 archive_readers.clone(),
554 randomness_tx,
555 &prometheus_registry,
556 )?;
557
558 send_trusted_peer_change(
561 &config,
562 &trusted_peer_change_tx,
563 epoch_store.epoch_start_state(),
564 );
565
566 info!("start state archival");
567 let state_archive_handle =
569 Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
570 .await?;
571
572 info!("start snapshot upload");
573 let state_snapshot_handle =
575 Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
576
577 let checkpoint_progress_tracker = Arc::new(CheckpointProgressTracker::new());
578
579 info!("start db checkpoint");
581 let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
582 &config,
583 &prometheus_registry,
584 state_snapshot_handle.is_some(),
585 Some(checkpoint_progress_tracker.clone()),
586 )?;
587
588 let mut genesis_objects = genesis.objects().to_vec();
589 if let Some(migration_tx_data) = migration_tx_data.as_ref() {
590 genesis_objects.extend(migration_tx_data.get_objects());
591 }
592
593 let authority_name = config.authority_public_key();
594 let validator_tx_finalizer =
595 config
596 .enable_validator_tx_finalizer
597 .then_some(Arc::new(ValidatorTxFinalizer::new(
598 auth_agg.clone(),
599 authority_name,
600 &prometheus_registry,
601 )));
602
603 info!("create authority state");
604 let state = AuthorityState::new(
605 authority_name,
606 secret,
607 config.supported_protocol_versions.unwrap(),
608 store.clone(),
609 cache_traits.clone(),
610 epoch_store.clone(),
611 committee_store.clone(),
612 index_store.clone(),
613 grpc_indexes_store,
614 checkpoint_store.clone(),
615 &prometheus_registry,
616 &genesis_objects,
617 &db_checkpoint_config,
618 config.clone(),
619 archive_readers,
620 validator_tx_finalizer,
621 chain_identifier,
622 pruner_db,
623 Some(checkpoint_progress_tracker.clone()),
624 )
625 .await;
626
627 if epoch_store.epoch() == 0 {
629 let genesis_tx = &genesis.transaction();
630 let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
631 Self::execute_transaction_immediately_at_zero_epoch(
633 &state,
634 &epoch_store,
635 genesis_tx,
636 span,
637 )
638 .await;
639
640 if let Some(migration_tx_data) = migration_tx_data {
642 for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
643 let span = error_span!("migration_txn", tx_digest = ?tx_digest);
644 Self::execute_transaction_immediately_at_zero_epoch(
645 &state,
646 &epoch_store,
647 tx,
648 span,
649 )
650 .await;
651 }
652 }
653 }
654
655 RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
658
659 if config
660 .expensive_safety_check_config
661 .enable_secondary_index_checks()
662 {
663 if let Some(indexes) = state.indexes.clone() {
664 iota_core::verify_indexes::verify_indexes(
665 state.get_global_state_hash_store().as_ref(),
666 indexes,
667 )
668 .expect("secondary indexes are inconsistent");
669 }
670 }
671
672 let (end_of_epoch_channel, end_of_epoch_receiver) =
673 broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
674
675 let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
676 Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
677 auth_agg.load_full(),
678 state.clone(),
679 end_of_epoch_receiver,
680 &config.db_path(),
681 &prometheus_registry,
682 )))
683 } else {
684 None
685 };
686
687 let http_server = build_http_server(
688 state.clone(),
689 &transaction_orchestrator.clone(),
690 &config,
691 &prometheus_registry,
692 )
693 .await?;
694
695 let global_state_hasher = Arc::new(GlobalStateHasher::new(
696 cache_traits.global_state_hash_store.clone(),
697 GlobalStateHashMetrics::new(&prometheus_registry),
698 ));
699
700 let authority_names_to_peer_ids = epoch_store
701 .epoch_start_state()
702 .get_authority_names_to_peer_ids();
703
704 let network_connection_metrics =
705 NetworkConnectionMetrics::new("iota", ®istry_service.default_registry());
706
707 let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
708
709 let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
710 p2p_network.downgrade(),
711 network_connection_metrics,
712 HashMap::new(),
713 None,
714 );
715
716 let connection_monitor_status = ConnectionMonitorStatus {
717 connection_statuses,
718 authority_names_to_peer_ids,
719 };
720
721 let connection_monitor_status = Arc::new(connection_monitor_status);
722 let iota_node_metrics =
723 Arc::new(IotaNodeMetrics::new(®istry_service.default_registry()));
724
725 iota_node_metrics
726 .binary_max_protocol_version
727 .set(ProtocolVersion::MAX.as_u64() as i64);
728 iota_node_metrics
729 .configured_max_protocol_version
730 .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
731
732 let executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>> =
736 transaction_orchestrator
737 .clone()
738 .map(|o| o as Arc<dyn iota_types::transaction_executor::TransactionExecutor>);
739
740 let grpc_server_handle = build_grpc_server(
741 &config,
742 state.clone(),
743 state_sync_store.clone(),
744 executor,
745 &prometheus_registry,
746 server_version,
747 )
748 .await?;
749
750 let validator_components = if state.is_committee_validator(&epoch_store) {
751 let (components, _) = futures::join!(
752 Self::construct_validator_components(
753 config.clone(),
754 state.clone(),
755 committee,
756 epoch_store.clone(),
757 checkpoint_store.clone(),
758 state_sync_handle.clone(),
759 randomness_handle.clone(),
760 Arc::downgrade(&global_state_hasher),
761 backpressure_manager.clone(),
762 connection_monitor_status.clone(),
763 ®istry_service,
764 ),
765 Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
766 );
767 let mut components = components?;
768
769 components.consensus_adapter.submit_recovered(&epoch_store);
770
771 components.validator_server_handle = components.validator_server_handle.start().await;
773
774 Some(components)
775 } else {
776 None
777 };
778
779 let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
781
782 let node = Self {
783 config,
784 validator_components: Mutex::new(validator_components),
785 _http_server: http_server,
786 state,
787 transaction_orchestrator,
788 registry_service,
789 metrics: iota_node_metrics,
790
791 _discovery: discovery_handle,
792 state_sync_handle,
793 randomness_handle,
794 checkpoint_store,
795 global_state_hasher: Mutex::new(Some(global_state_hasher)),
796 end_of_epoch_channel,
797 connection_monitor_status,
798 trusted_peer_change_tx,
799 backpressure_manager,
800 checkpoint_progress_tracker: checkpoint_progress_tracker.clone(),
801
802 _db_checkpoint_handle: db_checkpoint_handle,
803
804 #[cfg(msim)]
805 sim_state: Default::default(),
806
807 _state_archive_handle: state_archive_handle,
808 _state_snapshot_uploader_handle: state_snapshot_handle,
809 shutdown_channel_tx: shutdown_channel,
810
811 grpc_server_handle: Mutex::new(grpc_server_handle),
812
813 auth_agg,
814 };
815
816 info!("IotaNode started!");
817 let node = Arc::new(node);
818 let node_copy = node.clone();
819 spawn_monitored_task!(async move {
820 let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
821 if let Err(error) = result {
822 warn!("Reconfiguration finished with error {:?}", error);
823 }
824 });
825
826 node.checkpoint_progress_tracker
827 .spawn_logging_task(node.checkpoint_store.clone(), perpetual_tables_for_progress);
828
829 Ok(node)
830 }
831
    /// Returns a receiver for the channel on which the node broadcasts the
    /// new `IotaSystemState` at each epoch change.
    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
        self.end_of_epoch_channel.subscribe()
    }
835
    /// Returns a receiver for the node's shutdown broadcast; the payload
    /// carries the optional `RunWithRange` the shutdown was requested with.
    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }
839
    /// Test-only accessor for the node's current epoch id.
    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }
843
    /// Path where database checkpoints are written, taken from the node
    /// config.
    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }
847
848 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
850 info!("close_epoch (current epoch = {})", epoch_store.epoch());
851 self.validator_components
852 .lock()
853 .await
854 .as_ref()
855 .ok_or_else(|| IotaError::from("Node is not a validator"))?
856 .consensus_adapter
857 .close_epoch(epoch_store);
858 Ok(())
859 }
860
    /// Clears a previously set override of the protocol-upgrade buffer stake
    /// for `epoch` (see `set_override_protocol_upgrade_buffer_stake`).
    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }
865
    /// Overrides the protocol-upgrade buffer stake (in basis points) for
    /// `epoch`, delegating to the authority state.
    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> IotaResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }
874
875 pub async fn close_epoch_for_testing(&self) -> IotaResult {
878 let epoch_store = self.state.epoch_store_for_testing();
879 self.close_epoch(&epoch_store).await
880 }
881
882 async fn start_state_archival(
883 config: &NodeConfig,
884 prometheus_registry: &Registry,
885 state_sync_store: RocksDbStore,
886 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
887 if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
888 let local_store_config = ObjectStoreConfig {
889 object_store: Some(ObjectStoreType::File),
890 directory: Some(config.archive_path()),
891 ..Default::default()
892 };
893 let archive_writer = ArchiveWriter::new(
894 local_store_config,
895 remote_store_config.clone(),
896 FileCompression::Zstd,
897 StorageFormat::Blob,
898 Duration::from_secs(600),
899 256 * 1024 * 1024,
900 prometheus_registry,
901 )
902 .await?;
903 Ok(Some(archive_writer.start(state_sync_store).await?))
904 } else {
905 Ok(None)
906 }
907 }
908
909 fn start_state_snapshot(
912 config: &NodeConfig,
913 prometheus_registry: &Registry,
914 checkpoint_store: Arc<CheckpointStore>,
915 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
916 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
917 let snapshot_uploader = StateSnapshotUploader::new(
918 &config.db_checkpoint_path(),
919 &config.snapshot_path(),
920 remote_store_config.clone(),
921 60,
922 prometheus_registry,
923 checkpoint_store,
924 )?;
925 Ok(Some(snapshot_uploader.start()))
926 } else {
927 Ok(None)
928 }
929 }
930
931 fn start_db_checkpoint(
932 config: &NodeConfig,
933 prometheus_registry: &Registry,
934 state_snapshot_enabled: bool,
935 checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
936 ) -> Result<(
937 DBCheckpointConfig,
938 Option<tokio::sync::broadcast::Sender<()>>,
939 )> {
940 let checkpoint_path = Some(
941 config
942 .db_checkpoint_config
943 .checkpoint_path
944 .clone()
945 .unwrap_or_else(|| config.db_checkpoint_path()),
946 );
947 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
948 DBCheckpointConfig {
949 checkpoint_path,
950 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
951 true
952 } else {
953 config
954 .db_checkpoint_config
955 .perform_db_checkpoints_at_epoch_end
956 },
957 ..config.db_checkpoint_config.clone()
958 }
959 } else {
960 config.db_checkpoint_config.clone()
961 };
962
963 match (
964 db_checkpoint_config.object_store_config.as_ref(),
965 state_snapshot_enabled,
966 ) {
967 (None, false) => Ok((db_checkpoint_config, None)),
972 (_, _) => {
973 let handler = DBCheckpointHandler::new(
974 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
975 db_checkpoint_config.object_store_config.as_ref(),
976 60,
977 db_checkpoint_config
978 .prune_and_compact_before_upload
979 .unwrap_or(true),
980 config.authority_store_pruning_config.clone(),
981 prometheus_registry,
982 state_snapshot_enabled,
983 checkpoint_progress_tracker,
984 )?;
985 Ok((
986 db_checkpoint_config,
987 Some(DBCheckpointHandler::start(handler)),
988 ))
989 }
990 }
991 }
992
    /// Builds the anemo p2p network and starts the discovery, state-sync and
    /// randomness services on top of it.
    ///
    /// Returns the bound [`Network`] plus a handle for each of the three
    /// services.
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            // All three services share one anemo router / network instance.
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            // Inbound stack: trace server errors (WARN on failure) and record
            // per-request metrics.
            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            // Outbound stack: same shape, but failures are only logged at
            // DEBUG.
            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::DEBUG)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Allow frames up to 1 GiB.
            anemo_config.max_frame_size = Some(1 << 30);

            // Fill in defaults for any QUIC tuning knob the operator did not
            // set explicitly: socket buffers, stream limits, flow-control
            // windows, and timeouts.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            // Best-effort: don't fail startup if the OS rejects the sizes.
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            // Server name embeds the chain identifier — presumably so peers
            // on a different chain cannot complete the handshake; confirm
            // against anemo's server-name matching rules.
            let server_name = format!("iota-{chain_identifier}");
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }
1131
    /// Constructs all validator-specific components (consensus adapter and
    /// manager, consensus store pruner, validator gRPC service, optional
    /// overload monitor) and then starts the epoch-specific ones via
    /// `start_epoch_specific_validator_components`.
    ///
    /// Per-validator metrics live in a dedicated registry so they can be
    /// removed again (via the returned `validator_registry_id`) if the node
    /// leaves the committee after reconfiguration.
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
    ) -> Result<ValidatorComponents> {
        // Clone the config so we can take a mutable borrow of the consensus
        // sub-config without touching the caller's copy.
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        // The consensus client starts empty; the consensus manager wires it to
        // a live consensus instance when it starts.
        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        // Background pruner for old consensus databases.
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &validator_registry,
        )
        .await?;

        // Only spawn the overload monitor when load shedding is enabled.
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }
1231
    /// Starts the validator components whose lifetime is bound to a single
    /// epoch: the checkpoint service, the randomness manager (when available),
    /// and the consensus instance itself. Called both at validator startup and
    /// after every reconfiguration.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: ConsensusManager,
        consensus_store_pruner: ConsensusStorePruner,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            global_state_hasher,
            checkpoint_metrics.clone(),
        );

        // Starts empty; shared with the consensus adapter so it can avoid
        // low-scoring authorities.
        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        // `try_new` may yield no manager; in that case the epoch store simply
        // gets no randomness manager set.
        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
            backpressure_manager,
        );

        info!("Starting consensus manager");

        consensus_manager
            .start(
                config,
                epoch_store.clone(),
                consensus_handler_initializer,
                IotaTxValidator::new(
                    epoch_store.clone(),
                    checkpoint_service.clone(),
                    state.transaction_manager().clone(),
                    iota_tx_validator_metrics.clone(),
                ),
            )
            .await;

        info!("Spawning checkpoint service");
        let checkpoint_service_tasks = checkpoint_service.spawn().await;

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }
1323
1324 fn build_checkpoint_service(
1331 config: &NodeConfig,
1332 consensus_adapter: Arc<ConsensusAdapter>,
1333 checkpoint_store: Arc<CheckpointStore>,
1334 epoch_store: Arc<AuthorityPerEpochStore>,
1335 state: Arc<AuthorityState>,
1336 state_sync_handle: state_sync::Handle,
1337 global_state_hasher: Weak<GlobalStateHasher>,
1338 checkpoint_metrics: Arc<CheckpointMetrics>,
1339 ) -> Arc<CheckpointService> {
1340 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1341 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1342
1343 debug!(
1344 "Starting checkpoint service with epoch start timestamp {}
1345 and epoch duration {}",
1346 epoch_start_timestamp_ms, epoch_duration_ms
1347 );
1348
1349 let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1350 sender: consensus_adapter,
1351 signer: state.secret.clone(),
1352 authority: config.authority_public_key(),
1353 next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1354 .checked_add(epoch_duration_ms)
1355 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1356 metrics: checkpoint_metrics.clone(),
1357 });
1358
1359 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1360 let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1361 let max_checkpoint_size_bytes =
1362 epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1363
1364 CheckpointService::build(
1365 state.clone(),
1366 checkpoint_store,
1367 epoch_store,
1368 state.get_transaction_cache_reader().clone(),
1369 global_state_hasher,
1370 checkpoint_output,
1371 Box::new(certified_checkpoint_output),
1372 checkpoint_metrics,
1373 max_tx_per_checkpoint,
1374 max_checkpoint_size_bytes,
1375 )
1376 }
1377
1378 fn construct_consensus_adapter(
1379 committee: &Committee,
1380 consensus_config: &ConsensusConfig,
1381 authority: AuthorityName,
1382 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1383 prometheus_registry: &Registry,
1384 consensus_client: Arc<dyn ConsensusClient>,
1385 checkpoint_store: Arc<CheckpointStore>,
1386 ) -> ConsensusAdapter {
1387 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1388 ConsensusAdapter::new(
1392 consensus_client,
1393 checkpoint_store,
1394 authority,
1395 connection_monitor_status,
1396 consensus_config.max_pending_transactions(),
1397 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1398 consensus_config.max_submit_position,
1399 consensus_config.submit_delay_step_override(),
1400 ca_metrics,
1401 )
1402 }
1403
1404 async fn start_grpc_validator_service(
1405 config: &NodeConfig,
1406 state: Arc<AuthorityState>,
1407 consensus_adapter: Arc<ConsensusAdapter>,
1408 prometheus_registry: &Registry,
1409 ) -> Result<SpawnOnce> {
1410 let validator_service = ValidatorService::new(
1411 state,
1412 consensus_adapter,
1413 Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1414 TrafficControllerMetrics::new(prometheus_registry),
1415 config.policy_config.clone(),
1416 config.firewall_config.clone(),
1417 );
1418
1419 let mut server_conf = iota_network_stack::config::Config::new();
1420 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1421 server_conf.load_shed = config.grpc_load_shed;
1422 let server_builder =
1423 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
1424 .add_service(ValidatorServer::new(validator_service));
1425
1426 let tls_config = iota_tls::create_rustls_server_config(
1427 config.network_key_pair().copy().private(),
1428 IOTA_TLS_SERVER_NAME.to_string(),
1429 );
1430
1431 let network_address = config.network_address().clone();
1432
1433 let bind_future = async move {
1434 let server = server_builder
1435 .bind(&network_address, Some(tls_config))
1436 .await
1437 .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;
1438
1439 let local_addr = server.local_addr();
1440 info!("Listening to traffic on {local_addr}");
1441
1442 Ok(server)
1443 };
1444
1445 Ok(SpawnOnce::new(bind_future))
1446 }
1447
    /// Re-enqueues for execution any certified transactions that were pending
    /// in consensus before a restart, provided they touch no shared objects.
    ///
    /// Certificates whose signed effects digest is already recorded are
    /// replayed with that expected digest; the rest are enqueued for normal
    /// execution. Waits (with a timeout) for all of the former to finish and
    /// raises a debug-fatal if they do not.
    async fn reexecute_pending_consensus_certs(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        state: &Arc<AuthorityState>,
    ) {
        // Certificates with a known signed effects digest (tx, digest).
        let mut pending_consensus_certificates = Vec::new();
        // Certificates without a recorded effects digest.
        let mut additional_certs = Vec::new();

        for tx in epoch_store.get_all_pending_consensus_transactions() {
            match tx.kind {
                // Shared-object transactions need consensus ordering, so only
                // owned-object certificates are re-executed directly here.
                ConsensusTransactionKind::CertifiedTransaction(tx)
                    if !tx.contains_shared_object() =>
                {
                    let tx = *tx;
                    let tx = VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(tx),
                    );
                    if let Some(fx_digest) = epoch_store
                        .get_signed_effects_digest(tx.digest())
                        .expect("db error")
                    {
                        pending_consensus_certificates.push((tx, fx_digest));
                    } else {
                        additional_certs.push(tx);
                    }
                }
                _ => (),
            }
        }

        let digests = pending_consensus_certificates
            .iter()
            .map(|(tx, _)| *tx.digest())
            .collect::<Vec<_>>();

        info!(
            "reexecuting {} pending consensus certificates: {:?}",
            digests.len(),
            digests
        );

        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
        state.enqueue_transactions_for_execution(additional_certs, epoch_store);

        // The simulator runs slower, so allow a longer wait there.
        let timeout = if cfg!(msim) { 120 } else { 60 };
        if tokio::time::timeout(
            std::time::Duration::from_secs(timeout),
            state
                .get_transaction_cache_reader()
                .try_notify_read_executed_effects_digests(&digests),
        )
        .await
        .is_err()
        {
            // Timed out: try to report exactly which digests never executed.
            if let Ok(executed_effects_digests) = state
                .get_transaction_cache_reader()
                .try_multi_get_executed_effects_digests(&digests)
            {
                let pending_digests = digests
                    .iter()
                    .zip(executed_effects_digests.iter())
                    .filter_map(|(digest, executed_effects_digest)| {
                        if executed_effects_digest.is_none() {
                            Some(digest)
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>();
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed: {:?}",
                    pending_digests
                );
            } else {
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed, digests not found"
                );
            }
        }
    }
1556
1557 pub fn state(&self) -> Arc<AuthorityState> {
1558 self.state.clone()
1559 }
1560
    /// Test-only accessor delegating to the authority state's reference gas
    /// price lookup.
    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }
1565
1566 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1567 self.state.committee_store().clone()
1568 }
1569
1570 pub fn clone_authority_aggregator(
1580 &self,
1581 ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1582 self.transaction_orchestrator
1583 .as_ref()
1584 .map(|to| to.clone_authority_aggregator())
1585 }
1586
    /// Returns the transaction orchestrator, if enabled on this node.
    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }
1592
1593 pub fn subscribe_to_transaction_orchestrator_effects(
1594 &self,
1595 ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1596 self.transaction_orchestrator
1597 .as_ref()
1598 .map(|to| to.subscribe_to_effects_queue())
1599 .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1600 }
1601
    /// Main reconfiguration loop of the node.
    ///
    /// Each iteration: creates a checkpoint executor for the current epoch,
    /// advertises this node's capabilities (through consensus when running as
    /// a validator, or directly to the committee otherwise), executes
    /// checkpoints until the epoch ends, then reconfigures state, restarts or
    /// tears down validator components, and prunes old epoch data. Returns
    /// only when a configured `RunWithRange` stop condition is met.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // The hasher is parked in this mutex between epochs; taken here
            // and replaced with a fresh one during reconfiguration below.
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );

            // If a gRPC server is running, forward executed checkpoint data to
            // its broadcast channel.
            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
                guard.as_ref().map(|handle| {
                    let tx = handle.checkpoint_data_broadcaster().clone();
                    Box::new(move |data: &CheckpointData| {
                        tx.send_traced(data);
                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
                })
            } else {
                None
            };

            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                data_sender,
                Some(self.checkpoint_progress_tracker.clone()),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Advertise capabilities: committee validators submit through
            // consensus; otherwise, if tracked by the protocol, active
            // validators push signed capabilities to the committee directly.
            if let Some(components) = &*self.validator_components.lock().await {
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let binary_config = to_binary_config(config);
                let transaction = ConsensusTransaction::new_capability_notification_v1(
                    AuthorityCapabilitiesV1::new(
                        self.state.name,
                        cur_epoch_store.get_chain(),
                        self.config
                            .supported_protocol_versions
                            .expect("Supported versions should be populated")
                            .truncate_below(config.version),
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components
                    .consensus_adapter
                    .submit(transaction, None, &cur_epoch_store)?;
            } else if self.state.is_active_validator(&cur_epoch_store)
                && cur_epoch_store
                    .protocol_config()
                    .track_non_committee_eligible_validators()
            {
                let epoch_store = cur_epoch_store.clone();
                let node_clone = self.clone();
                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
                    node_clone
                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
                        .instrument(trace_span!(
                            "send_signed_capability_notification_to_committee_with_retry"
                        ))
                        .await;
                }));
            }

            // Blocks until the epoch's checkpoints are executed (or the
            // run-with-range condition triggers).
            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                IotaNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .try_get_iota_system_state_object_unsafe()
                .expect("Read IOTA System State object cannot fail");

            // In simtests, safe mode may be deliberately induced; only assert
            // against it when the test did not opt in.
            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // Best-effort notification; only worth a warning on fullnodes.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
                if self.state.is_fullnode(&cur_epoch_store) {
                    warn!(
                        "Failed to send end of epoch notification to subscriber: {:?}",
                        err
                    );
                }
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            send_trusted_peer_change(
                &self.config,
                &self.trusted_peer_change_tx,
                &new_epoch_start_state,
            );

            let mut validator_components_lock_guard = self.validator_components.lock().await;

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await?;

            // Was a validator in the closing epoch: shut down the old
            // per-epoch components, then restart them only if the node is
            // still in the new committee.
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        // Propagate panics from checkpoint tasks; log other
                        // (e.g. cancellation) errors.
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // Replace the hasher with a fresh one for the new epoch,
                // reusing the old one's metrics.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_committee_validator(&new_epoch_store) {
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    validator_server_handle.shutdown();
                    debug!("Validator grpc server shutdown triggered");

                    None
                }
            } else {
                // Was a fullnode in the closing epoch: still refresh the
                // hasher, and construct validator components from scratch if
                // the node joined the new committee.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if self.state.is_committee_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            cur_epoch_store.release_db_handles();

            drop(cur_epoch_store);

            self.state.epoch_db_pruner().prune_old_epoch_dbs().await;

            // Simtest-only checkpoint pruning, when retention is configured to
            // something other than "keep everything".
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
1947
1948 async fn shutdown(&self) {
1949 if let Some(validator_components) = &*self.validator_components.lock().await {
1950 validator_components.consensus_manager.shutdown().await;
1951 }
1952
1953 if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
1955 info!("Shutting down gRPC server");
1956 if let Err(e) = grpc_handle.shutdown().await {
1957 warn!("Failed to gracefully shutdown gRPC server: {e}");
1958 }
1959 }
1960 }
1961
    /// Advances the authority state from the current epoch to the epoch of
    /// `next_epoch_committee`.
    ///
    /// Loads the closing epoch's final checkpoint (which must carry
    /// end-of-epoch data, including the epoch supply change), asserts it has
    /// been fully executed, builds the next epoch's start configuration, and
    /// reconfigures the [`AuthorityState`]. Returns the per-epoch store for
    /// the new epoch.
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        // Reconfiguration must not begin before the epoch's final checkpoint
        // has been executed.
        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }
2027
    /// Returns the node's configuration.
    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }
2031
2032 async fn execute_transaction_immediately_at_zero_epoch(
2033 state: &Arc<AuthorityState>,
2034 epoch_store: &Arc<AuthorityPerEpochStore>,
2035 tx: &Transaction,
2036 span: tracing::Span,
2037 ) {
2038 let _guard = span.enter();
2039 let transaction =
2040 iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2041 iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2042 tx.data().clone(),
2043 iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2044 ),
2045 );
2046 state
2047 .try_execute_immediately(&transaction, None, epoch_store)
2048 .unwrap();
2049 }
2050
    /// Returns a clone of the node's randomness network handle.
    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }
2054
2055 async fn send_signed_capability_notification_to_committee_with_retry(
2061 &self,
2062 epoch_store: &Arc<AuthorityPerEpochStore>,
2063 ) {
2064 const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
2065 const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
2066 const MAX_RETRY_INTERVAL_SECS: u64 = 300; let config = epoch_store.protocol_config();
2070 let binary_config = to_binary_config(config);
2071
2072 let capabilities = AuthorityCapabilitiesV1::new(
2074 self.state.name,
2075 epoch_store.get_chain(),
2076 self.config
2077 .supported_protocol_versions
2078 .expect("Supported versions should be populated")
2079 .truncate_below(config.version),
2080 self.state
2081 .get_available_system_packages(&binary_config)
2082 .await,
2083 );
2084
2085 let signature = AuthoritySignature::new_secure(
2087 &IntentMessage::new(
2088 Intent::iota_app(IntentScope::AuthorityCapabilities),
2089 &capabilities,
2090 ),
2091 &epoch_store.epoch(),
2092 self.config.authority_key_pair(),
2093 );
2094
2095 let request = HandleCapabilityNotificationRequestV1 {
2096 message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
2097 };
2098
2099 let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);
2100
2101 loop {
2102 let auth_agg = self.auth_agg.load();
2103 match auth_agg
2104 .send_capability_notification_to_quorum(request.clone())
2105 .await
2106 {
2107 Ok(_) => {
2108 info!("Successfully sent capability notification to committee");
2109 break;
2110 }
2111 Err(err) => {
2112 match &err {
2113 AggregatorSendCapabilityNotificationError::RetryableNotification {
2114 errors,
2115 } => {
2116 warn!(
2117 "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
2118 retry_interval, errors
2119 );
2120 }
2121 AggregatorSendCapabilityNotificationError::NonRetryableNotification {
2122 errors,
2123 } => {
2124 error!(
2125 "Failed to send capability notification to committee (non-retryable error): {:?}",
2126 errors
2127 );
2128 break;
2129 }
2130 };
2131
2132 tokio::time::sleep(retry_interval).await;
2134
2135 retry_interval = std::cmp::min(
2137 retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
2138 Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
2139 );
2140 }
2141 }
2142 }
2143 }
2144}
2145
#[cfg(msim)]
impl IotaNode {
    /// Returns the simulator node id backing this `IotaNode` (simtest builds
    /// only).
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    /// Marks whether entering safe mode is expected; when set, the
    /// reconfiguration loop skips its `debug_assert` that the system is not
    /// in safe mode.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }
}
2159
/// A lazily started gRPC server: `Unstarted` holds the bind/serve future
/// until `start` is called, after which the running server's handle is kept
/// in `Started`.
enum SpawnOnce {
    // NOTE(review): the Mutex wrapper around the boxed future presumably
    // exists to satisfy auto-trait bounds (e.g. Sync) on SpawnOnce — confirm
    // before changing.
    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
    #[allow(unused)]
    Started(iota_http::ServerHandle),
}
2166
2167impl SpawnOnce {
2168 pub fn new(
2169 future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
2170 ) -> Self {
2171 Self::Unstarted(Mutex::new(Box::pin(future)))
2172 }
2173
2174 pub async fn start(self) -> Self {
2175 match self {
2176 Self::Unstarted(future) => {
2177 let server = future
2178 .into_inner()
2179 .await
2180 .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
2181 let handle = server.handle().clone();
2182 tokio::spawn(async move {
2183 if let Err(err) = server.serve().await {
2184 info!("Server stopped: {err}");
2185 }
2186 info!("Server stopped");
2187 });
2188 Self::Started(handle)
2189 }
2190 Self::Started(_) => self,
2191 }
2192 }
2193
2194 pub fn shutdown(self) {
2195 if let SpawnOnce::Started(handle) = self {
2196 handle.trigger_shutdown();
2197 }
2198 }
2199}
2200
2201fn send_trusted_peer_change(
2204 config: &NodeConfig,
2205 sender: &watch::Sender<TrustedPeerChangeEvent>,
2206 new_epoch_start_state: &EpochStartSystemState,
2207) {
2208 let new_committee =
2209 new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2210
2211 sender.send_modify(|event| {
2212 core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2213 event.new_committee = new_committee;
2214 })
2215}
2216
2217fn build_kv_store(
2218 state: &Arc<AuthorityState>,
2219 config: &NodeConfig,
2220 registry: &Registry,
2221) -> Result<Arc<TransactionKeyValueStore>> {
2222 let metrics = KeyValueStoreMetrics::new(registry);
2223 let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2224
2225 let base_url = &config.transaction_kv_store_read_config.base_url;
2226
2227 if base_url.is_empty() {
2228 info!("no http kv store url provided, using local db only");
2229 return Ok(Arc::new(db_store));
2230 }
2231
2232 base_url.parse::<url::Url>().tap_err(|e| {
2233 error!(
2234 "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2235 base_url, e
2236 )
2237 })?;
2238
2239 let http_store = HttpKVStore::new_kv(
2240 base_url,
2241 config.transaction_kv_store_read_config.cache_size,
2242 metrics.clone(),
2243 )?;
2244 info!("using local key-value store with fallback to http key-value store");
2245 Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2246 db_store,
2247 http_store,
2248 metrics,
2249 "json_rpc_fallback",
2250 )))
2251}
2252
2253async fn build_grpc_server(
2268 config: &NodeConfig,
2269 state: Arc<AuthorityState>,
2270 state_sync_store: RocksDbStore,
2271 executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>>,
2272 prometheus_registry: &Registry,
2273 server_version: ServerVersion,
2274) -> Result<Option<GrpcServerHandle>> {
2275 if config.consensus_config().is_some() || !config.enable_grpc_api {
2277 return Ok(None);
2278 }
2279
2280 let Some(grpc_config) = &config.grpc_api_config else {
2281 return Err(anyhow!("gRPC API is enabled but no configuration provided"));
2282 };
2283
2284 let chain_id = state.get_chain_identifier();
2286
2287 let grpc_read_store = Arc::new(GrpcReadStore::new(state.clone(), state_sync_store));
2288
2289 let shutdown_token = CancellationToken::new();
2291
2292 let grpc_reader = Arc::new(GrpcReader::new(
2294 grpc_read_store,
2295 Some(server_version.to_string()),
2296 ));
2297
2298 let grpc_server_metrics = iota_grpc_server::GrpcServerMetrics::new(prometheus_registry);
2300
2301 let handle = start_grpc_server(
2302 grpc_reader,
2303 executor,
2304 grpc_config.clone(),
2305 shutdown_token,
2306 chain_id,
2307 Some(grpc_server_metrics),
2308 )
2309 .await?;
2310
2311 Ok(Some(handle))
2312}
2313
/// Builds and starts the HTTP/JSON-RPC server for non-validator nodes.
///
/// Returns `Ok(None)` when the node has a consensus config (validators do not
/// serve the JSON-RPC API). Otherwise registers the read, coin, governance,
/// indexer and Move-utils RPC modules — plus the transaction builder and
/// execution modules when applicable — mounts a `/health` route, and binds
/// the server to `config.json_rpc_address`.
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
) -> Result<Option<iota_http::ServerHandle>> {
    // Validators (nodes with a consensus config) do not expose JSON-RPC.
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        // The transaction builder module is skipped when a run-with-range
        // limit is configured.
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        // Transaction execution is only available when an orchestrator is
        // running on this node.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Fall back to the chain-specific default IOTA-names config when none
        // is set explicitly.
        let iota_names_config = config
            .iota_names_config
            .clone()
            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // `/health` needs the authority state to inspect checkpoint progress.
    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    let layers = ServiceBuilder::new()
        // Bridge iota_http's connection info into the extension type axum's
        // ConnectInfo extractor looks for.
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware));

    router = router.layer(layers);

    let handle = iota_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());

    Ok(Some(handle))
}
2423
/// Query parameters accepted by the `/health` endpoint.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    /// Maximum allowed age, in seconds, of the highest executed checkpoint
    /// before the health check reports the node as down; `None` skips the
    /// staleness check entirely.
    pub threshold_seconds: Option<u32>,
}
2428
2429async fn health_check_handler(
2430 axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2431 axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2432) -> impl axum::response::IntoResponse {
2433 if let Some(threshold_seconds) = threshold_seconds {
2434 let summary = match state
2436 .get_checkpoint_store()
2437 .get_highest_executed_checkpoint()
2438 {
2439 Ok(Some(summary)) => summary,
2440 Ok(None) => {
2441 warn!("Highest executed checkpoint not found");
2442 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2443 }
2444 Err(err) => {
2445 warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2446 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2447 }
2448 };
2449
2450 let latest_chain_time = summary.timestamp();
2452 let threshold =
2453 std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2454
2455 if latest_chain_time < threshold {
2457 warn!(
2458 ?latest_chain_time,
2459 ?threshold,
2460 "failing health check due to checkpoint lag"
2461 );
2462 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2463 }
2464 }
2465 (axum::http::StatusCode::OK, "up")
2467}
2468
/// Upper bound on the number of transactions per checkpoint, taken from the
/// protocol config.
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}
2473
/// Test builds use a tiny fixed cap instead of the protocol value —
/// presumably so checkpoint boundaries are easy to hit with small workloads;
/// confirm before relying on the exact value.
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}