#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::{
    collections::{BTreeSet, HashMap, HashSet},
    fmt,
    future::Future,
    path::PathBuf,
    str::FromStr,
    sync::{Arc, Weak},
    time::Duration,
};

use anemo::Network;
use anemo_tower::{
    callback::CallbackLayer,
    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
};
use anyhow::{Result, anyhow};
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
use futures::future::BoxFuture;
pub use handle::IotaNodeHandle;
use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
use iota_common::debug_fatal;
use iota_config::{
    ConsensusConfig, NodeConfig,
    node::{DBCheckpointConfig, RunWithRange},
    node_config_metrics::NodeConfigMetrics,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_core::{
    authority::{
        AuthorityState, AuthorityStore, RandomnessRoundReceiver,
        authority_per_epoch_store::AuthorityPerEpochStore,
        authority_store_pruner::ObjectsCompactionFilter,
        authority_store_tables::{
            AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
        },
        backpressure::BackpressureManager,
        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
    },
    authority_aggregator::{
        AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
    },
    authority_client::NetworkAuthorityClient,
    authority_server::{ValidatorService, ValidatorServiceMetrics},
    checkpoints::{
        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
        SubmitCheckpointToConsensus,
        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
    },
    connection_monitor::ConnectionMonitor,
    consensus_adapter::{
        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
        ConsensusClient,
    },
    consensus_handler::ConsensusHandlerInitializer,
    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
    db_checkpoint_handler::DBCheckpointHandler,
    epoch::{
        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
        reconfiguration::ReconfigurationInitiator,
    },
    execution_cache::build_execution_cache,
    grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
    jsonrpc_index::IndexStore,
    module_cache_metrics::ResolverMetrics,
    overload_monitor::overload_monitor,
    safe_client::SafeClientMetricsBase,
    signature_verifier::SignatureVerifierMetrics,
    state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
    storage::{GrpcReadStore, RocksDbStore},
    traffic_controller::metrics::TrafficControllerMetrics,
    transaction_orchestrator::TransactionOrchestrator,
    validator_tx_finalizer::ValidatorTxFinalizer,
};
use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
use iota_json_rpc::{
    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
    transaction_builder_api::TransactionBuilderApi,
    transaction_execution_api::TransactionExecutionApi,
};
use iota_json_rpc_api::JsonRpcMetrics;
use iota_macros::{fail_point, fail_point_async, replay_log};
use iota_metrics::{
    RegistryID, RegistryService,
    hardware_metrics::register_hardware_metrics,
    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
    server_timing_middleware, spawn_monitored_task,
};
use iota_names::config::IotaNamesConfig;
use iota_network::{
    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
};
use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
use iota_sdk_types::crypto::{Intent, IntentMessage, IntentScope};
use iota_snapshot::uploader::StateSnapshotUploader;
use iota_storage::{
    FileCompression, StorageFormat,
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use iota_types::{
    base_types::{AuthorityName, ConciseableName, EpochId},
    committee::Committee,
    crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
    digests::ChainIdentifier,
    error::{IotaError, IotaResult},
    executable_transaction::VerifiedExecutableTransaction,
    execution_config_utils::to_binary_config,
    full_checkpoint_content::CheckpointData,
    iota_system_state::{
        IotaSystemState, IotaSystemStateTrait,
        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
    },
    messages_consensus::{
        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
        SignedAuthorityCapabilitiesV1, check_total_jwk_size,
    },
    messages_grpc::HandleCapabilityNotificationRequestV1,
    quorum_driver_types::QuorumDriverEffectsQueueResult,
    supported_protocol_versions::SupportedProtocolVersions,
    transaction::{Transaction, VerifiedCertificate},
};
use prometheus::Registry;
#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use tap::tap::TapFallible;
use tokio::{
    sync::{Mutex, broadcast, mpsc, watch},
    task::{JoinHandle, JoinSet},
};
use tokio_util::sync::CancellationToken;
use tower::ServiceBuilder;
use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
use typed_store::{
    DBMetrics,
    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
};

use crate::metrics::{GrpcMetrics, IotaNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

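/// Components that run only while the node is serving as a validator: the
/// validator gRPC server, the optional overload monitor, the consensus
/// machinery with its adapter and store pruner, the checkpoint service tasks,
/// and the validator-scoped metrics registry. The bundle is torn down (and
/// its registry removed) when the node leaves the committee.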
pub struct ValidatorComponents {
    validator_server_handle: SpawnOnce,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    validator_registry_id: RegistryID,
}

#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    pub(super) struct SimState {
        pub sim_node: iota_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        parse_jwks(
            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
        )
        .map_err(|_| IotaError::JWKRetrieval)
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> =
            std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

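/// Binary name and version reported by the node's servers, rendered as
/// `bin/version` (see the `Display` impl below).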
#[derive(Clone)]
pub struct ServerVersion {
    pub bin: &'static str,
    pub version: &'static str,
}

impl ServerVersion {
    pub fn new(bin: &'static str, version: &'static str) -> Self {
        Self { bin, version }
    }
}

impl std::fmt::Display for ServerVersion {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.bin)?;
        f.write_str("/")?;
        f.write_str(self.version)
    }
}

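/// A running IOTA node, either a fullnode or a validator. Fields prefixed
/// with `_` are handles that are held only to keep the corresponding
/// background tasks alive for the lifetime of the node.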
pub struct IotaNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,
    _http_server: Option<iota_http::ServerHandle>,
    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_archive_handle: Option<broadcast::Sender<()>>,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}

impl fmt::Debug for IotaNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("IotaNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl IotaNode {
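    /// Starts the node with a default [`ServerVersion`]; see
    /// [`Self::start_async`] for the full startup sequence.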
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
    ) -> Result<Arc<IotaNode>> {
        Self::start_async(
            config,
            registry_service,
            ServerVersion::new("iota-node", "unknown"),
        )
        .await
    }

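    /// Spawns one JWK fetch task per OIDC provider configured for this
    /// chain. Each task polls its provider every
    /// `config.jwk_fetch_interval_seconds`, drops keys that are invalid,
    /// oversized, already active in the current epoch, or already seen, and
    /// submits the remainder (capped at `MAX_JWK_KEYS_PER_FETCH`) to
    /// consensus. The tasks end automatically when the epoch ends.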
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<IotaNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        fn validate_jwk(
            metrics: &Arc<IotaNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }

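    /// Starts an [`IotaNode`] with the given config and metrics registry:
    /// opens the stores, builds the execution cache and epoch store,
    /// executes the genesis (and migration) transactions on a fresh
    /// database, wires up the P2P network, HTTP/gRPC servers and optional
    /// validator components, and finally spawns `monitor_reconfiguration`.
    ///
    /// A minimal usage sketch (assuming a `NodeConfig` and a
    /// `RegistryService` obtained elsewhere, e.g. from the metrics setup in
    /// the binary crate):
    ///
    /// ```ignore
    /// let node = IotaNode::start_async(
    ///     config,
    ///     registry_service,
    ///     ServerVersion::new("iota-node", env!("CARGO_PKG_VERSION")),
    /// )
    /// .await?;
    /// let mut epoch_rx = node.subscribe_to_epoch_change();
    /// ```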
    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        server_version: ServerVersion,
    ) -> Result<Arc<IotaNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(node =? config.authority_public_key(),
            "Initializing iota-node listening on {}", config.network_address
        );

        let genesis = config.genesis()?.clone();

        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
        info!("IOTA chain identifier: {chain_identifier}");

        let db_corrupted_path = &config.db_path().join("status");
        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
            panic!("Failed to check database corruption: {err}");
        }

        DBMetrics::init(&prometheus_registry);

        iota_metrics::init_metrics(&prometheus_registry);
        #[cfg(not(msim))]
        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();

        register_hardware_metrics(&registry_service, &config.db_path)
            .expect("Failed registering hardware metrics");
        prometheus_registry
            .register(iota_metrics::uptime_metric(
                if is_validator {
                    "validator"
                } else {
                    "fullnode"
                },
                server_version.version,
                &chain_identifier.to_string(),
            ))
            .expect("Failed registering uptime metric");

        let migration_tx_data = if genesis.contains_migrations() {
            Some(config.load_migration_tx_data()?)
        } else {
            None
        };

        let secret = Arc::pin(config.authority_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");
        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store = AuthorityStore::open(
            perpetual_tables,
            &genesis,
            &config,
            &prometheus_registry,
            migration_tx_data.as_ref(),
        )
        .await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache_config,
            &epoch_start_configuration,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                    auth_agg_metrics,
                ),
            )))
        };

        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => chain_identifier.chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.authority_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_identifier, chain),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
        )?;

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        if is_genesis {
            info!("checking IOTA conservation at genesis");
            cache_traits
                .reconfig_api
                .try_expensive_check_iota_conservation(&epoch_store, None)
                .expect("IOTA conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        unmark_db_corruption(db_corrupted_path)?;

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
            )))
        } else {
            None
        };

        let grpc_indexes_store = if is_full_node && config.enable_grpc_api {
            Some(Arc::new(
                GrpcIndexesStore::new(
                    config.db_path().join(GRPC_INDEXES_DIR),
                    Arc::clone(&store),
                    &checkpoint_store,
                )
                .await,
            ))
        } else {
            None
        };

        info!("creating archive reader");
        let archive_readers =
            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
            Self::create_p2p_network(
                &config,
                state_sync_store.clone(),
                chain_identifier,
                trusted_peer_change_rx,
                archive_readers.clone(),
                randomness_tx,
                &prometheus_registry,
            )?;

        send_trusted_peer_change(
            &config,
            &trusted_peer_change_tx,
            epoch_store.epoch_start_state(),
        );

        info!("start state archival");
        let state_archive_handle =
            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
                .await?;

        info!("start snapshot upload");
        let state_snapshot_handle =
            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;

        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        let mut genesis_objects = genesis.objects().to_vec();
        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
            genesis_objects.extend(migration_tx_data.get_objects());
        }

        let authority_name = config.authority_public_key();
        let validator_tx_finalizer =
            config
                .enable_validator_tx_finalizer
                .then_some(Arc::new(ValidatorTxFinalizer::new(
                    auth_agg.clone(),
                    authority_name,
                    &prometheus_registry,
                )));

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            grpc_indexes_store,
            checkpoint_store.clone(),
            &prometheus_registry,
            &genesis_objects,
            &db_checkpoint_config,
            config.clone(),
            archive_readers,
            validator_tx_finalizer,
            chain_identifier,
            pruner_db,
        )
        .await;

        if epoch_store.epoch() == 0 {
            let genesis_tx = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
            Self::execute_transaction_immediately_at_zero_epoch(
                &state,
                &epoch_store,
                genesis_tx,
                span,
            )
            .await;

            if let Some(migration_tx_data) = migration_tx_data {
                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
                    Self::execute_transaction_immediately_at_zero_epoch(
                        &state,
                        &epoch_store,
                        tx,
                        span,
                    )
                    .await;
                }
            }
        }

        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
        {
            if let Some(indexes) = state.indexes.clone() {
                iota_core::verify_indexes::verify_indexes(
                    state.get_accumulator_store().as_ref(),
                    indexes,
                )
                .expect("secondary indexes are inconsistent");
            }
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
            )))
        } else {
            None
        };

        let http_server = build_http_server(
            state.clone(),
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
        )
        .await?;

        let accumulator = Arc::new(StateAccumulator::new(
            cache_traits.accumulator_store.clone(),
            StateAccumulatorMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics =
            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
            p2p_network.downgrade(),
            network_connection_metrics,
            HashMap::new(),
            None,
        );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses,
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let iota_node_metrics =
            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));

        iota_node_metrics
            .binary_max_protocol_version
            .set(ProtocolVersion::MAX.as_u64() as i64);
        iota_node_metrics
            .configured_max_protocol_version
            .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);

        let executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>> =
            transaction_orchestrator
                .clone()
                .map(|o| o as Arc<dyn iota_types::transaction_executor::TransactionExecutor>);

        let grpc_server_handle = build_grpc_server(
            &config,
            state.clone(),
            state_sync_store.clone(),
            executor,
            &prometheus_registry,
            server_version,
        )
        .await?;

        let validator_components = if state.is_committee_validator(&epoch_store) {
            let (components, _) = futures::join!(
                Self::construct_validator_components(
                    config.clone(),
                    state.clone(),
                    committee,
                    epoch_store.clone(),
                    checkpoint_store.clone(),
                    state_sync_handle.clone(),
                    randomness_handle.clone(),
                    Arc::downgrade(&accumulator),
                    backpressure_manager.clone(),
                    connection_monitor_status.clone(),
                    &registry_service,
                    iota_node_metrics.clone(),
                ),
                Self::reexecute_pending_consensus_certs(&epoch_store, &state)
            );
            let mut components = components?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            components.validator_server_handle = components.validator_server_handle.start().await;

            Some(components)
        } else {
            None
        };

        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            _http_server: http_server,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: iota_node_metrics,

            _discovery: discovery_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            accumulator: Mutex::new(Some(accumulator)),
            end_of_epoch_channel,
            connection_monitor_status,
            trusted_peer_change_tx,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_archive_handle: state_archive_handle,
            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            grpc_server_handle: Mutex::new(grpc_server_handle),

            auth_agg,
        };

        info!("IotaNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| IotaError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> IotaResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    pub async fn close_epoch_for_testing(&self) -> IotaResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

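    /// If an archive object store is configured, starts the archive writer,
    /// which stages checkpoint data in a local file store and uploads it to
    /// the remote store as Zstd-compressed blobs. Returns the broadcast
    /// sender handle for the background task, or `None` when archiving is
    /// disabled.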
    async fn start_state_archival(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_sync_store: RocksDbStore,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
            let local_store_config = ObjectStoreConfig {
                object_store: Some(ObjectStoreType::File),
                directory: Some(config.archive_path()),
                ..Default::default()
            };
            let archive_writer = ArchiveWriter::new(
                local_store_config,
                remote_store_config.clone(),
                FileCompression::Zstd,
                StorageFormat::Blob,
                Duration::from_secs(600),
                256 * 1024 * 1024,
                prometheus_registry,
            )
            .await?;
            Ok(Some(archive_writer.start(state_sync_store).await?))
        } else {
            Ok(None)
        }
    }

    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

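    /// Builds the effective [`DBCheckpointConfig`]. When no checkpoint path
    /// is set explicitly, the default path is used and epoch-end db
    /// checkpoints are force-enabled whenever state snapshot uploads are
    /// enabled, since the snapshot uploader reads from the db checkpoint
    /// directory. Also starts the [`DBCheckpointHandler`] unless neither an
    /// object store nor state snapshots are configured.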
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

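    /// Builds the anemo P2P network, registering the discovery, state-sync
    /// and randomness services on a shared router, instrumenting inbound and
    /// outbound traffic with metrics and tracing layers, and applying
    /// conservative QUIC defaults (socket buffer sizes, stream limits,
    /// receive/send windows, idle timeout and keep-alive) for any values not
    /// set in the node config.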
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            anemo_config.max_frame_size = Some(1 << 30);

            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("iota-{chain_identifier}");
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }

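    /// Creates the long-lived validator components: a dedicated metrics
    /// registry, the consensus adapter and manager, the consensus store
    /// pruner, the (not yet started) validator gRPC server, and, if load
    /// shedding is enabled, the overload monitor. Epoch-specific pieces are
    /// then layered on top by
    /// [`Self::start_epoch_specific_validator_components`].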
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        iota_node_metrics: Arc<IotaNodeMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &validator_registry,
        )
        .await?;

        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            accumulator,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_node_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }

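    /// Starts the components that must be rebuilt every epoch: the
    /// checkpoint service, the optional randomness manager, and the
    /// consensus instance with its handler and transaction validator. Also
    /// spawns the JWK updater when the authenticator state is enabled for
    /// this epoch.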
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: ConsensusManager,
        consensus_store_pruner: ConsensusStorePruner,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_node_metrics: Arc<IotaNodeMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            accumulator,
            checkpoint_metrics.clone(),
        );

        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
            backpressure_manager,
        );

        info!("Starting consensus manager");

        consensus_manager
            .start(
                config,
                epoch_store.clone(),
                consensus_handler_initializer,
                IotaTxValidator::new(
                    epoch_store.clone(),
                    checkpoint_service.clone(),
                    state.transaction_manager().clone(),
                    iota_tx_validator_metrics.clone(),
                ),
            )
            .await;

        info!("Spawning checkpoint service");
        let checkpoint_service_tasks = checkpoint_service.spawn().await;

        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                iota_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }

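    /// Assembles the [`CheckpointService`] for the current epoch: locally
    /// built checkpoints are submitted to consensus for signing, certified
    /// checkpoints are forwarded to state sync, and per-checkpoint limits
    /// are taken from the protocol config.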
    fn build_checkpoint_service(
        config: &NodeConfig,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state: Arc<AuthorityState>,
        state_sync_handle: state_sync::Handle,
        accumulator: Weak<StateAccumulator>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Arc<CheckpointService> {
        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

        debug!(
            "Starting checkpoint service with epoch start timestamp {} and epoch duration {}",
            epoch_start_timestamp_ms, epoch_duration_ms
        );

        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
            sender: consensus_adapter,
            signer: state.secret.clone(),
            authority: config.authority_public_key(),
            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
                .checked_add(epoch_duration_ms)
                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
            metrics: checkpoint_metrics.clone(),
        });

        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
        let max_checkpoint_size_bytes =
            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;

        CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store,
            state.get_transaction_cache_reader().clone(),
            accumulator,
            checkpoint_output,
            Box::new(certified_checkpoint_output),
            checkpoint_metrics,
            max_tx_per_checkpoint,
            max_checkpoint_size_bytes,
        )
    }

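    /// Builds the [`ConsensusAdapter`], sizing the per-validator pending
    /// transaction limit as `2 * max_pending_transactions / committee size`,
    /// so the per-authority budget shrinks as the committee grows.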
    fn construct_consensus_adapter(
        committee: &Committee,
        consensus_config: &ConsensusConfig,
        authority: AuthorityName,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        prometheus_registry: &Registry,
        consensus_client: Arc<dyn ConsensusClient>,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> ConsensusAdapter {
        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
        ConsensusAdapter::new(
            consensus_client,
            checkpoint_store,
            authority,
            connection_monitor_status,
            consensus_config.max_pending_transactions(),
            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
            consensus_config.max_submit_position,
            consensus_config.submit_delay_step_override(),
            ca_metrics,
        )
    }

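    /// Prepares the validator gRPC service (with traffic control and TLS)
    /// and returns it wrapped in a [`SpawnOnce`], so that binding to the
    /// network address is deferred until the caller invokes `start()`.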
    async fn start_grpc_validator_service(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        prometheus_registry: &Registry,
    ) -> Result<SpawnOnce> {
        let validator_service = ValidatorService::new(
            state,
            consensus_adapter,
            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
            TrafficControllerMetrics::new(prometheus_registry),
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let mut server_conf = iota_network_stack::config::Config::new();
        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
        server_conf.load_shed = config.grpc_load_shed;
        let server_builder =
            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
                .add_service(ValidatorServer::new(validator_service));

        let tls_config = iota_tls::create_rustls_server_config(
            config.network_key_pair().copy().private(),
            IOTA_TLS_SERVER_NAME.to_string(),
        );

        let network_address = config.network_address().clone();

        let bind_future = async move {
            let server = server_builder
                .bind(&network_address, Some(tls_config))
                .await
                .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;

            let local_addr = server.local_addr();
            info!("Listening to traffic on {local_addr}");

            Ok(server)
        };

        Ok(SpawnOnce::new(bind_future))
    }

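    /// On restart, re-enqueues owned-object certificates that were submitted
    /// to consensus but not yet executed. Certificates with a known signed
    /// effects digest are replayed against that digest; the rest are simply
    /// enqueued for execution. Waits up to 60s (120s under msim) for the
    /// replayed certificates to finish, logging a `debug_fatal` on timeout.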
    async fn reexecute_pending_consensus_certs(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        state: &Arc<AuthorityState>,
    ) {
        let mut pending_consensus_certificates = Vec::new();
        let mut additional_certs = Vec::new();

        for tx in epoch_store.get_all_pending_consensus_transactions() {
            match tx.kind {
                ConsensusTransactionKind::CertifiedTransaction(tx)
                    if !tx.contains_shared_object() =>
                {
                    let tx = *tx;
                    let tx = VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(tx),
                    );
                    if let Some(fx_digest) = epoch_store
                        .get_signed_effects_digest(tx.digest())
                        .expect("db error")
                    {
                        pending_consensus_certificates.push((tx, fx_digest));
                    } else {
                        additional_certs.push(tx);
                    }
                }
                _ => (),
            }
        }

        let digests = pending_consensus_certificates
            .iter()
            .map(|(tx, _)| *tx.digest())
            .collect::<Vec<_>>();

        info!(
            "reexecuting {} pending consensus certificates: {:?}",
            digests.len(),
            digests
        );

        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
        state.enqueue_transactions_for_execution(additional_certs, epoch_store);

        let timeout = if cfg!(msim) { 120 } else { 60 };
        if tokio::time::timeout(
            std::time::Duration::from_secs(timeout),
            state
                .get_transaction_cache_reader()
                .try_notify_read_executed_effects_digests(&digests),
        )
        .await
        .is_err()
        {
            if let Ok(executed_effects_digests) = state
                .get_transaction_cache_reader()
                .try_multi_get_executed_effects_digests(&digests)
            {
                let pending_digests = digests
                    .iter()
                    .zip(executed_effects_digests.iter())
                    .filter_map(|(digest, executed_effects_digest)| {
                        if executed_effects_digest.is_none() {
                            Some(digest)
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>();
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed: {:?}",
                    pending_digests
                );
            } else {
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed, digests not found"
                );
            }
        }
    }

    pub fn state(&self) -> Arc<AuthorityState> {
        self.state.clone()
    }

    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }

    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }

    pub fn clone_authority_aggregator(
        &self,
    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.clone_authority_aggregator())
    }

    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }

    pub fn subscribe_to_transaction_orchestrator_effects(
        &self,
    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.subscribe_to_effects_queue())
            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
    }

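    /// The main reconfiguration loop: runs the checkpoint executor for the
    /// current epoch, announces this validator's capabilities to consensus,
    /// and, once the final checkpoint of the epoch has been executed,
    /// reconfigures state for the next committee, tearing down or starting
    /// validator components as the node leaves or joins the committee.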
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            let mut accumulator_guard = self.accumulator.lock().await;
            let accumulator = accumulator_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );

            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
                guard.as_ref().map(|handle| {
                    let tx = handle.checkpoint_data_broadcaster().clone();
                    Box::new(move |data: &CheckpointData| {
                        tx.send_traced(data);
                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
                })
            } else {
                None
            };

            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                accumulator.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                data_sender,
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            if let Some(components) = &*self.validator_components.lock().await {
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let binary_config = to_binary_config(config);
                let transaction = ConsensusTransaction::new_capability_notification_v1(
                    AuthorityCapabilitiesV1::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        self.config
                            .supported_protocol_versions
                            .expect("Supported versions should be populated")
                            .truncate_below(config.version),
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components
                    .consensus_adapter
                    .submit(transaction, None, &cur_epoch_store)?;
            } else if self.state.is_active_validator(&cur_epoch_store)
                && cur_epoch_store
                    .protocol_config()
                    .track_non_committee_eligible_validators()
            {
                let epoch_store = cur_epoch_store.clone();
                let node_clone = self.clone();
                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
                    node_clone
                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
                        .instrument(trace_span!(
                            "send_signed_capability_notification_to_committee_with_retry"
                        ))
                        .await;
                }));
            }

            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                IotaNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .try_get_iota_system_state_object_unsafe()
                .expect("Read IOTA System State object cannot fail");

            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
                if self.state.is_fullnode(&cur_epoch_store) {
                    warn!(
                        "Failed to send end of epoch notification to subscriber: {:?}",
                        err
                    );
                }
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            send_trusted_peer_change(
                &self.config,
                &self.trusted_peer_change_tx,
                &new_epoch_start_state,
            );

1950 let mut validator_components_lock_guard = self.validator_components.lock().await;
1951
            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    accumulator.clone(),
                )
                .await?;

            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");
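                // Stop the old checkpoint service tasks; a panic inside a task is
                // propagated, other join errors are only logged.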
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

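                // By this point no other component may hold a strong reference to the
                // state accumulator, so the `Arc` can be unwrapped and a fresh
                // accumulator created for the new epoch.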
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_committee_validator(&new_epoch_store) {
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_accumulator,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    validator_server_handle.shutdown();
                    debug!("Validator grpc server shutdown triggered");

                    None
                }
            } else {
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                if self.state.is_committee_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_accumulator,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

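            // Drop the outgoing epoch store's database handles now that
            // reconfiguration is complete.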
            cur_epoch_store.release_db_handles();

            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }

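    /// Shuts down long-running services: the consensus manager (if this node
    /// currently runs validator components) and the gRPC server, if one was
    /// started.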
    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }

        if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
            info!("Shutting down gRPC server");
            if let Err(e) = grpc_handle.shutdown().await {
                warn!("Failed to gracefully shutdown gRPC server: {e}");
            }
        }
    }

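    /// Advances `AuthorityState` to the next epoch: loads the last checkpoint
    /// of the current epoch (including the epoch supply change from its
    /// end-of-epoch data), builds the next `EpochStartConfiguration`, and
    /// returns the new `AuthorityPerEpochStore`.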
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        accumulator: Arc<StateAccumulator>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                accumulator,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }

    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }

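    /// Wraps `tx` in a `VerifiedExecutableTransaction` backed by a checkpoint
    /// proof for checkpoint 0 of epoch 0 and executes it immediately, skipping
    /// certificate verification; intended for transactions injected at epoch
    /// zero (genesis).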
    async fn execute_transaction_immediately_at_zero_epoch(
        state: &Arc<AuthorityState>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx: &Transaction,
        span: tracing::Span,
    ) {
        let _guard = span.enter();
        let transaction =
            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                    tx.data().clone(),
                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                ),
            );
        state
            .try_execute_immediately(&transaction, None, epoch_store)
            .unwrap();
    }

    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }

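    /// Signs this node's `AuthorityCapabilitiesV1` and pushes the notification
    /// to a quorum of the committee, retrying retryable failures with a
    /// linearly increasing backoff: 5s initially, +5s per attempt, capped at
    /// 300s. Gives up on non-retryable errors.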
    async fn send_signed_capability_notification_to_committee_with_retry(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
        const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
        const MAX_RETRY_INTERVAL_SECS: u64 = 300;

        let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);

        let capabilities = AuthorityCapabilitiesV1::new(
            self.state.name,
            epoch_store.get_chain_identifier().chain(),
            self.config
                .supported_protocol_versions
                .expect("Supported versions should be populated")
                .truncate_below(config.version),
            self.state
                .get_available_system_packages(&binary_config)
                .await,
        );

        let signature = AuthoritySignature::new_secure(
            &IntentMessage::new(
                Intent::iota_app(IntentScope::AuthorityCapabilities),
                &capabilities,
            ),
            &epoch_store.epoch(),
            self.config.authority_key_pair(),
        );

        let request = HandleCapabilityNotificationRequestV1 {
            message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
        };

        let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);

        loop {
            let auth_agg = self.auth_agg.load();
            match auth_agg
                .send_capability_notification_to_quorum(request.clone())
                .await
            {
                Ok(_) => {
                    info!("Successfully sent capability notification to committee");
                    break;
                }
                Err(err) => {
                    match &err {
                        AggregatorSendCapabilityNotificationError::RetryableNotification {
                            errors,
                        } => {
                            warn!(
                                "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
                                retry_interval, errors
                            );
                        }
                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
                            errors,
                        } => {
                            error!(
                                "Failed to send capability notification to committee (non-retryable error): {:?}",
                                errors
                            );
                            break;
                        }
                    };

                    tokio::time::sleep(retry_interval).await;

                    retry_interval = std::cmp::min(
                        retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
                        Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
                    );
                }
            }
        }
    }
}

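// JWK fetching is split by build flavor: production builds query the OIDC
// provider over HTTP, while simtest (msim) builds route through an injected
// fetcher so tests can control the returned keys.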
#[cfg(not(msim))]
impl IotaNode {
    async fn fetch_jwks(
        _authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
        let client = reqwest::Client::new();
        fetch_jwks(provider, &client)
            .await
            .map_err(|_| IotaError::JWKRetrieval)
    }
}

#[cfg(msim)]
impl IotaNode {
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}

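/// A lazily started server: `Unstarted` holds the future that builds the
/// server, and `start` consumes it, spawns the serve loop on tokio, and keeps
/// only the handle. Calling `start` on an already started instance is a no-op.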
enum SpawnOnce {
    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
    #[allow(unused)]
    Started(iota_http::ServerHandle),
}

impl SpawnOnce {
    pub fn new(
        future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
    ) -> Self {
        Self::Unstarted(Mutex::new(Box::pin(future)))
    }

    pub async fn start(self) -> Self {
        match self {
            Self::Unstarted(future) => {
                let server = future
                    .into_inner()
                    .await
                    .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
                let handle = server.handle().clone();
                tokio::spawn(async move {
                    if let Err(err) = server.serve().await {
                        info!("Server stopped: {err}");
                    }
                    info!("Server stopped");
                });
                Self::Started(handle)
            }
            Self::Started(_) => self,
        }
    }

    pub fn shutdown(self) {
        if let SpawnOnce::Started(handle) = self {
            handle.trigger_shutdown();
        }
    }
}

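/// Notifies the p2p layer of the validator set for the new epoch via the
/// `TrustedPeerChangeEvent` watch channel. The previous committee is kept in
/// the event as `old_committee` by swapping before overwriting.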
fn send_trusted_peer_change(
    config: &NodeConfig,
    sender: &watch::Sender<TrustedPeerChangeEvent>,
    new_epoch_start_state: &EpochStartSystemState,
) {
    let new_committee =
        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());

    sender.send_modify(|event| {
        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
        event.new_committee = new_committee;
    })
}

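/// Builds the transaction key-value store used by the JSON-RPC layer: always a
/// local RocksDB-backed store, plus an HTTP store as fallback when
/// `transaction_kv_store_read_config.base_url` is set.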
fn build_kv_store(
    state: &Arc<AuthorityState>,
    config: &NodeConfig,
    registry: &Registry,
) -> Result<Arc<TransactionKeyValueStore>> {
    let metrics = KeyValueStoreMetrics::new(registry);
    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());

    let base_url = &config.transaction_kv_store_read_config.base_url;

    if base_url.is_empty() {
        info!("no http kv store url provided, using local db only");
        return Ok(Arc::new(db_store));
    }

    base_url.parse::<url::Url>().tap_err(|e| {
        error!(
            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
            base_url, e
        )
    })?;

    let http_store = HttpKVStore::new_kv(
        base_url,
        config.transaction_kv_store_read_config.cache_size,
        metrics.clone(),
    )?;
    info!("using local key-value store with fallback to http key-value store");
    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
        db_store,
        http_store,
        metrics,
        "json_rpc_fallback",
    )))
}

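/// Starts the public gRPC API. Returns `Ok(None)` on validators (nodes with a
/// consensus config) and when `enable_grpc_api` is off; errors if the API is
/// enabled but `grpc_api_config` is missing.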
async fn build_grpc_server(
    config: &NodeConfig,
    state: Arc<AuthorityState>,
    state_sync_store: RocksDbStore,
    executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>>,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
) -> Result<Option<GrpcServerHandle>> {
    if config.consensus_config().is_some() || !config.enable_grpc_api {
        return Ok(None);
    }

    let Some(grpc_config) = &config.grpc_api_config else {
        return Err(anyhow!("gRPC API is enabled but no configuration provided"));
    };

    let chain_id = state.get_chain_identifier();

    let grpc_read_store = Arc::new(GrpcReadStore::new(state.clone(), state_sync_store));

    let shutdown_token = CancellationToken::new();

    let grpc_reader = Arc::new(GrpcReader::new(
        grpc_read_store,
        Some(server_version.to_string()),
    ));

    let grpc_server_metrics = iota_grpc_server::GrpcServerMetrics::new(prometheus_registry);

    let handle = start_grpc_server(
        grpc_reader,
        executor,
        grpc_config.clone(),
        shutdown_token,
        chain_id,
        Some(grpc_server_metrics),
    )
    .await?;

    Ok(Some(handle))
}

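/// Builds the JSON-RPC HTTP server (plus the `/health` endpoint) for
/// fullnodes; returns `Ok(None)` on validators. The registered RPC modules
/// depend on the config: `TransactionBuilderApi` is skipped when running with
/// `run_with_range`, and `TransactionExecutionApi` requires a transaction
/// orchestrator.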
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
) -> Result<Option<iota_http::ServerHandle>> {
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        let iota_names_config = config
            .iota_names_config
            .clone()
            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware));

    router = router.layer(layers);

    let handle = iota_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());

    Ok(Some(handle))
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    pub threshold_seconds: Option<u32>,
}

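/// Handler for `GET /health`. Without a query parameter it always reports
/// "up"; with `?threshold_seconds=N` it additionally returns 503 when the
/// timestamp of the highest executed checkpoint is older than `N` seconds,
/// e.g. `GET /health?threshold_seconds=30` flags a node lagging more than 30s
/// behind.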
async fn health_check_handler(
    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
) -> impl axum::response::IntoResponse {
    if let Some(threshold_seconds) = threshold_seconds {
        let summary = match state
            .get_checkpoint_store()
            .get_highest_executed_checkpoint()
        {
            Ok(Some(summary)) => summary,
            Ok(None) => {
                warn!("Highest executed checkpoint not found");
                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
            }
            Err(err) => {
                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
            }
        };

        let latest_chain_time = summary.timestamp();
        let threshold =
            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);

        if latest_chain_time < threshold {
            warn!(
                ?latest_chain_time,
                ?threshold,
                "failing health check due to checkpoint lag"
            );
            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
        }
    }
    (axum::http::StatusCode::OK, "up")
}

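// In test builds the per-checkpoint transaction limit is pinned to 2 (forcing
// even small workloads to span multiple checkpoints); production builds use
// the protocol config value.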
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}

#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}