#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::{
    collections::{BTreeSet, HashMap, HashSet},
    fmt,
    future::Future,
    path::PathBuf,
    str::FromStr,
    sync::{Arc, Weak},
    time::Duration,
};

use anemo::Network;
use anemo_tower::{
    callback::CallbackLayer,
    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
};
use anyhow::{Result, anyhow};
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
use futures::future::BoxFuture;
pub use handle::IotaNodeHandle;
use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
use iota_common::debug_fatal;
use iota_config::{
    ConsensusConfig, NodeConfig,
    node::{DBCheckpointConfig, RunWithRange},
    node_config_metrics::NodeConfigMetrics,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_core::{
    authority::{
        AuthorityState, AuthorityStore, RandomnessRoundReceiver,
        authority_per_epoch_store::AuthorityPerEpochStore,
        authority_store_pruner::ObjectsCompactionFilter,
        authority_store_tables::{
            AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
        },
        backpressure::BackpressureManager,
        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
    },
    authority_aggregator::{
        AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
    },
    authority_client::NetworkAuthorityClient,
    authority_server::{ValidatorService, ValidatorServiceMetrics},
    checkpoints::{
        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
        SubmitCheckpointToConsensus,
        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
    },
    connection_monitor::ConnectionMonitor,
    consensus_adapter::{
        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
        ConsensusClient,
    },
    consensus_handler::ConsensusHandlerInitializer,
    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
    db_checkpoint_handler::DBCheckpointHandler,
    epoch::{
        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
        reconfiguration::ReconfigurationInitiator,
    },
    execution_cache::build_execution_cache,
    jsonrpc_index::IndexStore,
    module_cache_metrics::ResolverMetrics,
    overload_monitor::overload_monitor,
    rest_index::RestIndexStore,
    safe_client::SafeClientMetricsBase,
    signature_verifier::SignatureVerifierMetrics,
    state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
    storage::{RestReadStore, RocksDbStore},
    traffic_controller::metrics::TrafficControllerMetrics,
    transaction_orchestrator::TransactionOrchestrator,
    validator_tx_finalizer::ValidatorTxFinalizer,
};
use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
use iota_json_rpc::{
    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
    transaction_builder_api::TransactionBuilderApi,
    transaction_execution_api::TransactionExecutionApi,
};
use iota_json_rpc_api::JsonRpcMetrics;
use iota_macros::{fail_point, fail_point_async, replay_log};
use iota_metrics::{
    RegistryID, RegistryService,
    hardware_metrics::register_hardware_metrics,
    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
    server_timing_middleware, spawn_monitored_task,
};
use iota_names::config::IotaNamesConfig;
use iota_network::{
    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
};
use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
use iota_protocol_config::ProtocolConfig;
use iota_rest_api::RestMetrics;
use iota_sdk_types::crypto::{Intent, IntentMessage, IntentScope};
use iota_snapshot::uploader::StateSnapshotUploader;
use iota_storage::{
    FileCompression, StorageFormat,
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use iota_types::{
    base_types::{AuthorityName, ConciseableName, EpochId},
    committee::Committee,
    crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
    digests::ChainIdentifier,
    error::{IotaError, IotaResult},
    executable_transaction::VerifiedExecutableTransaction,
    execution_config_utils::to_binary_config,
    full_checkpoint_content::CheckpointData,
    iota_system_state::{
        IotaSystemState, IotaSystemStateTrait,
        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
    },
    messages_checkpoint::CertifiedCheckpointSummary,
    messages_consensus::{
        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
        SignedAuthorityCapabilitiesV1, check_total_jwk_size,
    },
    messages_grpc::HandleCapabilityNotificationRequestV1,
    quorum_driver_types::QuorumDriverEffectsQueueResult,
    supported_protocol_versions::SupportedProtocolVersions,
    transaction::{Transaction, VerifiedCertificate},
};
use prometheus::Registry;
#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use tap::tap::TapFallible;
use tokio::{
    runtime::Handle,
    sync::{Mutex, broadcast, mpsc, watch},
    task::{JoinHandle, JoinSet},
};
use tokio_util::sync::CancellationToken;
use tower::ServiceBuilder;
use tracing::{Instrument, debug, error, error_span, info, warn};
use typed_store::{
    DBMetrics,
    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
};

use crate::metrics::{GrpcMetrics, IotaNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

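/// Bundles the services that only run while this node is an active committee
/// validator. These components are constructed at startup or at epoch change
/// and torn down again in `monitor_reconfiguration` when the node leaves the
/// committee.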
pub struct ValidatorComponents {
    validator_server_handle: SpawnOnce,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    validator_registry_id: RegistryID,
}

#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    pub(super) struct SimState {
        pub sim_node: iota_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        parse_jwks(
            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
        )
        .map_err(|_| IotaError::JWKRetrieval)
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> =
            std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

pub struct IotaNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,
    _http_server: Option<iota_http::ServerHandle>,
    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_archive_handle: Option<broadcast::Sender<()>>,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}

impl fmt::Debug for IotaNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("IotaNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl IotaNode {
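    /// Starts an [`IotaNode`] using the default software version string
    /// ("unknown"); see [`IotaNode::start_async`] for the full entry point.
    ///
    /// A minimal usage sketch (illustrative only; assumes `config: NodeConfig`
    /// and `registry_service: RegistryService` were built elsewhere, e.g. from
    /// a node YAML config):
    ///
    /// ```ignore
    /// let node = IotaNode::start(config, registry_service, None).await?;
    /// println!("started node {:?}", node);
    /// ```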
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
        custom_rpc_runtime: Option<Handle>,
    ) -> Result<Arc<IotaNode>> {
        Self::start_async(config, registry_service, custom_rpc_runtime, "unknown").await
    }

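    /// Spawns one background task per OIDC provider configured for the current
    /// chain. Each task periodically fetches JWKs for its provider, drops
    /// invalid, oversized, or already-known keys, and submits the remainder to
    /// consensus; the tasks end automatically when the epoch ends.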
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<IotaNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        fn validate_jwk(
            metrics: &Arc<IotaNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }

    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        custom_rpc_runtime: Option<Handle>,
        software_version: &'static str,
    ) -> Result<Arc<IotaNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(
            node =? config.authority_public_key(),
            "Initializing iota-node listening on {}", config.network_address
        );

        let genesis = config.genesis()?.clone();

        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
        info!("IOTA chain identifier: {chain_identifier}");

        let db_corrupted_path = &config.db_path().join("status");
        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
            panic!("Failed to check database corruption: {err}");
        }

        DBMetrics::init(&prometheus_registry);

        iota_metrics::init_metrics(&prometheus_registry);
        #[cfg(not(msim))]
        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();

        register_hardware_metrics(&registry_service, &config.db_path)
            .expect("Failed registering hardware metrics");
        prometheus_registry
            .register(iota_metrics::uptime_metric(
                if is_validator {
                    "validator"
                } else {
                    "fullnode"
                },
                software_version,
                &chain_identifier.to_string(),
            ))
            .expect("Failed registering uptime metric");

        let migration_tx_data = if genesis.contains_migrations() {
            Some(config.load_migration_tx_data()?)
        } else {
            None
        };

        let secret = Arc::pin(config.authority_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");
        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store = AuthorityStore::open(
            perpetual_tables,
            &genesis,
            &config,
            &prometheus_registry,
            migration_tx_data.as_ref(),
        )
        .await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache_config,
            &epoch_start_configuration,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                    auth_agg_metrics,
                ),
            )))
        };

        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.authority_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_id, chain),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        if is_genesis {
            info!("checking IOTA conservation at genesis");
            cache_traits
                .reconfig_api
                .try_expensive_check_iota_conservation(&epoch_store, None)
                .expect("IOTA conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        unmark_db_corruption(db_corrupted_path)?;

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
                config.remove_deprecated_tables,
            )))
        } else {
            None
        };

        let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
        {
            Some(Arc::new(RestIndexStore::new(
                config.db_path().join("rest_index"),
                &store,
                &checkpoint_store,
                &epoch_store,
                &cache_traits.backing_package_store,
            )))
        } else {
            None
        };

        info!("creating archive reader");
        let archive_readers =
            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
            Self::create_p2p_network(
                &config,
                state_sync_store.clone(),
                chain_identifier,
                trusted_peer_change_rx,
                archive_readers.clone(),
                randomness_tx,
                &prometheus_registry,
            )?;

        send_trusted_peer_change(
            &config,
            &trusted_peer_change_tx,
            epoch_store.epoch_start_state(),
        );

        info!("start state archival");
        let state_archive_handle =
            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
                .await?;

        info!("start snapshot upload");
        let state_snapshot_handle =
            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;

        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        let mut genesis_objects = genesis.objects().to_vec();
        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
            genesis_objects.extend(migration_tx_data.get_objects());
        }

        let authority_name = config.authority_public_key();
        let validator_tx_finalizer =
            config
                .enable_validator_tx_finalizer
                .then_some(Arc::new(ValidatorTxFinalizer::new(
                    auth_agg.clone(),
                    authority_name,
                    &prometheus_registry,
                )));

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            rest_index,
            checkpoint_store.clone(),
            &prometheus_registry,
            &genesis_objects,
            &db_checkpoint_config,
            config.clone(),
            archive_readers,
            validator_tx_finalizer,
            chain_identifier,
            pruner_db,
        )
        .await;

        if epoch_store.epoch() == 0 {
            let genesis_tx = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
            Self::execute_transaction_immediately_at_zero_epoch(
                &state,
                &epoch_store,
                genesis_tx,
                span,
            )
            .await;

            if let Some(migration_tx_data) = migration_tx_data {
                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
                    Self::execute_transaction_immediately_at_zero_epoch(
                        &state,
                        &epoch_store,
                        tx,
                        span,
                    )
                    .await;
                }
            }
        }

        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
        {
            if let Some(indexes) = state.indexes.clone() {
                iota_core::verify_indexes::verify_indexes(
                    state.get_accumulator_store().as_ref(),
                    indexes,
                )
                .expect("secondary indexes are inconsistent");
            }
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
            )))
        } else {
            None
        };

        let http_server = build_http_server(
            state.clone(),
            state_sync_store.clone(),
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
            custom_rpc_runtime,
            software_version,
        )
        .await?;

        let accumulator = Arc::new(StateAccumulator::new(
            cache_traits.accumulator_store.clone(),
            StateAccumulatorMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics =
            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
            p2p_network.downgrade(),
            network_connection_metrics,
            HashMap::new(),
            None,
        );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses,
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let iota_node_metrics =
            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));

        let grpc_server_handle =
            build_grpc_server(&config, state.clone(), state_sync_store.clone()).await?;

        let validator_components = if state.is_committee_validator(&epoch_store) {
            let (components, _) = futures::join!(
                Self::construct_validator_components(
                    config.clone(),
                    state.clone(),
                    committee,
                    epoch_store.clone(),
                    checkpoint_store.clone(),
                    state_sync_handle.clone(),
                    randomness_handle.clone(),
                    Arc::downgrade(&accumulator),
                    backpressure_manager.clone(),
                    connection_monitor_status.clone(),
                    &registry_service,
                    iota_node_metrics.clone(),
                ),
                Self::reexecute_pending_consensus_certs(&epoch_store, &state)
            );
            let mut components = components?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            components.validator_server_handle = components.validator_server_handle.start().await;

            Some(components)
        } else {
            None
        };

        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            _http_server: http_server,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: iota_node_metrics,

            _discovery: discovery_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            accumulator: Mutex::new(Some(accumulator)),
            end_of_epoch_channel,
            connection_monitor_status,
            trusted_peer_change_tx,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_archive_handle: state_archive_handle,
            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            grpc_server_handle: Mutex::new(grpc_server_handle),

            auth_agg,
        };

        info!("IotaNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

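    /// Returns a receiver that is sent the latest `IotaSystemState` at the end
    /// of every epoch.
    ///
    /// A minimal usage sketch (illustrative only; `node` is the
    /// `Arc<IotaNode>` returned by [`IotaNode::start`]):
    ///
    /// ```ignore
    /// let mut epochs = node.subscribe_to_epoch_change();
    /// while let Ok(system_state) = epochs.recv().await {
    ///     info!("entered epoch {}", system_state.epoch());
    /// }
    /// ```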
    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| IotaError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> IotaResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    pub async fn close_epoch_for_testing(&self) -> IotaResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

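    /// Starts the state archiver if an archive object store is configured,
    /// returning the shutdown sender of the background upload task (`None`
    /// when archival is disabled).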
    async fn start_state_archival(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_sync_store: RocksDbStore,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
            let local_store_config = ObjectStoreConfig {
                object_store: Some(ObjectStoreType::File),
                directory: Some(config.archive_path()),
                ..Default::default()
            };
            let archive_writer = ArchiveWriter::new(
                local_store_config,
                remote_store_config.clone(),
                FileCompression::Zstd,
                StorageFormat::Blob,
                Duration::from_secs(600),
                256 * 1024 * 1024,
                prometheus_registry,
            )
            .await?;
            Ok(Some(archive_writer.start(state_sync_store).await?))
        } else {
            Ok(None)
        }
    }

    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

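    /// Resolves the effective `DBCheckpointConfig` (state snapshots force
    /// end-of-epoch db checkpoints) and starts the `DBCheckpointHandler`
    /// unless neither an object store nor state snapshots are configured.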
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

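    /// Builds the anemo p2p network: the state-sync, discovery, and randomness
    /// services are registered on one router, request tracing and network
    /// metrics layers are attached, and conservative QUIC defaults (buffer
    /// sizes, stream limits, idle timeout) are applied wherever the config
    /// does not override them. The server name is derived from the chain
    /// identifier, so nodes on different chains do not interconnect.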
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            anemo_config.max_frame_size = Some(1 << 30);

            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("iota-{chain_identifier}");
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }

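    /// Constructs everything a committee validator needs on top of a running
    /// fullnode: a dedicated metrics registry, the consensus adapter and
    /// manager, the consensus store pruner, the validator gRPC service (bound
    /// lazily via `SpawnOnce`), and an optional overload monitor. The
    /// epoch-specific pieces are then wired up by
    /// `start_epoch_specific_validator_components`.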
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        iota_node_metrics: Arc<IotaNodeMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &validator_registry,
        )
        .await?;

        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            accumulator,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_node_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }

    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: ConsensusManager,
        consensus_store_pruner: ConsensusStorePruner,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_node_metrics: Arc<IotaNodeMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            accumulator,
            checkpoint_metrics.clone(),
        );

        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
            backpressure_manager,
        );

        info!("Starting consensus manager");

        consensus_manager
            .start(
                config,
                epoch_store.clone(),
                consensus_handler_initializer,
                IotaTxValidator::new(
                    epoch_store.clone(),
                    checkpoint_service.clone(),
                    state.transaction_manager().clone(),
                    iota_tx_validator_metrics.clone(),
                ),
            )
            .await;

        info!("Spawning checkpoint service");
        let checkpoint_service_tasks = checkpoint_service.spawn().await;

        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                iota_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }

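    /// Assembles the `CheckpointService` for the current epoch: locally built
    /// checkpoints are submitted to consensus via `SubmitCheckpointToConsensus`
    /// (with the next reconfiguration deadline computed as epoch start time
    /// plus epoch duration), certified checkpoints are handed to state sync,
    /// and checkpoint size is capped by the protocol config.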
    fn build_checkpoint_service(
        config: &NodeConfig,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state: Arc<AuthorityState>,
        state_sync_handle: state_sync::Handle,
        accumulator: Weak<StateAccumulator>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Arc<CheckpointService> {
        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

        debug!(
            "Starting checkpoint service with epoch start timestamp {} and epoch duration {}",
            epoch_start_timestamp_ms, epoch_duration_ms
        );

        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
            sender: consensus_adapter,
            signer: state.secret.clone(),
            authority: config.authority_public_key(),
            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
                .checked_add(epoch_duration_ms)
                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
            metrics: checkpoint_metrics.clone(),
        });

        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
        let max_checkpoint_size_bytes =
            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;

        CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store,
            state.get_transaction_cache_reader().clone(),
            accumulator,
            checkpoint_output,
            Box::new(certified_checkpoint_output),
            checkpoint_metrics,
            max_tx_per_checkpoint,
            max_checkpoint_size_bytes,
        )
    }

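    /// Creates the `ConsensusAdapter` for this validator. The per-authority
    /// in-flight limit is derived from the global cap: for example (numbers
    /// illustrative only), with `max_pending_transactions() == 20_000` and a
    /// 100-member committee, each authority is limited to
    /// `20_000 * 2 / 100 == 400` pending transactions.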
    fn construct_consensus_adapter(
        committee: &Committee,
        consensus_config: &ConsensusConfig,
        authority: AuthorityName,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        prometheus_registry: &Registry,
        consensus_client: Arc<dyn ConsensusClient>,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> ConsensusAdapter {
        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
        ConsensusAdapter::new(
            consensus_client,
            checkpoint_store,
            authority,
            connection_monitor_status,
            consensus_config.max_pending_transactions(),
            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
            consensus_config.max_submit_position,
            consensus_config.submit_delay_step_override(),
            ca_metrics,
        )
    }

    async fn start_grpc_validator_service(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        prometheus_registry: &Registry,
    ) -> Result<SpawnOnce> {
        let validator_service = ValidatorService::new(
            state.clone(),
            consensus_adapter,
            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
            TrafficControllerMetrics::new(prometheus_registry),
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let mut server_conf = iota_network_stack::config::Config::new();
        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
        server_conf.load_shed = config.grpc_load_shed;
        let server_builder =
            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
                .add_service(ValidatorServer::new(validator_service));

        let tls_config = iota_tls::create_rustls_server_config(
            config.network_key_pair().copy().private(),
            IOTA_TLS_SERVER_NAME.to_string(),
        );

        let network_address = config.network_address().clone();

        let bind_future = async move {
            let server = server_builder
                .bind(&network_address, Some(tls_config))
                .await
                .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;

            let local_addr = server.local_addr();
            info!("Listening to traffic on {local_addr}");

            Ok(server)
        };

        Ok(SpawnOnce::new(bind_future))
    }

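    /// On restart, re-enqueues owned-object certificates that had already been
    /// sequenced by consensus but may not have been executed. Certificates
    /// with a known signed-effects digest are replayed against that digest;
    /// the rest are enqueued normally. If execution does not finish within the
    /// timeout, the still-pending digests are reported via `debug_fatal!`.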
    async fn reexecute_pending_consensus_certs(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        state: &Arc<AuthorityState>,
    ) {
        let mut pending_consensus_certificates = Vec::new();
        let mut additional_certs = Vec::new();

        for tx in epoch_store.get_all_pending_consensus_transactions() {
            match tx.kind {
                ConsensusTransactionKind::CertifiedTransaction(tx)
                    if !tx.contains_shared_object() =>
                {
                    let tx = *tx;
                    let tx = VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(tx),
                    );
                    if let Some(fx_digest) = epoch_store
                        .get_signed_effects_digest(tx.digest())
                        .expect("db error")
                    {
                        pending_consensus_certificates.push((tx, fx_digest));
                    } else {
                        additional_certs.push(tx);
                    }
                }
                _ => (),
            }
        }

        let digests = pending_consensus_certificates
            .iter()
            .map(|(tx, _)| *tx.digest())
            .collect::<Vec<_>>();

        info!(
            "reexecuting {} pending consensus certificates: {:?}",
            digests.len(),
            digests
        );

        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
        state.enqueue_transactions_for_execution(additional_certs, epoch_store);

        let timeout = if cfg!(msim) { 120 } else { 60 };
        if tokio::time::timeout(
            std::time::Duration::from_secs(timeout),
            state
                .get_transaction_cache_reader()
                .try_notify_read_executed_effects_digests(&digests),
        )
        .await
        .is_err()
        {
            if let Ok(executed_effects_digests) = state
                .get_transaction_cache_reader()
                .try_multi_get_executed_effects_digests(&digests)
            {
                let pending_digests = digests
                    .iter()
                    .zip(executed_effects_digests.iter())
                    .filter_map(|(digest, executed_effects_digest)| {
                        if executed_effects_digest.is_none() {
                            Some(digest)
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>();
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed: {:?}",
                    pending_digests
                );
            } else {
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed, digests not found"
                );
            }
        }
    }

    pub fn state(&self) -> Arc<AuthorityState> {
        self.state.clone()
    }

    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }

    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }

    pub fn clone_authority_aggregator(
        &self,
    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.clone_authority_aggregator())
    }

    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }

    pub fn subscribe_to_transaction_orchestrator_effects(
        &self,
    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.subscribe_to_effects_queue())
            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
    }

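    /// The main reconfiguration loop. For each epoch it runs the checkpoint
    /// executor to completion, broadcasts the resulting `IotaSystemState` on
    /// the end-of-epoch channel, refreshes the authority aggregator and peer
    /// mappings, reconfigures the per-epoch state, and then starts, restarts,
    /// or tears down the validator components depending on whether this node
    /// is in the next committee. Returns only when a `RunWithRange` stop
    /// condition triggers shutdown.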
1742 pub async fn monitor_reconfiguration(
1748 self: Arc<Self>,
1749 mut epoch_store: Arc<AuthorityPerEpochStore>,
1750 ) -> Result<()> {
1751 let checkpoint_executor_metrics =
1752 CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1753
1754 loop {
1755 let mut accumulator_guard = self.accumulator.lock().await;
1756 let accumulator = accumulator_guard.take().unwrap();
1757 info!(
1758 "Creating checkpoint executor for epoch {}",
1759 epoch_store.epoch()
1760 );
1761
1762 let summary_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
1764 guard.as_ref().map(|handle| {
1765 let tx = handle.checkpoint_summary_broadcaster().clone();
1766 Box::new(move |summary: &CertifiedCheckpointSummary| {
1767 tx.send_traced(summary);
1768 }) as Box<dyn Fn(&CertifiedCheckpointSummary) + Send + Sync>
1769 })
1770 } else {
1771 None
1772 };
1773 let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
1774 guard.as_ref().map(|handle| {
1775 let tx = handle.checkpoint_data_broadcaster().clone();
1776 Box::new(move |data: &CheckpointData| {
1777 tx.send_traced(data);
1778 }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
1779 })
1780 } else {
1781 None
1782 };
1783
1784 let checkpoint_executor = CheckpointExecutor::new(
1785 epoch_store.clone(),
1786 self.checkpoint_store.clone(),
1787 self.state.clone(),
1788 accumulator.clone(),
1789 self.backpressure_manager.clone(),
1790 self.config.checkpoint_executor_config.clone(),
1791 checkpoint_executor_metrics.clone(),
1792 summary_sender,
1793 data_sender,
1794 );
1795
1796 let run_with_range = self.config.run_with_range;
1797
1798 let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1799
1800 if let Some(components) = &*self.validator_components.lock().await {
1802 tokio::time::sleep(Duration::from_millis(1)).await;
1804
1805 let config = cur_epoch_store.protocol_config();
1806 let binary_config = to_binary_config(config);
1807 let transaction = ConsensusTransaction::new_capability_notification_v1(
1808 AuthorityCapabilitiesV1::new(
1809 self.state.name,
1810 cur_epoch_store.get_chain_identifier().chain(),
1811 self.config
1812 .supported_protocol_versions
1813 .expect("Supported versions should be populated")
1814 .truncate_below(config.version),
1816 self.state
1817 .get_available_system_packages(&binary_config)
1818 .await,
1819 ),
1820 );
1821 info!(?transaction, "submitting capabilities to consensus");
1822 components
1823 .consensus_adapter
1824 .submit(transaction, None, &cur_epoch_store)?;
1825 } else if self.state.is_active_validator(&cur_epoch_store)
1826 && cur_epoch_store
1827 .protocol_config()
1828 .track_non_committee_eligible_validators()
1829 {
1830 let epoch_store = cur_epoch_store.clone();
1834 let node_clone = self.clone();
1835 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
1836 node_clone
1837 .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
1838 .await;
1839 }));
1840 }
1841
1842 let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1843
1844 if stop_condition == StopReason::RunWithRangeCondition {
1845 IotaNode::shutdown(&self).await;
1846 self.shutdown_channel_tx
1847 .send(run_with_range)
1848 .expect("RunWithRangeCondition met but failed to send shutdown message");
1849 return Ok(());
1850 }
1851
1852 let latest_system_state = self
1854 .state
1855 .get_object_cache_reader()
1856 .try_get_iota_system_state_object_unsafe()
1857 .expect("Read IOTA System State object cannot fail");
1858
1859 #[cfg(msim)]
1860 if !self
1861 .sim_state
1862 .sim_safe_mode_expected
1863 .load(Ordering::Relaxed)
1864 {
1865 debug_assert!(!latest_system_state.safe_mode());
1866 }
1867
1868 #[cfg(not(msim))]
1869 debug_assert!(!latest_system_state.safe_mode());
1870
1871 if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
1872 if self.state.is_fullnode(&cur_epoch_store) {
1873 warn!(
1874 "Failed to send end of epoch notification to subscriber: {:?}",
1875 err
1876 );
1877 }
1878 }
1879
1880 cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1881 let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1882
1883 self.auth_agg.store(Arc::new(
1884 self.auth_agg
1885 .load()
1886 .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1887 ));
1888
1889 let next_epoch_committee = new_epoch_start_state.get_iota_committee();
1890 let next_epoch = next_epoch_committee.epoch();
1891 assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1892
1893 info!(
1894 next_epoch,
1895 "Finished executing all checkpoints in epoch. About to reconfigure the system."
1896 );
1897
1898 fail_point_async!("reconfig_delay");
1899
1900 let authority_names_to_peer_ids =
1905 new_epoch_start_state.get_authority_names_to_peer_ids();
1906 self.connection_monitor_status
1907 .update_mapping_for_epoch(authority_names_to_peer_ids);
1908
1909 cur_epoch_store.record_epoch_reconfig_start_time_metric();
1910
1911 send_trusted_peer_change(
1912 &self.config,
1913 &self.trusted_peer_change_tx,
1914 &new_epoch_start_state,
1915 );
1916
1917 let mut validator_components_lock_guard = self.validator_components.lock().await;
1918
1919 let new_epoch_store = self
1923 .reconfigure_state(
1924 &self.state,
1925 &cur_epoch_store,
1926 next_epoch_committee.clone(),
1927 new_epoch_start_state,
1928 accumulator.clone(),
1929 )
1930 .await?;
1931
1932 let new_validator_components = if let Some(ValidatorComponents {
1933 validator_server_handle,
1934 validator_overload_monitor_handle,
1935 consensus_manager,
1936 consensus_store_pruner,
1937 consensus_adapter,
1938 mut checkpoint_service_tasks,
1939 checkpoint_metrics,
1940 iota_tx_validator_metrics,
1941 validator_registry_id,
1942 }) = validator_components_lock_guard.take()
1943 {
1944 info!("Reconfiguring the validator.");
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

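                // Recover the metrics from the old epoch's accumulator (which must hold
                // the last remaining reference by now) and build a fresh StateAccumulator
                // for the new epoch.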
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_committee_validator(&new_epoch_store) {
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_accumulator,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    validator_server_handle.shutdown();
                    debug!("Validator grpc server shutdown triggered");

                    None
                }
            } else {
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                if self.state.is_committee_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_accumulator,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

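            // Release the database handles held by the old epoch store; the new epoch
            // store is now in place.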
            cur_epoch_store.release_db_handles();

            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }

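    /// Shuts down the node's long-running services: consensus (when validator
    /// components are running) and the gRPC server (when one was started).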
    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }

        if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
            info!("Shutting down gRPC server");
            if let Err(e) = grpc_handle.shutdown().await {
                warn!("Failed to gracefully shut down gRPC server: {e}");
            }
        }
    }

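    /// Builds the `EpochStartConfiguration` for the next epoch from the last
    /// checkpoint of the current epoch and reconfigures the authority state,
    /// returning the per-epoch store for the new epoch.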
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        accumulator: Arc<StateAccumulator>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                accumulator,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }

    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }

    async fn execute_transaction_immediately_at_zero_epoch(
        state: &Arc<AuthorityState>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx: &Transaction,
        span: tracing::Span,
    ) {
        let _guard = span.enter();
        let transaction =
            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                    tx.data().clone(),
                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                ),
            );
        state
            .try_execute_immediately(&transaction, None, epoch_store)
            .unwrap();
    }

    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }

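    /// Signs this validator's `AuthorityCapabilitiesV1` message and sends it
    /// to a quorum of the committee, retrying on retryable errors with a
    /// linearly increasing interval (starting at 5s, +5s per attempt, capped
    /// at 300s). Gives up immediately on non-retryable errors.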
    async fn send_signed_capability_notification_to_committee_with_retry(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
        const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
        const MAX_RETRY_INTERVAL_SECS: u64 = 300;

        let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);

        let capabilities = AuthorityCapabilitiesV1::new(
            self.state.name,
            epoch_store.get_chain_identifier().chain(),
            self.config
                .supported_protocol_versions
                .expect("Supported versions should be populated")
                .truncate_below(config.version),
            self.state
                .get_available_system_packages(&binary_config)
                .await,
        );

        let signature = AuthoritySignature::new_secure(
            &IntentMessage::new(
                Intent::iota_app(IntentScope::AuthorityCapabilities),
                &capabilities,
            ),
            &epoch_store.epoch(),
            self.config.authority_key_pair(),
        );

        let request = HandleCapabilityNotificationRequestV1 {
            message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
        };

        let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);

        loop {
            let auth_agg = self.auth_agg.load();
            match auth_agg
                .send_capability_notification_to_quorum(request.clone())
                .await
            {
                Ok(_) => {
                    info!("Successfully sent capability notification to committee");
                    break;
                }
                Err(err) => {
                    match &err {
                        AggregatorSendCapabilityNotificationError::RetryableNotification {
                            errors,
                        } => {
                            warn!(
                                "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
                                retry_interval, errors
                            );
                        }
                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
                            errors,
                        } => {
                            error!(
                                "Failed to send capability notification to committee (non-retryable error): {:?}",
                                errors
                            );
                            break;
                        }
                    };

                    tokio::time::sleep(retry_interval).await;

                    retry_interval = std::cmp::min(
                        retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
                        Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
                    );
                }
            }
        }
    }
}

#[cfg(not(msim))]
impl IotaNode {
    async fn fetch_jwks(
        _authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
        let client = reqwest::Client::new();
        fetch_jwks(provider, &client)
            .await
            .map_err(|_| IotaError::JWKRetrieval)
    }
}

#[cfg(msim)]
impl IotaNode {
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}

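/// A lazily started server: `Unstarted` wraps the future that binds the
/// validator gRPC server, while `Started` holds the handle used to trigger
/// shutdown. `start` is a no-op once the server is running.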
enum SpawnOnce {
    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
    #[allow(unused)]
    Started(iota_http::ServerHandle),
}

impl SpawnOnce {
    pub fn new(
        future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
    ) -> Self {
        Self::Unstarted(Mutex::new(Box::pin(future)))
    }

    pub async fn start(self) -> Self {
        match self {
            Self::Unstarted(future) => {
                let server = future
                    .into_inner()
                    .await
                    .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
                let handle = server.handle().clone();
                tokio::spawn(async move {
                    if let Err(err) = server.serve().await {
                        info!("Server stopped: {err}");
                    }
                    info!("Server stopped");
                });
                Self::Started(handle)
            }
            Self::Started(_) => self,
        }
    }

    pub fn shutdown(self) {
        if let SpawnOnce::Started(handle) = self {
            handle.trigger_shutdown();
        }
    }
}

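/// Publishes the new epoch's validator set on the trusted-peer watch channel;
/// the previously announced committee is rotated into `old_committee`.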
fn send_trusted_peer_change(
    config: &NodeConfig,
    sender: &watch::Sender<TrustedPeerChangeEvent>,
    new_epoch_start_state: &EpochStartSystemState,
) {
    let new_committee =
        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());

    sender.send_modify(|event| {
        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
        event.new_committee = new_committee;
    })
}

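/// Builds the transaction key-value store backing the JSON-RPC APIs: a local
/// RocksDB-backed store, with an optional fallback to an HTTP store when
/// `transaction_kv_store_read_config.base_url` is set.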
fn build_kv_store(
    state: &Arc<AuthorityState>,
    config: &NodeConfig,
    registry: &Registry,
) -> Result<Arc<TransactionKeyValueStore>> {
    let metrics = KeyValueStoreMetrics::new(registry);
    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());

    let base_url = &config.transaction_kv_store_read_config.base_url;

    if base_url.is_empty() {
        info!("no http kv store url provided, using local db only");
        return Ok(Arc::new(db_store));
    }

    base_url.parse::<url::Url>().tap_err(|e| {
        error!(
            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
            base_url, e
        )
    })?;

    let http_store = HttpKVStore::new_kv(
        base_url,
        config.transaction_kv_store_read_config.cache_size,
        metrics.clone(),
    )?;
    info!("using local key-value store with fallback to http key-value store");
    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
        db_store,
        http_store,
        metrics,
        "json_rpc_fallback",
    )))
}

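/// Starts the gRPC API server. Returns `Ok(None)` when the node runs
/// consensus (validators do not serve this API) or when `enable_grpc_api` is
/// false; errors if the API is enabled without a `grpc_api_config`.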
async fn build_grpc_server(
    config: &NodeConfig,
    state: Arc<AuthorityState>,
    state_sync_store: RocksDbStore,
) -> Result<Option<GrpcServerHandle>> {
    if config.consensus_config().is_some() || !config.enable_grpc_api {
        return Ok(None);
    }

    let Some(grpc_config) = &config.grpc_api_config else {
        return Err(anyhow!("gRPC API is enabled but no configuration provided"));
    };

    let rest_read_store = Arc::new(RestReadStore::new(state.clone(), state_sync_store));

    let shutdown_token = CancellationToken::new();

    let grpc_reader = Arc::new(GrpcReader::from_rest_state_reader(rest_read_store));

    let event_subscriber =
        state.subscription_handler.clone() as Arc<dyn iota_grpc_server::EventSubscriber>;

    let handle = start_grpc_server(
        grpc_reader,
        event_subscriber,
        grpc_config.clone(),
        shutdown_token,
    )
    .await?;

    Ok(Some(handle))
}

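/// Builds and starts the HTTP server exposing the JSON-RPC API, the REST API
/// (when enabled), and the `/health` endpoint. Returns `Ok(None)` on
/// validators, which do not serve these APIs.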
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    _custom_runtime: Option<Handle>,
    software_version: &'static str,
) -> Result<Option<iota_http::ServerHandle>> {
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        let iota_names_config = config
            .iota_names_config
            .clone()
            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    if config.enable_rest_api {
        let mut rest_service = iota_rest_api::RestService::new(
            Arc::new(RestReadStore::new(state.clone(), store)),
            software_version,
        );

        if let Some(config) = config.rest.clone() {
            rest_service.with_config(config);
        }

        rest_service.with_metrics(RestMetrics::new(prometheus_registry));

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rest_service.with_executor(transaction_orchestrator.clone())
        }

        router = router.merge(rest_service.into_router());
    }

    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware));

    router = router.layer(layers);

    let handle = iota_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());

    Ok(Some(handle))
}

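/// Query parameters accepted by the `/health` endpoint.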
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    pub threshold_seconds: Option<u32>,
}

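/// Reports `200 "up"` when the node is healthy. If `threshold_seconds` is
/// given, the node is reported unhealthy (`503 "down"`) when the timestamp of
/// the highest executed checkpoint lags the current time by more than the
/// threshold.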
async fn health_check_handler(
    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
) -> impl axum::response::IntoResponse {
    if let Some(threshold_seconds) = threshold_seconds {
        let summary = match state
            .get_checkpoint_store()
            .get_highest_executed_checkpoint()
        {
            Ok(Some(summary)) => summary,
            Ok(None) => {
                warn!("Highest executed checkpoint not found");
                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
            }
            Err(err) => {
                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
            }
        };

        let latest_chain_time = summary.timestamp();
        let threshold =
            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);

        if latest_chain_time < threshold {
            warn!(
                ?latest_chain_time,
                ?threshold,
                "failing health check due to checkpoint lag"
            );
            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
        }
    }
    (axum::http::StatusCode::OK, "up")
}

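/// Maximum number of transactions allowed in a checkpoint; pinned to 2 when
/// built for tests.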
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}

#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}