1#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8 collections::HashMap,
9 fmt,
10 future::Future,
11 path::PathBuf,
12 sync::{Arc, Weak},
13 time::Duration,
14};
15
16use anemo::Network;
17use anemo_tower::{
18 callback::CallbackLayer,
19 trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
20};
21use anyhow::{Result, anyhow};
22use arc_swap::ArcSwap;
23use futures::future::BoxFuture;
24pub use handle::IotaNodeHandle;
25use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
26use iota_common::debug_fatal;
27use iota_config::{
28 ConsensusConfig, NodeConfig,
29 node::{DBCheckpointConfig, RunWithRange},
30 node_config_metrics::NodeConfigMetrics,
31 object_storage_config::{ObjectStoreConfig, ObjectStoreType},
32};
33use iota_core::{
34 authority::{
35 AuthorityState, AuthorityStore, RandomnessRoundReceiver,
36 authority_per_epoch_store::AuthorityPerEpochStore,
37 authority_store_pruner::ObjectsCompactionFilter,
38 authority_store_tables::{
39 AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
40 },
41 backpressure::BackpressureManager,
42 epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
43 },
44 authority_aggregator::{
45 AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
46 },
47 authority_client::NetworkAuthorityClient,
48 authority_server::{ValidatorService, ValidatorServiceMetrics},
49 checkpoints::{
50 CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
51 SubmitCheckpointToConsensus,
52 checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
53 },
54 connection_monitor::ConnectionMonitor,
55 consensus_adapter::{
56 CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
57 ConsensusClient,
58 },
59 consensus_handler::ConsensusHandlerInitializer,
60 consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
61 consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
62 db_checkpoint_handler::DBCheckpointHandler,
63 epoch::{
64 committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
65 epoch_metrics::EpochMetrics, randomness::RandomnessManager,
66 reconfiguration::ReconfigurationInitiator,
67 },
68 execution_cache::build_execution_cache,
69 global_state_hasher::{GlobalStateHashMetrics, GlobalStateHasher},
70 grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
71 jsonrpc_index::IndexStore,
72 module_cache_metrics::ResolverMetrics,
73 overload_monitor::overload_monitor,
74 safe_client::SafeClientMetricsBase,
75 signature_verifier::SignatureVerifierMetrics,
76 storage::{GrpcReadStore, RocksDbStore},
77 traffic_controller::metrics::TrafficControllerMetrics,
78 transaction_orchestrator::TransactionOrchestrator,
79 validator_tx_finalizer::ValidatorTxFinalizer,
80};
81use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
82use iota_json_rpc::{
83 JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
84 indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
85 transaction_builder_api::TransactionBuilderApi,
86 transaction_execution_api::TransactionExecutionApi,
87};
88use iota_json_rpc_api::JsonRpcMetrics;
89use iota_macros::{fail_point, fail_point_async, replay_log};
90use iota_metrics::{
91 RegistryID, RegistryService,
92 hardware_metrics::register_hardware_metrics,
93 metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
94 server_timing_middleware, spawn_monitored_task,
95};
96use iota_names::config::IotaNamesConfig;
97use iota_network::{
98 api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
99};
100use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
101use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
102use iota_sdk_types::crypto::{Intent, IntentMessage, IntentScope};
103use iota_snapshot::uploader::StateSnapshotUploader;
104use iota_storage::{
105 FileCompression, StorageFormat,
106 http_key_value_store::HttpKVStore,
107 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
108 key_value_store_metrics::KeyValueStoreMetrics,
109};
110use iota_types::{
111 base_types::{AuthorityName, ConciseableName, EpochId},
112 committee::Committee,
113 crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
114 digests::ChainIdentifier,
115 error::{IotaError, IotaResult},
116 executable_transaction::VerifiedExecutableTransaction,
117 execution_config_utils::to_binary_config,
118 full_checkpoint_content::CheckpointData,
119 iota_system_state::{
120 IotaSystemState, IotaSystemStateTrait,
121 epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
122 },
123 messages_consensus::{
124 AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
125 SignedAuthorityCapabilitiesV1,
126 },
127 messages_grpc::HandleCapabilityNotificationRequestV1,
128 quorum_driver_types::QuorumDriverEffectsQueueResult,
129 supported_protocol_versions::SupportedProtocolVersions,
130 transaction::{Transaction, VerifiedCertificate},
131};
132use prometheus::Registry;
133#[cfg(msim)]
134use simulator::*;
135use tap::tap::TapFallible;
136use tokio::{
137 sync::{Mutex, broadcast, mpsc, watch},
138 task::{JoinHandle, JoinSet},
139};
140use tokio_util::sync::CancellationToken;
141use tower::ServiceBuilder;
142use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
143use typed_store::{
144 DBMetrics,
145 rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
146};
147
148use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
149
150pub mod admin;
151mod handle;
152pub mod metrics;
153
/// Services and handles that only exist while this node is an active
/// committee validator. Built during (re)configuration and torn down when
/// the node leaves the committee.
pub struct ValidatorComponents {
    // Validator gRPC server, wrapped in `SpawnOnce` so that it can be
    // started after recovery work completes (see `start_async`).
    validator_server_handle: SpawnOnce,
    // Background task watching for validator overload, when enabled.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    // Prunes obsolete consensus store data across epochs.
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    // Tasks owned by the checkpoint service; held so they can be awaited or
    // aborted on epoch change / shutdown.
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    // ID of the validator-specific metrics registry, used to detach it from
    // the `RegistryService` when these components are dropped.
    validator_registry_id: RegistryID,
}
166
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    /// Per-node state that only exists under the `msim` simulator build.
    pub(super) struct SimState {
        // Handle to the simulator node this `IotaNode` is running on.
        pub sim_node: iota_simulator::runtime::NodeHandle,
        // NOTE(review): presumably set by tests that expect the node to
        // enter safe mode — confirm against its readers.
        pub sim_safe_mode_expected: AtomicBool,
        // Held purely for its Drop side effect (node leak detection).
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }
}
189
/// Binary name and version reported by the node; rendered as
/// `"<bin>/<version>"` by its `Display` implementation.
#[derive(Clone)]
pub struct ServerVersion {
    pub bin: &'static str,
    pub version: &'static str,
}
195
196impl ServerVersion {
197 pub fn new(bin: &'static str, version: &'static str) -> Self {
198 Self { bin, version }
199 }
200}
201
202impl std::fmt::Display for ServerVersion {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 f.write_str(self.bin)?;
205 f.write_str("/")?;
206 f.write_str(self.version)
207 }
208}
209
/// A running IOTA node: the authority state plus every attached service —
/// p2p (discovery / state sync / randomness), optional JSON-RPC/HTTP and
/// gRPC servers, background maintenance tasks, and (when in the committee)
/// the validator components.
pub struct IotaNode {
    config: NodeConfig,
    // Present only while this node is an active committee validator.
    validator_components: Mutex<Option<ValidatorComponents>>,
    // JSON-RPC/HTTP server handle; kept alive for its side effects only.
    _http_server: Option<iota_http::ServerHandle>,
    state: Arc<AuthorityState>,
    // Fullnode-only transaction submission pipeline (None on validators and
    // when running with a restricted checkpoint range).
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    global_state_hasher: Mutex<Option<Arc<GlobalStateHasher>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    // Broadcasts the new system state to subscribers at each epoch boundary.
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    // Notifies the p2p layer of trusted-peer (committee) changes.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    // Presumably a kill switch for the db checkpoint handler — held only to
    // keep the task alive; never sent on directly here.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    // Held to keep the state archival task alive (dropping signals stop).
    _state_archive_handle: Option<broadcast::Sender<()>>,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Broadcasts shutdown, carrying the optional `RunWithRange` that caused it.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    // Swappable authority aggregator, refreshed across reconfigurations.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}
257
258impl fmt::Debug for IotaNode {
259 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
260 f.debug_struct("IotaNode")
261 .field("name", &self.state.name.concise())
262 .finish()
263 }
264}
265
266impl IotaNode {
267 pub async fn start(
268 config: NodeConfig,
269 registry_service: RegistryService,
270 ) -> Result<Arc<IotaNode>> {
271 Self::start_async(
272 config,
273 registry_service,
274 ServerVersion::new("iota-node", "unknown"),
275 )
276 .await
277 }
278
    /// Fully initializes a node from `config` and starts all subsystems.
    ///
    /// In order: metrics registration, database opening with a corruption
    /// check, genesis/migration loading, epoch-store recovery, p2p network
    /// bring-up (discovery, state sync, randomness), background archival /
    /// snapshot / db-checkpoint tasks, authority state creation, genesis
    /// transaction execution (epoch 0 only), optional fullnode servers
    /// (JSON-RPC/HTTP, gRPC), validator components when this node is in the
    /// committee, and finally the reconfiguration monitor task.
    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        server_version: ServerVersion,
    ) -> Result<Arc<IotaNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        // A node is a validator iff it has a consensus config; otherwise it
        // runs as a fullnode.
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(node =? config.authority_public_key(),
            "Initializing iota-node listening on {}", config.network_address
        );

        let genesis = config.genesis()?.clone();

        // The chain is identified by the digest of its genesis checkpoint.
        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
        info!("IOTA chain identifier: {chain_identifier}");

        // Refuse to start on a database previously marked corrupted; the
        // marker is cleared further down once recovery-critical writes are
        // done (see `unmark_db_corruption`).
        let db_corrupted_path = &config.db_path().join("status");
        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
            panic!("Failed to check database corruption: {err}");
        }

        DBMetrics::init(&prometheus_registry);

        iota_metrics::init_metrics(&prometheus_registry);
        // The thread stall monitor is not meaningful under the simulator.
        #[cfg(not(msim))]
        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();

        register_hardware_metrics(&registry_service, &config.db_path)
            .expect("Failed registering hardware metrics");
        prometheus_registry
            .register(iota_metrics::uptime_metric(
                if is_validator {
                    "validator"
                } else {
                    "fullnode"
                },
                server_version.version,
                &chain_identifier.to_string(),
            ))
            .expect("Failed registering uptime metric");

        // Stardust-migration transactions, present only in migrated genesis.
        let migration_tx_data = if genesis.contains_migrations() {
            Some(config.load_migration_tx_data()?)
        } else {
            None
        };

        let secret = Arc::pin(config.authority_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        // Optional pruning-by-compaction-filter support.
        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        // Write stall defaults to on for validators only.
        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");
        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store = AuthorityStore::open(
            perpetual_tables,
            &genesis,
            &config,
            &prometheus_registry,
            migration_tx_data.as_ref(),
        )
        .await?;

        // Recover the epoch we were in at shutdown and its configuration.
        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache_config,
            &epoch_start_configuration,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                    auth_agg_metrics,
                ),
            )))
        };

        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => chain_identifier.chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.authority_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_identifier, chain),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        // Only verify token conservation on a brand-new database; on restart
        // the check would be redundant and expensive.
        if is_genesis {
            info!("checking IOTA conservation at genesis");
            cache_traits
                .reconfig_api
                .try_expensive_check_iota_conservation(&epoch_store, None)
                .expect("IOTA conservation check cannot fail at genesis");
        }

        // Surface any operator override of the protocol-upgrade buffer stake.
        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        // Recovery-critical setup succeeded: clear the corruption marker.
        unmark_db_corruption(db_corrupted_path)?;

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        // JSON-RPC index store — fullnodes only, and only when enabled.
        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
            )))
        } else {
            None
        };

        // gRPC index store — fullnodes only, and only when enabled.
        let grpc_indexes_store = if is_full_node && config.enable_grpc_api {
            Some(Arc::new(
                GrpcIndexesStore::new(
                    config.db_path().join(GRPC_INDEXES_DIR),
                    Arc::clone(&store),
                    &checkpoint_store,
                )
                .await,
            ))
        } else {
            None
        };

        info!("creating archive reader");
        let archive_readers =
            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
            Self::create_p2p_network(
                &config,
                state_sync_store.clone(),
                chain_identifier,
                trusted_peer_change_rx,
                archive_readers.clone(),
                randomness_tx,
                &prometheus_registry,
            )?;

        // Seed the p2p layer with the current committee as trusted peers.
        send_trusted_peer_change(
            &config,
            &trusted_peer_change_tx,
            epoch_store.epoch_start_state(),
        );

        info!("start state archival");
        let state_archive_handle =
            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
                .await?;

        info!("start snapshot upload");
        let state_snapshot_handle =
            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;

        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        let mut genesis_objects = genesis.objects().to_vec();
        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
            genesis_objects.extend(migration_tx_data.get_objects());
        }

        let authority_name = config.authority_public_key();
        let validator_tx_finalizer =
            config
                .enable_validator_tx_finalizer
                .then_some(Arc::new(ValidatorTxFinalizer::new(
                    auth_agg.clone(),
                    authority_name,
                    &prometheus_registry,
                )));

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            grpc_indexes_store,
            checkpoint_store.clone(),
            &prometheus_registry,
            &genesis_objects,
            &db_checkpoint_config,
            config.clone(),
            archive_readers,
            validator_tx_finalizer,
            chain_identifier,
            pruner_db,
        )
        .await;

        // On a fresh chain, execute the genesis transaction (and any
        // migration transactions) immediately, outside consensus.
        if epoch_store.epoch() == 0 {
            let genesis_tx = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
            Self::execute_transaction_immediately_at_zero_epoch(
                &state,
                &epoch_store,
                genesis_tx,
                span,
            )
            .await;

            if let Some(migration_tx_data) = migration_tx_data {
                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
                    Self::execute_transaction_immediately_at_zero_epoch(
                        &state,
                        &epoch_store,
                        tx,
                        span,
                    )
                    .await;
                }
            }
        }

        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        // Optional expensive consistency check of secondary indexes.
        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
        {
            if let Some(indexes) = state.indexes.clone() {
                iota_core::verify_indexes::verify_indexes(
                    state.get_global_state_hash_store().as_ref(),
                    indexes,
                )
                .expect("secondary indexes are inconsistent");
            }
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        // The orchestrator only runs on fullnodes without a run-with-range
        // restriction.
        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
            )))
        } else {
            None
        };

        let http_server = build_http_server(
            state.clone(),
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
        )
        .await?;

        let global_state_hasher = Arc::new(GlobalStateHasher::new(
            cache_traits.global_state_hash_store.clone(),
            GlobalStateHashMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics =
            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
            p2p_network.downgrade(),
            network_connection_metrics,
            HashMap::new(),
            None,
        );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses,
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let iota_node_metrics =
            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));

        iota_node_metrics
            .binary_max_protocol_version
            .set(ProtocolVersion::MAX.as_u64() as i64);
        iota_node_metrics
            .configured_max_protocol_version
            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);

        let executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>> =
            transaction_orchestrator
                .clone()
                .map(|o| o as Arc<dyn iota_types::transaction_executor::TransactionExecutor>);

        let grpc_server_handle = build_grpc_server(
            &config,
            state.clone(),
            state_sync_store.clone(),
            executor,
            &prometheus_registry,
            server_version,
        )
        .await?;

        // Construct validator components (in parallel with re-executing any
        // pending consensus certificates) only when we are in the committee.
        let validator_components = if state.is_committee_validator(&epoch_store) {
            let (components, _) = futures::join!(
                Self::construct_validator_components(
                    config.clone(),
                    state.clone(),
                    committee,
                    epoch_store.clone(),
                    checkpoint_store.clone(),
                    state_sync_handle.clone(),
                    randomness_handle.clone(),
                    Arc::downgrade(&global_state_hasher),
                    backpressure_manager.clone(),
                    connection_monitor_status.clone(),
                    &registry_service,
                ),
                Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
            );
            let mut components = components?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            // Start the validator server only after recovery has finished.
            components.validator_server_handle = components.validator_server_handle.start().await;

            Some(components)
        } else {
            None
        };

        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            _http_server: http_server,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: iota_node_metrics,

            _discovery: discovery_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            global_state_hasher: Mutex::new(Some(global_state_hasher)),
            end_of_epoch_channel,
            connection_monitor_status,
            trusted_peer_change_tx,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_archive_handle: state_archive_handle,
            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            grpc_server_handle: Mutex::new(grpc_server_handle),

            auth_agg,
        };

        info!("IotaNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        // Background task driving epoch reconfiguration for the node's
        // entire lifetime.
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }
819
820 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
821 self.end_of_epoch_channel.subscribe()
822 }
823
824 pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
825 self.shutdown_channel_tx.subscribe()
826 }
827
828 pub fn current_epoch_for_testing(&self) -> EpochId {
829 self.state.current_epoch_for_testing()
830 }
831
832 pub fn db_checkpoint_path(&self) -> PathBuf {
833 self.config.db_checkpoint_path()
834 }
835
836 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
838 info!("close_epoch (current epoch = {})", epoch_store.epoch());
839 self.validator_components
840 .lock()
841 .await
842 .as_ref()
843 .ok_or_else(|| IotaError::from("Node is not a validator"))?
844 .consensus_adapter
845 .close_epoch(epoch_store);
846 Ok(())
847 }
848
849 pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
850 self.state
851 .clear_override_protocol_upgrade_buffer_stake(epoch)
852 }
853
854 pub fn set_override_protocol_upgrade_buffer_stake(
855 &self,
856 epoch: EpochId,
857 buffer_stake_bps: u64,
858 ) -> IotaResult {
859 self.state
860 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
861 }
862
863 pub async fn close_epoch_for_testing(&self) -> IotaResult {
866 let epoch_store = self.state.epoch_store_for_testing();
867 self.close_epoch(&epoch_store).await
868 }
869
870 async fn start_state_archival(
871 config: &NodeConfig,
872 prometheus_registry: &Registry,
873 state_sync_store: RocksDbStore,
874 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
875 if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
876 let local_store_config = ObjectStoreConfig {
877 object_store: Some(ObjectStoreType::File),
878 directory: Some(config.archive_path()),
879 ..Default::default()
880 };
881 let archive_writer = ArchiveWriter::new(
882 local_store_config,
883 remote_store_config.clone(),
884 FileCompression::Zstd,
885 StorageFormat::Blob,
886 Duration::from_secs(600),
887 256 * 1024 * 1024,
888 prometheus_registry,
889 )
890 .await?;
891 Ok(Some(archive_writer.start(state_sync_store).await?))
892 } else {
893 Ok(None)
894 }
895 }
896
897 fn start_state_snapshot(
900 config: &NodeConfig,
901 prometheus_registry: &Registry,
902 checkpoint_store: Arc<CheckpointStore>,
903 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
904 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
905 let snapshot_uploader = StateSnapshotUploader::new(
906 &config.db_checkpoint_path(),
907 &config.snapshot_path(),
908 remote_store_config.clone(),
909 60,
910 prometheus_registry,
911 checkpoint_store,
912 )?;
913 Ok(Some(snapshot_uploader.start()))
914 } else {
915 Ok(None)
916 }
917 }
918
919 fn start_db_checkpoint(
920 config: &NodeConfig,
921 prometheus_registry: &Registry,
922 state_snapshot_enabled: bool,
923 ) -> Result<(
924 DBCheckpointConfig,
925 Option<tokio::sync::broadcast::Sender<()>>,
926 )> {
927 let checkpoint_path = Some(
928 config
929 .db_checkpoint_config
930 .checkpoint_path
931 .clone()
932 .unwrap_or_else(|| config.db_checkpoint_path()),
933 );
934 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
935 DBCheckpointConfig {
936 checkpoint_path,
937 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
938 true
939 } else {
940 config
941 .db_checkpoint_config
942 .perform_db_checkpoints_at_epoch_end
943 },
944 ..config.db_checkpoint_config.clone()
945 }
946 } else {
947 config.db_checkpoint_config.clone()
948 };
949
950 match (
951 db_checkpoint_config.object_store_config.as_ref(),
952 state_snapshot_enabled,
953 ) {
954 (None, false) => Ok((db_checkpoint_config, None)),
959 (_, _) => {
960 let handler = DBCheckpointHandler::new(
961 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
962 db_checkpoint_config.object_store_config.as_ref(),
963 60,
964 db_checkpoint_config
965 .prune_and_compact_before_upload
966 .unwrap_or(true),
967 config.authority_store_pruning_config.clone(),
968 prometheus_registry,
969 state_snapshot_enabled,
970 )?;
971 Ok((
972 db_checkpoint_config,
973 Some(DBCheckpointHandler::start(handler)),
974 ))
975 }
976 }
977 }
978
    /// Builds the anemo p2p network and starts the discovery, state-sync,
    /// and randomness services on it.
    ///
    /// Returns the bound [`Network`] together with handles to the three
    /// services.
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            // All three services share one anemo router.
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            // Inbound stack: tracing on server errors + per-message metrics.
            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            // Outbound stack mirrors the inbound one but traces client
            // errors as well.
            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Allow frames up to 1 GiB — presumably sized for large
            // state-sync messages; confirm before changing.
            anemo_config.max_frame_size = Some(1 << 30);

            // Fill in QUIC tuning defaults only where the operator has not
            // set explicit values.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            // The server name binds the network to this specific chain, so
            // nodes from different chains cannot connect to each other.
            let server_name = format!("iota-{chain_identifier}");
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }
1117
    /// Constructs all validator-only components: the consensus adapter and
    /// manager, the consensus store pruner, the (bound but not yet serving)
    /// validator gRPC service, and optionally the overload monitor, then
    /// starts the epoch-specific components on top of them.
    ///
    /// Returns an error if the node config has no consensus section, or if
    /// any downstream component fails to start.
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
    ) -> Result<ValidatorComponents> {
        // NOTE(review): the full config is cloned only to obtain a mutable
        // borrow of its consensus section; nothing below visibly mutates it —
        // confirm whether `as_mut` is actually required by the callees.
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        // Validator metrics get a dedicated registry so they can be removed
        // as a unit (via the returned id) if this node later stops being a
        // validator after reconfiguration.
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        // The consensus client is updatable: the adapter keeps a stable
        // handle while the underlying consensus instance is swapped.
        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        // Background pruner for old consensus databases, configured from the
        // consensus section's retention/period settings.
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        // Binds the validator gRPC endpoint; serving starts later via
        // `SpawnOnce::start`.
        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &validator_registry,
        )
        .await?;

        // The overload monitor is only spawned when load shedding is enabled
        // (a non-zero max shedding percentage).
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            // Weak reference: the monitor must not keep the state alive.
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            global_state_hasher,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }
1217
    /// Starts the components that must be recreated every epoch: the
    /// checkpoint service, the (optional) randomness manager, and the
    /// consensus instance itself. Called both at validator startup and after
    /// each reconfiguration.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: ConsensusManager,
        consensus_store_pruner: ConsensusStorePruner,
        global_state_hasher: Weak<GlobalStateHasher>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            global_state_hasher,
            checkpoint_metrics.clone(),
        );

        // Shared map of low-scoring authorities; handed to the consensus
        // adapter so it can take peer scores into account when submitting.
        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        // `try_new` may yield no manager — TODO confirm under which
        // conditions RandomnessManager::try_new returns None.
        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
            backpressure_manager,
        );

        info!("Starting consensus manager");

        consensus_manager
            .start(
                config,
                epoch_store.clone(),
                consensus_handler_initializer,
                // Validates transactions arriving through consensus before
                // they reach the transaction manager.
                IotaTxValidator::new(
                    epoch_store.clone(),
                    checkpoint_service.clone(),
                    state.transaction_manager().clone(),
                    iota_tx_validator_metrics.clone(),
                ),
            )
            .await;

        info!("Spawning checkpoint service");
        let checkpoint_service_tasks = checkpoint_service.spawn().await;

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }
1309
1310 fn build_checkpoint_service(
1317 config: &NodeConfig,
1318 consensus_adapter: Arc<ConsensusAdapter>,
1319 checkpoint_store: Arc<CheckpointStore>,
1320 epoch_store: Arc<AuthorityPerEpochStore>,
1321 state: Arc<AuthorityState>,
1322 state_sync_handle: state_sync::Handle,
1323 global_state_hasher: Weak<GlobalStateHasher>,
1324 checkpoint_metrics: Arc<CheckpointMetrics>,
1325 ) -> Arc<CheckpointService> {
1326 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1327 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1328
1329 debug!(
1330 "Starting checkpoint service with epoch start timestamp {}
1331 and epoch duration {}",
1332 epoch_start_timestamp_ms, epoch_duration_ms
1333 );
1334
1335 let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1336 sender: consensus_adapter,
1337 signer: state.secret.clone(),
1338 authority: config.authority_public_key(),
1339 next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1340 .checked_add(epoch_duration_ms)
1341 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1342 metrics: checkpoint_metrics.clone(),
1343 });
1344
1345 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1346 let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1347 let max_checkpoint_size_bytes =
1348 epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1349
1350 CheckpointService::build(
1351 state.clone(),
1352 checkpoint_store,
1353 epoch_store,
1354 state.get_transaction_cache_reader().clone(),
1355 global_state_hasher,
1356 checkpoint_output,
1357 Box::new(certified_checkpoint_output),
1358 checkpoint_metrics,
1359 max_tx_per_checkpoint,
1360 max_checkpoint_size_bytes,
1361 )
1362 }
1363
1364 fn construct_consensus_adapter(
1365 committee: &Committee,
1366 consensus_config: &ConsensusConfig,
1367 authority: AuthorityName,
1368 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1369 prometheus_registry: &Registry,
1370 consensus_client: Arc<dyn ConsensusClient>,
1371 checkpoint_store: Arc<CheckpointStore>,
1372 ) -> ConsensusAdapter {
1373 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1374 ConsensusAdapter::new(
1378 consensus_client,
1379 checkpoint_store,
1380 authority,
1381 connection_monitor_status,
1382 consensus_config.max_pending_transactions(),
1383 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1384 consensus_config.max_submit_position,
1385 consensus_config.submit_delay_step_override(),
1386 ca_metrics,
1387 )
1388 }
1389
1390 async fn start_grpc_validator_service(
1391 config: &NodeConfig,
1392 state: Arc<AuthorityState>,
1393 consensus_adapter: Arc<ConsensusAdapter>,
1394 prometheus_registry: &Registry,
1395 ) -> Result<SpawnOnce> {
1396 let validator_service = ValidatorService::new(
1397 state,
1398 consensus_adapter,
1399 Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1400 TrafficControllerMetrics::new(prometheus_registry),
1401 config.policy_config.clone(),
1402 config.firewall_config.clone(),
1403 );
1404
1405 let mut server_conf = iota_network_stack::config::Config::new();
1406 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1407 server_conf.load_shed = config.grpc_load_shed;
1408 let server_builder =
1409 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
1410 .add_service(ValidatorServer::new(validator_service));
1411
1412 let tls_config = iota_tls::create_rustls_server_config(
1413 config.network_key_pair().copy().private(),
1414 IOTA_TLS_SERVER_NAME.to_string(),
1415 );
1416
1417 let network_address = config.network_address().clone();
1418
1419 let bind_future = async move {
1420 let server = server_builder
1421 .bind(&network_address, Some(tls_config))
1422 .await
1423 .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;
1424
1425 let local_addr = server.local_addr();
1426 info!("Listening to traffic on {local_addr}");
1427
1428 Ok(server)
1429 };
1430
1431 Ok(SpawnOnce::new(bind_future))
1432 }
1433
    /// Re-enqueues certified transactions that were pending in consensus when
    /// the node stopped, then waits (bounded by a timeout) for the ones with
    /// known effects digests to finish executing.
    async fn reexecute_pending_consensus_certs(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        state: &Arc<AuthorityState>,
    ) {
        // Certificates with already-signed effects are re-executed against
        // the recorded effects digest; the rest are enqueued normally.
        let mut pending_consensus_certificates = Vec::new();
        let mut additional_certs = Vec::new();

        for tx in epoch_store.get_all_pending_consensus_transactions() {
            match tx.kind {
                // Only certificates without shared objects are re-executed
                // here; everything else is skipped by the catch-all arm.
                ConsensusTransactionKind::CertifiedTransaction(tx)
                    if !tx.contains_shared_object() =>
                {
                    let tx = *tx;
                    let tx = VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(tx),
                    );
                    if let Some(fx_digest) = epoch_store
                        .get_signed_effects_digest(tx.digest())
                        .expect("db error")
                    {
                        pending_consensus_certificates.push((tx, fx_digest));
                    } else {
                        additional_certs.push(tx);
                    }
                }
                _ => (),
            }
        }

        // Only the certificates with expected effects digests are awaited on
        // below; `additional_certs` are enqueued without being waited for.
        let digests = pending_consensus_certificates
            .iter()
            .map(|(tx, _)| *tx.digest())
            .collect::<Vec<_>>();

        info!(
            "reexecuting {} pending consensus certificates: {:?}",
            digests.len(),
            digests
        );

        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
        state.enqueue_transactions_for_execution(additional_certs, epoch_store);

        // Longer deadline under the msim simulator.
        let timeout = if cfg!(msim) { 120 } else { 60 };
        if tokio::time::timeout(
            std::time::Duration::from_secs(timeout),
            state
                .get_transaction_cache_reader()
                .try_notify_read_executed_effects_digests(&digests),
        )
        .await
        .is_err()
        {
            // On timeout, report which digests are still unexecuted to aid
            // debugging.
            if let Ok(executed_effects_digests) = state
                .get_transaction_cache_reader()
                .try_multi_get_executed_effects_digests(&digests)
            {
                let pending_digests = digests
                    .iter()
                    .zip(executed_effects_digests.iter())
                    .filter_map(|(digest, executed_effects_digest)| {
                        if executed_effects_digest.is_none() {
                            Some(digest)
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>();
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed: {:?}",
                    pending_digests
                );
            } else {
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed, digests not found"
                );
            }
        }
    }
1542
1543 pub fn state(&self) -> Arc<AuthorityState> {
1544 self.state.clone()
1545 }
1546
    /// Returns the current reference gas price; intended for tests.
    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }
1551
    /// Returns a shared handle to the node's committee store.
    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }
1555
1556 pub fn clone_authority_aggregator(
1566 &self,
1567 ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1568 self.transaction_orchestrator
1569 .as_ref()
1570 .map(|to| to.clone_authority_aggregator())
1571 }
1572
    /// Returns the transaction orchestrator, or `None` if it is not enabled
    /// on this node.
    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }
1578
1579 pub fn subscribe_to_transaction_orchestrator_effects(
1580 &self,
1581 ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1582 self.transaction_orchestrator
1583 .as_ref()
1584 .map(|to| to.subscribe_to_effects_queue())
1585 .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1586 }
1587
    /// Main reconfiguration loop: runs the checkpoint executor for the
    /// current epoch and, once the epoch's checkpoints are fully executed,
    /// transitions the node (authority state, validator components, p2p
    /// trusted peers) into the next epoch. Only returns when a
    /// `RunWithRange` stop condition is met.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // The hasher is taken out of the shared slot for the duration of
            // the epoch and a fresh one is installed during reconfiguration.
            let mut hasher_guard = self.global_state_hasher.lock().await;
            let hasher = hasher_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );

            // If the gRPC server is running, forward executed checkpoint data
            // to its broadcaster; `try_lock` failure means no sender is wired.
            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
                guard.as_ref().map(|handle| {
                    let tx = handle.checkpoint_data_broadcaster().clone();
                    Box::new(move |data: &CheckpointData| {
                        tx.send_traced(data);
                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
                })
            } else {
                None
            };

            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                hasher.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                data_sender,
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Committee validators submit their capabilities (supported
            // protocol versions, available system packages) to consensus.
            if let Some(components) = &*self.validator_components.lock().await {
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let binary_config = to_binary_config(config);
                let transaction = ConsensusTransaction::new_capability_notification_v1(
                    AuthorityCapabilitiesV1::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        self.config
                            .supported_protocol_versions
                            .expect("Supported versions should be populated")
                            .truncate_below(config.version),
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components
                    .consensus_adapter
                    .submit(transaction, None, &cur_epoch_store)?;
            } else if self.state.is_active_validator(&cur_epoch_store)
                && cur_epoch_store
                    .protocol_config()
                    .track_non_committee_eligible_validators()
            {
                // Active-but-not-committee validators instead send a signed
                // capability notification directly to the committee, retried
                // in a background task bounded by the epoch's lifetime.
                let epoch_store = cur_epoch_store.clone();
                let node_clone = self.clone();
                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
                    node_clone
                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
                        .instrument(trace_span!(
                            "send_signed_capability_notification_to_committee_with_retry"
                        ))
                        .await;
                }));
            }

            // Blocks until the epoch's checkpoints are executed (or a
            // run-with-range condition stops execution early).
            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                IotaNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .try_get_iota_system_state_object_unsafe()
                .expect("Read IOTA System State object cannot fail");

            // Under simulation, safe mode may be expected by the test
            // harness; only assert when it is not.
            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
                // Only warn on fullnodes; see NOTE(review): presumably
                // validators are not expected to have subscribers — confirm.
                if self.state.is_fullnode(&cur_epoch_store) {
                    warn!(
                        "Failed to send end of epoch notification to subscriber: {:?}",
                        err
                    );
                }
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Rebuild the authority aggregator for the new committee.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            send_trusted_peer_change(
                &self.config,
                &self.trusted_peer_change_tx,
                &new_epoch_start_state,
            );

            // Hold the components lock across the whole swap so no other
            // task observes a half-reconfigured validator.
            let mut validator_components_lock_guard = self.validator_components.lock().await;

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    hasher.clone(),
                )
                .await?;

            // Case 1: node was a validator in the ending epoch — tear down
            // the epoch-specific components and restart (or drop) them.
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        // Re-raise panics from checkpoint tasks; plain
                        // cancellation/join errors are only logged.
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // The old hasher must be the sole reference by now; reuse its
                // metrics for the new epoch's hasher.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_committee_validator(&new_epoch_store) {
                    // Still in the committee: restart epoch-specific
                    // components, reusing the long-lived ones.
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_hasher,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    // Dropped out of the committee: remove validator metrics
                    // and stop the validator gRPC server.
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    validator_server_handle.shutdown();
                    debug!("Validator grpc server shutdown triggered");

                    None
                }
            } else {
                // Case 2: node was a fullnode; still rotate the global state
                // hasher, and promote to validator if newly in the committee.
                let global_state_hasher_metrics = Arc::into_inner(hasher)
                    .expect("Object state hasher should have no other references at this point")
                    .metrics();
                let new_hasher = Arc::new(GlobalStateHasher::new(
                    self.state.get_global_state_hash_store().clone(),
                    global_state_hasher_metrics,
                ));
                let weak_hasher = Arc::downgrade(&new_hasher);
                *hasher_guard = Some(new_hasher);

                if self.state.is_committee_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_hasher,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                    )
                    .await?;

                    // Unlike the startup path, the gRPC server must be
                    // started here explicitly.
                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            cur_epoch_store.release_db_handles();

            // Simulation-only checkpoint pruning, exercised when a finite
            // retention is configured.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
1922
1923 async fn shutdown(&self) {
1924 if let Some(validator_components) = &*self.validator_components.lock().await {
1925 validator_components.consensus_manager.shutdown().await;
1926 }
1927
1928 if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
1930 info!("Shutting down gRPC server");
1931 if let Err(e) = grpc_handle.shutdown().await {
1932 warn!("Failed to gracefully shutdown gRPC server: {e}");
1933 }
1934 }
1935 }
1936
    /// Transitions the authority state into the next epoch: verifies the
    /// current epoch's last checkpoint has been executed, builds the new
    /// epoch start configuration, and swaps in a new per-epoch store.
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        global_state_hasher: Arc<GlobalStateHasher>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        // The supply change is recorded in the end-of-epoch data carried by
        // the epoch's final checkpoint.
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        // Reconfiguration requires every checkpoint of the ending epoch to
        // have been executed.
        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                global_state_hasher,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }
2002
    /// Returns a reference to this node's configuration.
    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }
2006
2007 async fn execute_transaction_immediately_at_zero_epoch(
2008 state: &Arc<AuthorityState>,
2009 epoch_store: &Arc<AuthorityPerEpochStore>,
2010 tx: &Transaction,
2011 span: tracing::Span,
2012 ) {
2013 let _guard = span.enter();
2014 let transaction =
2015 iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2016 iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2017 tx.data().clone(),
2018 iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2019 ),
2020 );
2021 state
2022 .try_execute_immediately(&transaction, None, epoch_store)
2023 .unwrap();
2024 }
2025
    /// Returns a clone of the randomness network handle.
    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }
2029
2030 async fn send_signed_capability_notification_to_committee_with_retry(
2036 &self,
2037 epoch_store: &Arc<AuthorityPerEpochStore>,
2038 ) {
2039 const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
2040 const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
2041 const MAX_RETRY_INTERVAL_SECS: u64 = 300; let config = epoch_store.protocol_config();
2045 let binary_config = to_binary_config(config);
2046
2047 let capabilities = AuthorityCapabilitiesV1::new(
2049 self.state.name,
2050 epoch_store.get_chain_identifier().chain(),
2051 self.config
2052 .supported_protocol_versions
2053 .expect("Supported versions should be populated")
2054 .truncate_below(config.version),
2055 self.state
2056 .get_available_system_packages(&binary_config)
2057 .await,
2058 );
2059
2060 let signature = AuthoritySignature::new_secure(
2062 &IntentMessage::new(
2063 Intent::iota_app(IntentScope::AuthorityCapabilities),
2064 &capabilities,
2065 ),
2066 &epoch_store.epoch(),
2067 self.config.authority_key_pair(),
2068 );
2069
2070 let request = HandleCapabilityNotificationRequestV1 {
2071 message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
2072 };
2073
2074 let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);
2075
2076 loop {
2077 let auth_agg = self.auth_agg.load();
2078 match auth_agg
2079 .send_capability_notification_to_quorum(request.clone())
2080 .await
2081 {
2082 Ok(_) => {
2083 info!("Successfully sent capability notification to committee");
2084 break;
2085 }
2086 Err(err) => {
2087 match &err {
2088 AggregatorSendCapabilityNotificationError::RetryableNotification {
2089 errors,
2090 } => {
2091 warn!(
2092 "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
2093 retry_interval, errors
2094 );
2095 }
2096 AggregatorSendCapabilityNotificationError::NonRetryableNotification {
2097 errors,
2098 } => {
2099 error!(
2100 "Failed to send capability notification to committee (non-retryable error): {:?}",
2101 errors
2102 );
2103 break;
2104 }
2105 };
2106
2107 tokio::time::sleep(retry_interval).await;
2109
2110 retry_interval = std::cmp::min(
2112 retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
2113 Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
2114 );
2115 }
2116 }
2117 }
2118 }
2119}
2120
#[cfg(msim)]
impl IotaNode {
    /// Returns the simulator node id of this node (msim builds only).
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    /// Records whether the simulation expects safe mode; consulted by the
    /// safe-mode debug assertion in `monitor_reconfiguration`.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }
}
2134
/// A validator gRPC server in one of two states: bound-but-not-serving
/// (`Unstarted`, holding the bind future) or serving (`Started`, holding the
/// server handle). Transitioned by `SpawnOnce::start`.
enum SpawnOnce {
    // Wrapped in a Mutex so the enum can be shared across threads; the
    // future is consumed exactly once via `into_inner` in `start`.
    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
    #[allow(unused)]
    Started(iota_http::ServerHandle),
}
2141
2142impl SpawnOnce {
2143 pub fn new(
2144 future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
2145 ) -> Self {
2146 Self::Unstarted(Mutex::new(Box::pin(future)))
2147 }
2148
2149 pub async fn start(self) -> Self {
2150 match self {
2151 Self::Unstarted(future) => {
2152 let server = future
2153 .into_inner()
2154 .await
2155 .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
2156 let handle = server.handle().clone();
2157 tokio::spawn(async move {
2158 if let Err(err) = server.serve().await {
2159 info!("Server stopped: {err}");
2160 }
2161 info!("Server stopped");
2162 });
2163 Self::Started(handle)
2164 }
2165 Self::Started(_) => self,
2166 }
2167 }
2168
2169 pub fn shutdown(self) {
2170 if let SpawnOnce::Started(handle) = self {
2171 handle.trigger_shutdown();
2172 }
2173 }
2174}
2175
2176fn send_trusted_peer_change(
2179 config: &NodeConfig,
2180 sender: &watch::Sender<TrustedPeerChangeEvent>,
2181 new_epoch_start_state: &EpochStartSystemState,
2182) {
2183 let new_committee =
2184 new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2185
2186 sender.send_modify(|event| {
2187 core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2188 event.new_committee = new_committee;
2189 })
2190}
2191
2192fn build_kv_store(
2193 state: &Arc<AuthorityState>,
2194 config: &NodeConfig,
2195 registry: &Registry,
2196) -> Result<Arc<TransactionKeyValueStore>> {
2197 let metrics = KeyValueStoreMetrics::new(registry);
2198 let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2199
2200 let base_url = &config.transaction_kv_store_read_config.base_url;
2201
2202 if base_url.is_empty() {
2203 info!("no http kv store url provided, using local db only");
2204 return Ok(Arc::new(db_store));
2205 }
2206
2207 base_url.parse::<url::Url>().tap_err(|e| {
2208 error!(
2209 "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2210 base_url, e
2211 )
2212 })?;
2213
2214 let http_store = HttpKVStore::new_kv(
2215 base_url,
2216 config.transaction_kv_store_read_config.cache_size,
2217 metrics.clone(),
2218 )?;
2219 info!("using local key-value store with fallback to http key-value store");
2220 Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2221 db_store,
2222 http_store,
2223 metrics,
2224 "json_rpc_fallback",
2225 )))
2226}
2227
2228async fn build_grpc_server(
2243 config: &NodeConfig,
2244 state: Arc<AuthorityState>,
2245 state_sync_store: RocksDbStore,
2246 executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>>,
2247 prometheus_registry: &Registry,
2248 server_version: ServerVersion,
2249) -> Result<Option<GrpcServerHandle>> {
2250 if config.consensus_config().is_some() || !config.enable_grpc_api {
2252 return Ok(None);
2253 }
2254
2255 let Some(grpc_config) = &config.grpc_api_config else {
2256 return Err(anyhow!("gRPC API is enabled but no configuration provided"));
2257 };
2258
2259 let chain_id = state.get_chain_identifier();
2261
2262 let grpc_read_store = Arc::new(GrpcReadStore::new(state.clone(), state_sync_store));
2263
2264 let shutdown_token = CancellationToken::new();
2266
2267 let grpc_reader = Arc::new(GrpcReader::new(
2269 grpc_read_store,
2270 Some(server_version.to_string()),
2271 ));
2272
2273 let grpc_server_metrics = iota_grpc_server::GrpcServerMetrics::new(prometheus_registry);
2275
2276 let handle = start_grpc_server(
2277 grpc_reader,
2278 executor,
2279 grpc_config.clone(),
2280 shutdown_token,
2281 chain_id,
2282 Some(grpc_server_metrics),
2283 )
2284 .await?;
2285
2286 Ok(Some(handle))
2287}
2288
/// Builds and starts the JSON-RPC HTTP server for non-validator nodes.
///
/// Returns `Ok(None)` when the node runs consensus. Otherwise registers the
/// read, coin, governance, indexer and move-utils JSON-RPC modules — plus the
/// transaction builder/execution modules when applicable — adds a `/health`
/// route, and binds the server to `config.json_rpc_address`.
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
) -> Result<Option<iota_http::ServerHandle>> {
    // Validator nodes do not serve the JSON-RPC API.
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        // Key-value store backing the read APIs: local db, with an optional
        // HTTP fallback depending on the node config.
        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        // The transaction builder module is only registered when the node is
        // not limited by a RunWithRange configuration.
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        // Transaction execution is only exposed when an orchestrator exists
        // to forward transactions.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Fall back to the chain-specific default IOTA Names config when the
        // node config does not provide one.
        let iota_names_config = config
            .iota_names_config
            .clone()
            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // Health endpoint; the AuthorityState extension lets the handler inspect
    // checkpoint progress.
    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    let layers = ServiceBuilder::new()
        // Copy the connection info recorded by iota-http into axum's
        // ConnectInfo extension so handlers can read the peer address.
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware));

    router = router.layer(layers);

    let handle = iota_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());

    Ok(Some(handle))
}
2398
/// Query parameters accepted by the `/health` endpoint.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    /// Maximum allowed age, in seconds, of the highest executed checkpoint
    /// before the health check reports the node as down. When absent, the
    /// endpoint unconditionally reports up.
    pub threshold_seconds: Option<u32>,
}
2403
2404async fn health_check_handler(
2405 axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2406 axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2407) -> impl axum::response::IntoResponse {
2408 if let Some(threshold_seconds) = threshold_seconds {
2409 let summary = match state
2411 .get_checkpoint_store()
2412 .get_highest_executed_checkpoint()
2413 {
2414 Ok(Some(summary)) => summary,
2415 Ok(None) => {
2416 warn!("Highest executed checkpoint not found");
2417 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2418 }
2419 Err(err) => {
2420 warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2421 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2422 }
2423 };
2424
2425 let latest_chain_time = summary.timestamp();
2427 let threshold =
2428 std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2429
2430 if latest_chain_time < threshold {
2432 warn!(
2433 ?latest_chain_time,
2434 ?threshold,
2435 "failing health check due to checkpoint lag"
2436 );
2437 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2438 }
2439 }
2440 (axum::http::StatusCode::OK, "up")
2442}
2443
/// Maximum number of transactions allowed in a single checkpoint, taken from
/// the protocol config.
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}
2448
/// Test-build override: uses a fixed limit of 2 transactions per checkpoint
/// instead of the protocol value.
// NOTE(review): presumably kept tiny so tests hit checkpoint boundaries
// quickly — confirm with the checkpoint tests.
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}