1#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8 collections::{BTreeSet, HashMap, HashSet},
9 fmt,
10 future::Future,
11 path::PathBuf,
12 str::FromStr,
13 sync::{Arc, Weak},
14 time::Duration,
15};
16
17use anemo::Network;
18use anemo_tower::{
19 callback::CallbackLayer,
20 trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
21};
22use anyhow::{Result, anyhow};
23use arc_swap::ArcSwap;
24use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
25use futures::future::BoxFuture;
26pub use handle::IotaNodeHandle;
27use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
28use iota_common::debug_fatal;
29use iota_config::{
30 ConsensusConfig, NodeConfig,
31 node::{DBCheckpointConfig, RunWithRange},
32 node_config_metrics::NodeConfigMetrics,
33 object_storage_config::{ObjectStoreConfig, ObjectStoreType},
34};
35use iota_core::{
36 authority::{
37 AuthorityState, AuthorityStore, RandomnessRoundReceiver,
38 authority_per_epoch_store::AuthorityPerEpochStore,
39 authority_store_pruner::ObjectsCompactionFilter,
40 authority_store_tables::{
41 AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
42 },
43 backpressure::BackpressureManager,
44 epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
45 },
46 authority_aggregator::{
47 AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
48 },
49 authority_client::NetworkAuthorityClient,
50 authority_server::{ValidatorService, ValidatorServiceMetrics},
51 checkpoints::{
52 CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
53 SubmitCheckpointToConsensus,
54 checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
55 },
56 connection_monitor::ConnectionMonitor,
57 consensus_adapter::{
58 CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
59 ConsensusClient,
60 },
61 consensus_handler::ConsensusHandlerInitializer,
62 consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
63 consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
64 db_checkpoint_handler::DBCheckpointHandler,
65 epoch::{
66 committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
67 epoch_metrics::EpochMetrics, randomness::RandomnessManager,
68 reconfiguration::ReconfigurationInitiator,
69 },
70 execution_cache::build_execution_cache,
71 grpc_indexes::{GRPC_INDEXES_DIR, GrpcIndexesStore},
72 jsonrpc_index::IndexStore,
73 module_cache_metrics::ResolverMetrics,
74 overload_monitor::overload_monitor,
75 safe_client::SafeClientMetricsBase,
76 signature_verifier::SignatureVerifierMetrics,
77 state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
78 storage::{GrpcReadStore, RocksDbStore},
79 traffic_controller::metrics::TrafficControllerMetrics,
80 transaction_orchestrator::TransactionOrchestrator,
81 validator_tx_finalizer::ValidatorTxFinalizer,
82};
83use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
84use iota_json_rpc::{
85 JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
86 indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
87 transaction_builder_api::TransactionBuilderApi,
88 transaction_execution_api::TransactionExecutionApi,
89};
90use iota_json_rpc_api::JsonRpcMetrics;
91use iota_macros::{fail_point, fail_point_async, replay_log};
92use iota_metrics::{
93 RegistryID, RegistryService,
94 hardware_metrics::register_hardware_metrics,
95 metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
96 server_timing_middleware, spawn_monitored_task,
97};
98use iota_names::config::IotaNamesConfig;
99use iota_network::{
100 api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
101};
102use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
103use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
104use iota_sdk_types::crypto::{Intent, IntentMessage, IntentScope};
105use iota_snapshot::uploader::StateSnapshotUploader;
106use iota_storage::{
107 FileCompression, StorageFormat,
108 http_key_value_store::HttpKVStore,
109 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
110 key_value_store_metrics::KeyValueStoreMetrics,
111};
112use iota_types::{
113 base_types::{AuthorityName, ConciseableName, EpochId},
114 committee::Committee,
115 crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
116 digests::ChainIdentifier,
117 error::{IotaError, IotaResult},
118 executable_transaction::VerifiedExecutableTransaction,
119 execution_config_utils::to_binary_config,
120 full_checkpoint_content::CheckpointData,
121 iota_system_state::{
122 IotaSystemState, IotaSystemStateTrait,
123 epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
124 },
125 messages_consensus::{
126 AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
127 SignedAuthorityCapabilitiesV1, check_total_jwk_size,
128 },
129 messages_grpc::HandleCapabilityNotificationRequestV1,
130 quorum_driver_types::QuorumDriverEffectsQueueResult,
131 supported_protocol_versions::SupportedProtocolVersions,
132 transaction::{Transaction, VerifiedCertificate},
133};
134use prometheus::Registry;
135#[cfg(msim)]
136pub use simulator::set_jwk_injector;
137#[cfg(msim)]
138use simulator::*;
139use tap::tap::TapFallible;
140use tokio::{
141 sync::{Mutex, broadcast, mpsc, watch},
142 task::{JoinHandle, JoinSet},
143};
144use tokio_util::sync::CancellationToken;
145use tower::ServiceBuilder;
146use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
147use typed_store::{
148 DBMetrics,
149 rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
150};
151
152use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
153
154pub mod admin;
155mod handle;
156pub mod metrics;
157
/// Services and handles that exist only while this node is an active
/// committee validator; created in `construct_validator_components` and
/// torn down / rebuilt when the committee changes.
pub struct ValidatorComponents {
    // Validator gRPC server wrapped in `SpawnOnce`: constructed first, then
    // actually started later via `.start()` (see `start_async`).
    validator_server_handle: SpawnOnce,
    // Background overload-monitor task, when enabled.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    // Shared with JWK updater tasks and `close_epoch`.
    consensus_adapter: Arc<ConsensusAdapter>,
    // Tasks driving the checkpoint service; collected in a JoinSet so they
    // can be awaited/aborted together.
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    // Identifies the validator-specific metrics registry within the
    // RegistryService so it can be removed on teardown.
    validator_registry_id: RegistryID,
}
170
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    /// Extra per-node state that exists only under the `msim` simulator build.
    pub(super) struct SimState {
        // Handle of the simulator node this IotaNode runs on.
        pub sim_node: iota_simulator::runtime::NodeHandle,
        // Set by tests that expect the node to enter safe mode.
        pub sim_safe_mode_expected: AtomicBool,
        // Held only for its side effect — presumably flags leaked nodes on
        // drop; see iota_simulator::NodeLeakDetector.
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }

    // Signature of a test-injectable JWK fetcher (see `set_jwk_injector`).
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    // Default injector: parses the bundled static JWK bytes instead of
    // performing a network fetch (no real HTTP in the simulator).
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        parse_jwks(
            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
        )
        .map_err(|_| IotaError::JWKRetrieval)
    }

    thread_local! {
        // Current injector, thread-local so each simulated node/thread can
        // carry its own override.
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    // Returns the injector installed for the current thread.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    /// Replaces the JWK fetcher used by simulator tests on this thread.
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}
223
/// Identity of the running server binary, displayed as `bin/version`.
#[derive(Clone)]
pub struct ServerVersion {
    // Binary name, e.g. "iota-node".
    pub bin: &'static str,
    // Version string; also recorded in the uptime metric at startup.
    pub version: &'static str,
}
229
230impl ServerVersion {
231 pub fn new(bin: &'static str, version: &'static str) -> Self {
232 Self { bin, version }
233 }
234}
235
236impl std::fmt::Display for ServerVersion {
237 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238 f.write_str(self.bin)?;
239 f.write_str("/")?;
240 f.write_str(self.version)
241 }
242}
243
/// A running IOTA node: the authority state plus every long-lived service
/// around it (p2p, state sync, checkpointing, optional validator stack).
///
/// Constructed by [`IotaNode::start`] / [`IotaNode::start_async`]. Fields
/// prefixed with `_` are held only to keep their background task alive.
pub struct IotaNode {
    config: NodeConfig,
    // Present only while this node is an active committee validator;
    // replaced across reconfigurations.
    validator_components: Mutex<Option<ValidatorComponents>>,
    // HTTP (JSON-RPC) server handle, kept alive for its lifetime.
    _http_server: Option<iota_http::ServerHandle>,
    state: Arc<AuthorityState>,
    // Fullnode-only (and only when no run-with-range limit is set).
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    // Option so the accumulator can be handed off elsewhere at runtime —
    // TODO confirm where it is taken (not visible in this chunk).
    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    // End-of-epoch broadcast; external consumers subscribe via
    // `subscribe_to_epoch_change`.
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    // Pushes committee/trusted-peer updates to the p2p discovery layer.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    // Kill switch for the DB checkpoint handler task.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    // Kill switch for the state archival writer.
    _state_archive_handle: Option<broadcast::Sender<()>>,

    // Kill switch for the state snapshot uploader.
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Shutdown broadcast carrying the optional RunWithRange that triggered
    // it; see `subscribe_to_shutdown_channel`.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    // Hot-swappable authority aggregator for the current committee.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}
291
impl fmt::Debug for IotaNode {
    /// Debug output intentionally shows only the node's concise authority
    /// name; the struct otherwise holds large stores and handles that are
    /// not useful (or not Debug) to print.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("IotaNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}
299
/// Upper bound on the number of JWKs accepted from a single provider fetch;
/// any excess keys are truncated before submission to consensus.
// `const` (not `static`): a plain immutable usize needs no runtime storage.
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
301
302impl IotaNode {
303 pub async fn start(
304 config: NodeConfig,
305 registry_service: RegistryService,
306 ) -> Result<Arc<IotaNode>> {
307 Self::start_async(
308 config,
309 registry_service,
310 ServerVersion::new("iota-node", "unknown"),
311 )
312 .await
313 }
314
/// Spawns one background task per supported zklogin OIDC provider that
/// periodically fetches JWKs, validates/deduplicates them, and submits new
/// keys to consensus. Each task is scoped to the current epoch via
/// `within_alive_epoch`.
fn start_jwk_updater(
    config: &NodeConfig,
    metrics: Arc<IotaNodeMetrics>,
    authority: AuthorityName,
    epoch_store: Arc<AuthorityPerEpochStore>,
    consensus_adapter: Arc<ConsensusAdapter>,
) {
    let epoch = epoch_store.epoch();

    // Providers are configured per chain; a chain with no entry yields an
    // empty provider list (and therefore no tasks).
    let supported_providers = config
        .zklogin_oauth_providers
        .get(&epoch_store.get_chain_identifier().chain())
        .unwrap_or(&BTreeSet::new())
        .iter()
        .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
        .collect::<Vec<_>>();

    let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

    info!(
        ?fetch_interval,
        "Starting JWK updater tasks with supported providers: {:?}", supported_providers
    );

    // Returns false (and bumps the `invalid_jwks` metric) when a fetched key
    // has an unknown issuer, an issuer that does not match the provider it
    // was fetched from, or exceeds the total-size limit.
    fn validate_jwk(
        metrics: &Arc<IotaNodeMetrics>,
        provider: &OIDCProvider,
        id: &JwkId,
        jwk: &JWK,
    ) -> bool {
        let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
            warn!(
                "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                id.iss, provider
            );
            metrics
                .invalid_jwks
                .with_label_values(&[&provider.to_string()])
                .inc();
            return false;
        };

        if iss_provider != *provider {
            warn!(
                "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                id.iss, provider, iss_provider
            );
            metrics
                .invalid_jwks
                .with_label_values(&[&provider.to_string()])
                .inc();
            return false;
        }

        if !check_total_jwk_size(id, jwk) {
            warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
            metrics
                .invalid_jwks
                .with_label_values(&[&provider.to_string()])
                .inc();
            return false;
        }

        true
    }

    for p in supported_providers.into_iter() {
        let provider_str = p.to_string();
        let epoch_store = epoch_store.clone();
        let consensus_adapter = consensus_adapter.clone();
        let metrics = metrics.clone();
        spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
            async move {
                // Keys already forwarded by this task, so each (id, jwk)
                // pair is submitted to consensus at most once.
                let mut seen = HashSet::new();
                loop {
                    info!("fetching JWK for provider {:?}", p);
                    metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                    match Self::fetch_jwks(authority, &p).await {
                        Err(e) => {
                            metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                            warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                            // Retry on a short fixed backoff instead of
                            // waiting out the full fetch interval.
                            tokio::time::sleep(Duration::from_secs(30)).await;
                            continue;
                        }
                        Ok(mut keys) => {
                            metrics.total_jwks
                                .with_label_values(&[&provider_str])
                                .inc_by(keys.len() as u64);

                            // Keep only valid keys that are neither active
                            // in the current epoch nor previously submitted
                            // by this task.
                            keys.retain(|(id, jwk)| {
                                validate_jwk(&metrics, &p, id, jwk) &&
                                !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                seen.insert((id.clone(), jwk.clone()))
                            });

                            metrics.unique_jwks
                                .with_label_values(&[&provider_str])
                                .inc_by(keys.len() as u64);

                            // Cap per-fetch submissions to bound the load
                            // placed on consensus by a misbehaving provider.
                            if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                            }

                            for (id, jwk) in keys.into_iter() {
                                info!("Submitting JWK to consensus: {:?}", id);

                                // Submission failures are logged and dropped;
                                // the key will be retried on a later fetch if
                                // still absent (it stays in `seen`, though —
                                // NOTE(review): confirm that is intended).
                                let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                consensus_adapter.submit(txn, None, &epoch_store)
                                    .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                    .ok();
                            }
                        }
                    }
                    tokio::time::sleep(fetch_interval).await;
                }
            }
            .instrument(error_span!("jwk_updater_task", epoch)),
        ));
    }
}
453
454 pub async fn start_async(
455 config: NodeConfig,
456 registry_service: RegistryService,
457 server_version: ServerVersion,
458 ) -> Result<Arc<IotaNode>> {
459 NodeConfigMetrics::new(®istry_service.default_registry()).record_metrics(&config);
460 let mut config = config.clone();
461 if config.supported_protocol_versions.is_none() {
462 info!(
463 "populating config.supported_protocol_versions with default {:?}",
464 SupportedProtocolVersions::SYSTEM_DEFAULT
465 );
466 config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
467 }
468
469 let run_with_range = config.run_with_range;
470 let is_validator = config.consensus_config().is_some();
471 let is_full_node = !is_validator;
472 let prometheus_registry = registry_service.default_registry();
473
474 info!(node =? config.authority_public_key(),
475 "Initializing iota-node listening on {}", config.network_address
476 );
477
478 let genesis = config.genesis()?.clone();
479
480 let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
481 info!("IOTA chain identifier: {chain_identifier}");
482
483 let db_corrupted_path = &config.db_path().join("status");
485 if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
486 panic!("Failed to check database corruption: {err}");
487 }
488
489 DBMetrics::init(&prometheus_registry);
491
492 iota_metrics::init_metrics(&prometheus_registry);
494 #[cfg(not(msim))]
497 iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
498
499 register_hardware_metrics(®istry_service, &config.db_path)
501 .expect("Failed registering hardware metrics");
502 prometheus_registry
504 .register(iota_metrics::uptime_metric(
505 if is_validator {
506 "validator"
507 } else {
508 "fullnode"
509 },
510 server_version.version,
511 &chain_identifier.to_string(),
512 ))
513 .expect("Failed registering uptime metric");
514
515 let migration_tx_data = if genesis.contains_migrations() {
518 Some(config.load_migration_tx_data()?)
521 } else {
522 None
523 };
524
525 let secret = Arc::pin(config.authority_key_pair().copy());
526 let genesis_committee = genesis.committee()?;
527 let committee_store = Arc::new(CommitteeStore::new(
528 config.db_path().join("epochs"),
529 &genesis_committee,
530 None,
531 ));
532
533 let mut pruner_db = None;
534 if config
535 .authority_store_pruning_config
536 .enable_compaction_filter
537 {
538 pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
539 &config.db_path().join("store"),
540 )));
541 }
542 let compaction_filter = pruner_db
543 .clone()
544 .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));
545
546 let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
548 let perpetual_tables_options = AuthorityPerpetualTablesOptions {
549 enable_write_stall,
550 compaction_filter,
551 };
552 let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
553 &config.db_path().join("store"),
554 Some(perpetual_tables_options),
555 ));
556 let is_genesis = perpetual_tables
557 .database_is_empty()
558 .expect("Database read should not fail at init.");
559 let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
560 let backpressure_manager =
561 BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
562
563 let store = AuthorityStore::open(
564 perpetual_tables,
565 &genesis,
566 &config,
567 &prometheus_registry,
568 migration_tx_data.as_ref(),
569 )
570 .await?;
571
572 let cur_epoch = store.get_recovery_epoch_at_restart()?;
573 let committee = committee_store
574 .get_committee(&cur_epoch)?
575 .expect("Committee of the current epoch must exist");
576 let epoch_start_configuration = store
577 .get_epoch_start_configuration()?
578 .expect("EpochStartConfiguration of the current epoch must exist");
579 let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
580 let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
581
582 let cache_traits = build_execution_cache(
583 &config.execution_cache_config,
584 &epoch_start_configuration,
585 &prometheus_registry,
586 &store,
587 backpressure_manager.clone(),
588 );
589
590 let auth_agg = {
591 let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
592 let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
593 Arc::new(ArcSwap::new(Arc::new(
594 AuthorityAggregator::new_from_epoch_start_state(
595 epoch_start_configuration.epoch_start_state(),
596 &committee_store,
597 safe_client_metrics_base,
598 auth_agg_metrics,
599 ),
600 )))
601 };
602
603 let chain = match config.chain_override_for_testing {
604 Some(chain) => chain,
605 None => chain_identifier.chain(),
606 };
607
608 let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
609 let epoch_store = AuthorityPerEpochStore::new(
610 config.authority_public_key(),
611 committee.clone(),
612 &config.db_path().join("store"),
613 Some(epoch_options.options),
614 EpochMetrics::new(®istry_service.default_registry()),
615 epoch_start_configuration,
616 cache_traits.backing_package_store.clone(),
617 cache_traits.object_store.clone(),
618 cache_metrics,
619 signature_verifier_metrics,
620 &config.expensive_safety_check_config,
621 (chain_identifier, chain),
622 checkpoint_store
623 .get_highest_executed_checkpoint_seq_number()
624 .expect("checkpoint store read cannot fail")
625 .unwrap_or(0),
626 )?;
627
628 info!("created epoch store");
629
630 replay_log!(
631 "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
632 epoch_store.epoch(),
633 epoch_store.protocol_config()
634 );
635
636 if is_genesis {
638 info!("checking IOTA conservation at genesis");
639 cache_traits
644 .reconfig_api
645 .try_expensive_check_iota_conservation(&epoch_store, None)
646 .expect("IOTA conservation check cannot fail at genesis");
647 }
648
649 let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
650 let default_buffer_stake = epoch_store
651 .protocol_config()
652 .buffer_stake_for_protocol_upgrade_bps();
653 if effective_buffer_stake != default_buffer_stake {
654 warn!(
655 ?effective_buffer_stake,
656 ?default_buffer_stake,
657 "buffer_stake_for_protocol_upgrade_bps is currently overridden"
658 );
659 }
660
661 checkpoint_store.insert_genesis_checkpoint(
662 genesis.checkpoint(),
663 genesis.checkpoint_contents().clone(),
664 &epoch_store,
665 );
666
667 unmark_db_corruption(db_corrupted_path)?;
669
670 info!("creating state sync store");
671 let state_sync_store = RocksDbStore::new(
672 cache_traits.clone(),
673 committee_store.clone(),
674 checkpoint_store.clone(),
675 );
676
677 let index_store = if is_full_node && config.enable_index_processing {
678 info!("creating index store");
679 Some(Arc::new(IndexStore::new(
680 config.db_path().join("indexes"),
681 &prometheus_registry,
682 epoch_store
683 .protocol_config()
684 .max_move_identifier_len_as_option(),
685 )))
686 } else {
687 None
688 };
689
690 let grpc_indexes_store = if is_full_node && config.enable_grpc_api {
691 GrpcIndexesStore::migrate_legacy_dirs(&config.db_path());
693 Some(Arc::new(
694 GrpcIndexesStore::new(
695 config.db_path().join(GRPC_INDEXES_DIR),
696 Arc::clone(&store),
697 &checkpoint_store,
698 )
699 .await,
700 ))
701 } else {
702 None
703 };
704
705 info!("creating archive reader");
706 let archive_readers =
711 ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
712 let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
713 let (randomness_tx, randomness_rx) = mpsc::channel(
714 config
715 .p2p_config
716 .randomness
717 .clone()
718 .unwrap_or_default()
719 .mailbox_capacity(),
720 );
721 let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
722 Self::create_p2p_network(
723 &config,
724 state_sync_store.clone(),
725 chain_identifier,
726 trusted_peer_change_rx,
727 archive_readers.clone(),
728 randomness_tx,
729 &prometheus_registry,
730 )?;
731
732 send_trusted_peer_change(
735 &config,
736 &trusted_peer_change_tx,
737 epoch_store.epoch_start_state(),
738 );
739
740 info!("start state archival");
741 let state_archive_handle =
743 Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
744 .await?;
745
746 info!("start snapshot upload");
747 let state_snapshot_handle =
749 Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
750
751 info!("start db checkpoint");
753 let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
754 &config,
755 &prometheus_registry,
756 state_snapshot_handle.is_some(),
757 )?;
758
759 let mut genesis_objects = genesis.objects().to_vec();
760 if let Some(migration_tx_data) = migration_tx_data.as_ref() {
761 genesis_objects.extend(migration_tx_data.get_objects());
762 }
763
764 let authority_name = config.authority_public_key();
765 let validator_tx_finalizer =
766 config
767 .enable_validator_tx_finalizer
768 .then_some(Arc::new(ValidatorTxFinalizer::new(
769 auth_agg.clone(),
770 authority_name,
771 &prometheus_registry,
772 )));
773
774 info!("create authority state");
775 let state = AuthorityState::new(
776 authority_name,
777 secret,
778 config.supported_protocol_versions.unwrap(),
779 store.clone(),
780 cache_traits.clone(),
781 epoch_store.clone(),
782 committee_store.clone(),
783 index_store.clone(),
784 grpc_indexes_store,
785 checkpoint_store.clone(),
786 &prometheus_registry,
787 &genesis_objects,
788 &db_checkpoint_config,
789 config.clone(),
790 archive_readers,
791 validator_tx_finalizer,
792 chain_identifier,
793 pruner_db,
794 )
795 .await;
796
797 if epoch_store.epoch() == 0 {
799 let genesis_tx = &genesis.transaction();
800 let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
801 Self::execute_transaction_immediately_at_zero_epoch(
803 &state,
804 &epoch_store,
805 genesis_tx,
806 span,
807 )
808 .await;
809
810 if let Some(migration_tx_data) = migration_tx_data {
812 for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
813 let span = error_span!("migration_txn", tx_digest = ?tx_digest);
814 Self::execute_transaction_immediately_at_zero_epoch(
815 &state,
816 &epoch_store,
817 tx,
818 span,
819 )
820 .await;
821 }
822 }
823 }
824
825 RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
828
829 if config
830 .expensive_safety_check_config
831 .enable_secondary_index_checks()
832 {
833 if let Some(indexes) = state.indexes.clone() {
834 iota_core::verify_indexes::verify_indexes(
835 state.get_accumulator_store().as_ref(),
836 indexes,
837 )
838 .expect("secondary indexes are inconsistent");
839 }
840 }
841
842 let (end_of_epoch_channel, end_of_epoch_receiver) =
843 broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
844
845 let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
846 Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
847 auth_agg.load_full(),
848 state.clone(),
849 end_of_epoch_receiver,
850 &config.db_path(),
851 &prometheus_registry,
852 )))
853 } else {
854 None
855 };
856
857 let http_server = build_http_server(
858 state.clone(),
859 &transaction_orchestrator.clone(),
860 &config,
861 &prometheus_registry,
862 )
863 .await?;
864
865 let accumulator = Arc::new(StateAccumulator::new(
866 cache_traits.accumulator_store.clone(),
867 StateAccumulatorMetrics::new(&prometheus_registry),
868 ));
869
870 let authority_names_to_peer_ids = epoch_store
871 .epoch_start_state()
872 .get_authority_names_to_peer_ids();
873
874 let network_connection_metrics =
875 NetworkConnectionMetrics::new("iota", ®istry_service.default_registry());
876
877 let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
878
879 let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
880 p2p_network.downgrade(),
881 network_connection_metrics,
882 HashMap::new(),
883 None,
884 );
885
886 let connection_monitor_status = ConnectionMonitorStatus {
887 connection_statuses,
888 authority_names_to_peer_ids,
889 };
890
891 let connection_monitor_status = Arc::new(connection_monitor_status);
892 let iota_node_metrics =
893 Arc::new(IotaNodeMetrics::new(®istry_service.default_registry()));
894
895 iota_node_metrics
896 .binary_max_protocol_version
897 .set(ProtocolVersion::MAX.as_u64() as i64);
898 iota_node_metrics
899 .configured_max_protocol_version
900 .set(config.supported_protocol_versions.unwrap().max.as_u64() as i64);
901
902 let executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>> =
906 transaction_orchestrator
907 .clone()
908 .map(|o| o as Arc<dyn iota_types::transaction_executor::TransactionExecutor>);
909
910 let grpc_server_handle = build_grpc_server(
911 &config,
912 state.clone(),
913 state_sync_store.clone(),
914 executor,
915 &prometheus_registry,
916 server_version,
917 )
918 .await?;
919
920 let validator_components = if state.is_committee_validator(&epoch_store) {
921 let (components, _) = futures::join!(
922 Self::construct_validator_components(
923 config.clone(),
924 state.clone(),
925 committee,
926 epoch_store.clone(),
927 checkpoint_store.clone(),
928 state_sync_handle.clone(),
929 randomness_handle.clone(),
930 Arc::downgrade(&accumulator),
931 backpressure_manager.clone(),
932 connection_monitor_status.clone(),
933 ®istry_service,
934 iota_node_metrics.clone(),
935 ),
936 Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
937 );
938 let mut components = components?;
939
940 components.consensus_adapter.submit_recovered(&epoch_store);
941
942 components.validator_server_handle = components.validator_server_handle.start().await;
944
945 Some(components)
946 } else {
947 None
948 };
949
950 let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
952
953 let node = Self {
954 config,
955 validator_components: Mutex::new(validator_components),
956 _http_server: http_server,
957 state,
958 transaction_orchestrator,
959 registry_service,
960 metrics: iota_node_metrics,
961
962 _discovery: discovery_handle,
963 state_sync_handle,
964 randomness_handle,
965 checkpoint_store,
966 accumulator: Mutex::new(Some(accumulator)),
967 end_of_epoch_channel,
968 connection_monitor_status,
969 trusted_peer_change_tx,
970 backpressure_manager,
971
972 _db_checkpoint_handle: db_checkpoint_handle,
973
974 #[cfg(msim)]
975 sim_state: Default::default(),
976
977 _state_archive_handle: state_archive_handle,
978 _state_snapshot_uploader_handle: state_snapshot_handle,
979 shutdown_channel_tx: shutdown_channel,
980
981 grpc_server_handle: Mutex::new(grpc_server_handle),
982
983 auth_agg,
984 };
985
986 info!("IotaNode started!");
987 let node = Arc::new(node);
988 let node_copy = node.clone();
989 spawn_monitored_task!(async move {
990 let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
991 if let Err(error) = result {
992 warn!("Reconfiguration finished with error {:?}", error);
993 }
994 });
995
996 Ok(node)
997 }
998
999 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
1000 self.end_of_epoch_channel.subscribe()
1001 }
1002
1003 pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
1004 self.shutdown_channel_tx.subscribe()
1005 }
1006
1007 pub fn current_epoch_for_testing(&self) -> EpochId {
1008 self.state.current_epoch_for_testing()
1009 }
1010
1011 pub fn db_checkpoint_path(&self) -> PathBuf {
1012 self.config.db_checkpoint_path()
1013 }
1014
1015 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
1017 info!("close_epoch (current epoch = {})", epoch_store.epoch());
1018 self.validator_components
1019 .lock()
1020 .await
1021 .as_ref()
1022 .ok_or_else(|| IotaError::from("Node is not a validator"))?
1023 .consensus_adapter
1024 .close_epoch(epoch_store);
1025 Ok(())
1026 }
1027
1028 pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
1029 self.state
1030 .clear_override_protocol_upgrade_buffer_stake(epoch)
1031 }
1032
1033 pub fn set_override_protocol_upgrade_buffer_stake(
1034 &self,
1035 epoch: EpochId,
1036 buffer_stake_bps: u64,
1037 ) -> IotaResult {
1038 self.state
1039 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
1040 }
1041
1042 pub async fn close_epoch_for_testing(&self) -> IotaResult {
1045 let epoch_store = self.state.epoch_store_for_testing();
1046 self.close_epoch(&epoch_store).await
1047 }
1048
1049 async fn start_state_archival(
1050 config: &NodeConfig,
1051 prometheus_registry: &Registry,
1052 state_sync_store: RocksDbStore,
1053 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1054 if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
1055 let local_store_config = ObjectStoreConfig {
1056 object_store: Some(ObjectStoreType::File),
1057 directory: Some(config.archive_path()),
1058 ..Default::default()
1059 };
1060 let archive_writer = ArchiveWriter::new(
1061 local_store_config,
1062 remote_store_config.clone(),
1063 FileCompression::Zstd,
1064 StorageFormat::Blob,
1065 Duration::from_secs(600),
1066 256 * 1024 * 1024,
1067 prometheus_registry,
1068 )
1069 .await?;
1070 Ok(Some(archive_writer.start(state_sync_store).await?))
1071 } else {
1072 Ok(None)
1073 }
1074 }
1075
1076 fn start_state_snapshot(
1079 config: &NodeConfig,
1080 prometheus_registry: &Registry,
1081 checkpoint_store: Arc<CheckpointStore>,
1082 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1083 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1084 let snapshot_uploader = StateSnapshotUploader::new(
1085 &config.db_checkpoint_path(),
1086 &config.snapshot_path(),
1087 remote_store_config.clone(),
1088 60,
1089 prometheus_registry,
1090 checkpoint_store,
1091 )?;
1092 Ok(Some(snapshot_uploader.start()))
1093 } else {
1094 Ok(None)
1095 }
1096 }
1097
1098 fn start_db_checkpoint(
1099 config: &NodeConfig,
1100 prometheus_registry: &Registry,
1101 state_snapshot_enabled: bool,
1102 ) -> Result<(
1103 DBCheckpointConfig,
1104 Option<tokio::sync::broadcast::Sender<()>>,
1105 )> {
1106 let checkpoint_path = Some(
1107 config
1108 .db_checkpoint_config
1109 .checkpoint_path
1110 .clone()
1111 .unwrap_or_else(|| config.db_checkpoint_path()),
1112 );
1113 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1114 DBCheckpointConfig {
1115 checkpoint_path,
1116 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1117 true
1118 } else {
1119 config
1120 .db_checkpoint_config
1121 .perform_db_checkpoints_at_epoch_end
1122 },
1123 ..config.db_checkpoint_config.clone()
1124 }
1125 } else {
1126 config.db_checkpoint_config.clone()
1127 };
1128
1129 match (
1130 db_checkpoint_config.object_store_config.as_ref(),
1131 state_snapshot_enabled,
1132 ) {
1133 (None, false) => Ok((db_checkpoint_config, None)),
1138 (_, _) => {
1139 let handler = DBCheckpointHandler::new(
1140 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1141 db_checkpoint_config.object_store_config.as_ref(),
1142 60,
1143 db_checkpoint_config
1144 .prune_and_compact_before_upload
1145 .unwrap_or(true),
1146 config.authority_store_pruning_config.clone(),
1147 prometheus_registry,
1148 state_snapshot_enabled,
1149 )?;
1150 Ok((
1151 db_checkpoint_config,
1152 Some(DBCheckpointHandler::start(handler)),
1153 ))
1154 }
1155 }
1156 }
1157
    /// Builds the anemo-based p2p network and starts the discovery, state-sync
    /// and randomness subsystems on top of it.
    ///
    /// Returns the network handle plus the per-subsystem handles. All three
    /// subsystems share one `anemo::Router`; inbound and outbound traffic get
    /// separate metrics and tracing layers.
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            // All three RPC services are served behind a single router.
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            // Inbound stack: trace server errors + record inbound metrics.
            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            // Outbound stack: trace client and server errors + outbound metrics.
            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Allow frames up to 1 GiB.
            anemo_config.max_frame_size = Some(1 << 30);

            // Fill in QUIC defaults only where the operator did not set a value
            // explicitly; buffer sizes are in bytes (20 << 20 = 20 MiB, etc.).
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            // Socket buffer sizing is best-effort; failures are tolerated.
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            // The server name ties the network identity to the chain.
            let server_name = format!("iota-{chain_identifier}");
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }
1296
    /// Constructs the validator-only components (consensus adapter and
    /// manager, consensus store pruner, validator gRPC server, optional
    /// overload monitor) and then delegates to
    /// `start_epoch_specific_validator_components` to start the per-epoch
    /// parts.
    ///
    /// Validator metrics go into a dedicated registry; its id is kept so the
    /// registry can be removed if the node later stops being a validator.
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        iota_node_metrics: Arc<IotaNodeMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        // The client is shared between the consensus adapter and the
        // consensus manager.
        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &validator_registry,
        )
        .await?;

        // The overload monitor is only spawned when load shedding is enabled.
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            accumulator,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_node_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }
1398
    /// Starts the per-epoch validator components: the checkpoint service, the
    /// randomness manager (when available), consensus with its handler, and
    /// the JWK updater when the authenticator state feature is enabled for
    /// this epoch. Returns the running components for teardown at the next
    /// epoch change.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: ConsensusManager,
        consensus_store_pruner: ConsensusStorePruner,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_node_metrics: Arc<IotaNodeMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            accumulator,
            checkpoint_metrics.clone(),
        );

        // Shared map of low-scoring authorities; starts empty and is handed
        // to both the consensus adapter and the consensus handler.
        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        // `try_new` may return None, in which case no randomness manager is
        // installed on the epoch store.
        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
            backpressure_manager,
        );

        info!("Starting consensus manager");

        consensus_manager
            .start(
                config,
                epoch_store.clone(),
                consensus_handler_initializer,
                IotaTxValidator::new(
                    epoch_store.clone(),
                    checkpoint_service.clone(),
                    state.transaction_manager().clone(),
                    iota_tx_validator_metrics.clone(),
                ),
            )
            .await;

        info!("Spawning checkpoint service");
        let checkpoint_service_tasks = checkpoint_service.spawn().await;

        // JWK updates are only relevant when authenticator state is enabled
        // for this epoch.
        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                iota_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }
1501
1502 fn build_checkpoint_service(
1509 config: &NodeConfig,
1510 consensus_adapter: Arc<ConsensusAdapter>,
1511 checkpoint_store: Arc<CheckpointStore>,
1512 epoch_store: Arc<AuthorityPerEpochStore>,
1513 state: Arc<AuthorityState>,
1514 state_sync_handle: state_sync::Handle,
1515 accumulator: Weak<StateAccumulator>,
1516 checkpoint_metrics: Arc<CheckpointMetrics>,
1517 ) -> Arc<CheckpointService> {
1518 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1519 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1520
1521 debug!(
1522 "Starting checkpoint service with epoch start timestamp {}
1523 and epoch duration {}",
1524 epoch_start_timestamp_ms, epoch_duration_ms
1525 );
1526
1527 let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1528 sender: consensus_adapter,
1529 signer: state.secret.clone(),
1530 authority: config.authority_public_key(),
1531 next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1532 .checked_add(epoch_duration_ms)
1533 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1534 metrics: checkpoint_metrics.clone(),
1535 });
1536
1537 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1538 let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1539 let max_checkpoint_size_bytes =
1540 epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1541
1542 CheckpointService::build(
1543 state.clone(),
1544 checkpoint_store,
1545 epoch_store,
1546 state.get_transaction_cache_reader().clone(),
1547 accumulator,
1548 checkpoint_output,
1549 Box::new(certified_checkpoint_output),
1550 checkpoint_metrics,
1551 max_tx_per_checkpoint,
1552 max_checkpoint_size_bytes,
1553 )
1554 }
1555
1556 fn construct_consensus_adapter(
1557 committee: &Committee,
1558 consensus_config: &ConsensusConfig,
1559 authority: AuthorityName,
1560 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1561 prometheus_registry: &Registry,
1562 consensus_client: Arc<dyn ConsensusClient>,
1563 checkpoint_store: Arc<CheckpointStore>,
1564 ) -> ConsensusAdapter {
1565 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1566 ConsensusAdapter::new(
1570 consensus_client,
1571 checkpoint_store,
1572 authority,
1573 connection_monitor_status,
1574 consensus_config.max_pending_transactions(),
1575 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1576 consensus_config.max_submit_position,
1577 consensus_config.submit_delay_step_override(),
1578 ca_metrics,
1579 )
1580 }
1581
1582 async fn start_grpc_validator_service(
1583 config: &NodeConfig,
1584 state: Arc<AuthorityState>,
1585 consensus_adapter: Arc<ConsensusAdapter>,
1586 prometheus_registry: &Registry,
1587 ) -> Result<SpawnOnce> {
1588 let validator_service = ValidatorService::new(
1589 state,
1590 consensus_adapter,
1591 Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1592 TrafficControllerMetrics::new(prometheus_registry),
1593 config.policy_config.clone(),
1594 config.firewall_config.clone(),
1595 );
1596
1597 let mut server_conf = iota_network_stack::config::Config::new();
1598 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1599 server_conf.load_shed = config.grpc_load_shed;
1600 let server_builder =
1601 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
1602 .add_service(ValidatorServer::new(validator_service));
1603
1604 let tls_config = iota_tls::create_rustls_server_config(
1605 config.network_key_pair().copy().private(),
1606 IOTA_TLS_SERVER_NAME.to_string(),
1607 );
1608
1609 let network_address = config.network_address().clone();
1610
1611 let bind_future = async move {
1612 let server = server_builder
1613 .bind(&network_address, Some(tls_config))
1614 .await
1615 .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;
1616
1617 let local_addr = server.local_addr();
1618 info!("Listening to traffic on {local_addr}");
1619
1620 Ok(server)
1621 };
1622
1623 Ok(SpawnOnce::new(bind_future))
1624 }
1625
    /// Re-enqueues for execution the certified transactions still present in
    /// the epoch store's pending-consensus table, then waits (with a timeout)
    /// for those with recorded signed effects digests to finish executing.
    ///
    /// Only certified transactions without shared objects are considered.
    /// Transactions for which we already signed effects are replayed with that
    /// expected digest; the rest are enqueued for normal execution.
    async fn reexecute_pending_consensus_certs(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        state: &Arc<AuthorityState>,
    ) {
        // (tx, expected effects digest) pairs — replay must reproduce these.
        let mut pending_consensus_certificates = Vec::new();
        // Transactions with no recorded effects digest; enqueued normally.
        let mut additional_certs = Vec::new();

        for tx in epoch_store.get_all_pending_consensus_transactions() {
            match tx.kind {
                // Only owned-object certified transactions are re-executed here.
                ConsensusTransactionKind::CertifiedTransaction(tx)
                    if !tx.contains_shared_object() =>
                {
                    let tx = *tx;
                    let tx = VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(tx),
                    );
                    if let Some(fx_digest) = epoch_store
                        .get_signed_effects_digest(tx.digest())
                        .expect("db error")
                    {
                        pending_consensus_certificates.push((tx, fx_digest));
                    } else {
                        additional_certs.push(tx);
                    }
                }
                _ => (),
            }
        }

        let digests = pending_consensus_certificates
            .iter()
            .map(|(tx, _)| *tx.digest())
            .collect::<Vec<_>>();

        info!(
            "reexecuting {} pending consensus certificates: {:?}",
            digests.len(),
            digests
        );

        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
        state.enqueue_transactions_for_execution(additional_certs, epoch_store);

        // Longer timeout under the msim simulator.
        let timeout = if cfg!(msim) { 120 } else { 60 };
        // Only transactions with known effects digests are awaited; a timeout
        // is reported via debug_fatal rather than panicking unconditionally.
        if tokio::time::timeout(
            std::time::Duration::from_secs(timeout),
            state
                .get_transaction_cache_reader()
                .try_notify_read_executed_effects_digests(&digests),
        )
        .await
        .is_err()
        {
            // Work out which digests are still unexecuted for the report.
            if let Ok(executed_effects_digests) = state
                .get_transaction_cache_reader()
                .try_multi_get_executed_effects_digests(&digests)
            {
                let pending_digests = digests
                    .iter()
                    .zip(executed_effects_digests.iter())
                    .filter_map(|(digest, executed_effects_digest)| {
                        if executed_effects_digest.is_none() {
                            Some(digest)
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>();
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed: {:?}",
                    pending_digests
                );
            } else {
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed, digests not found"
                );
            }
        }
    }
1734
1735 pub fn state(&self) -> Arc<AuthorityState> {
1736 self.state.clone()
1737 }
1738
    /// Test-only accessor: delegates to the authority state to read the
    /// current reference gas price.
    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }
1743
    /// Returns a shared handle to the committee store held by the authority
    /// state.
    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }
1747
    /// Returns the transaction orchestrator's authority aggregator, or `None`
    /// when the orchestrator is not enabled on this node.
    pub fn clone_authority_aggregator(
        &self,
    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.clone_authority_aggregator())
    }
1764
    /// Returns the transaction orchestrator, or `None` when it is not enabled
    /// on this node.
    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }
1770
1771 pub fn subscribe_to_transaction_orchestrator_effects(
1772 &self,
1773 ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1774 self.transaction_orchestrator
1775 .as_ref()
1776 .map(|to| to.subscribe_to_effects_queue())
1777 .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1778 }
1779
    /// Drives the node across epoch boundaries.
    ///
    /// Each loop iteration: executes every checkpoint of the current epoch via
    /// a fresh `CheckpointExecutor`, announces capabilities (as a committee
    /// validator through consensus, or — when the protocol tracks them — as an
    /// active non-committee validator directly to the committee), then
    /// reconfigures the node for the next epoch: recreates the state
    /// accumulator, restarts or tears down validator components, and swaps in
    /// the new epoch store. Returns only when a `RunWithRange` stop condition
    /// is met.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // The accumulator is taken out of its mutex for the duration of
            // the epoch and re-installed (recreated) during reconfiguration.
            let mut accumulator_guard = self.accumulator.lock().await;
            let accumulator = accumulator_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );

            // Forward executed checkpoint data to the gRPC broadcaster when
            // the gRPC server is running and its handle lock is uncontended.
            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
                guard.as_ref().map(|handle| {
                    let tx = handle.checkpoint_data_broadcaster().clone();
                    Box::new(move |data: &CheckpointData| {
                        tx.send_traced(data);
                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
                })
            } else {
                None
            };

            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                accumulator.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                data_sender,
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            self.metrics
                .current_protocol_version
                .set(cur_epoch_store.protocol_config().version.as_u64() as i64);

            // Committee validators announce their capabilities to consensus.
            if let Some(components) = &*self.validator_components.lock().await {
                // NOTE(review): brief pause before submitting capabilities —
                // purpose not evident from this code; confirm before removing.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let binary_config = to_binary_config(config);
                let transaction = ConsensusTransaction::new_capability_notification_v1(
                    AuthorityCapabilitiesV1::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        self.config
                            .supported_protocol_versions
                            .expect("Supported versions should be populated")
                            .truncate_below(config.version),
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components
                    .consensus_adapter
                    .submit(transaction, None, &cur_epoch_store)?;
            } else if self.state.is_active_validator(&cur_epoch_store)
                && cur_epoch_store
                    .protocol_config()
                    .track_non_committee_eligible_validators()
            {
                // Active non-committee validators send a signed capability
                // notification straight to the committee, retried in a task
                // scoped to the current epoch's lifetime.
                let epoch_store = cur_epoch_store.clone();
                let node_clone = self.clone();
                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
                    node_clone
                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
                        .instrument(trace_span!(
                            "send_signed_capability_notification_to_committee_with_retry"
                        ))
                        .await;
                }));
            }

            // Execute checkpoints until the epoch ends or a run-with-range
            // condition stops execution.
            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                IotaNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .try_get_iota_system_state_object_unsafe()
                .expect("Read IOTA System State object cannot fail");

            // In debug builds, hitting safe mode is treated as a bug — except
            // in simulation runs that explicitly expect it.
            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // Failure to notify end-of-epoch subscribers is only logged, and
            // only on fullnodes.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
                if self.state.is_fullnode(&cur_epoch_store) {
                    warn!(
                        "Failed to send end of epoch notification to subscriber: {:?}",
                        err
                    );
                }
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Rebuild the authority aggregator against the next epoch's state.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            send_trusted_peer_change(
                &self.config,
                &self.trusted_peer_change_tx,
                &new_epoch_start_state,
            );

            // Hold the components lock across the whole swap below.
            let mut validator_components_lock_guard = self.validator_components.lock().await;

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    accumulator.clone(),
                )
                .await?;

            // Was a validator this epoch: tear down per-epoch components and
            // restart them (or demote to fullnode).
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");
                // Stop checkpoint tasks first; propagate any panics.
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // Recreate the accumulator for the new epoch, reusing its
                // metrics; this must be the last strong reference.
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_committee_validator(&new_epoch_store) {
                    // Still a committee validator: restart per-epoch components.
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_accumulator,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    // Demoted: drop validator metrics and stop the gRPC server.
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    validator_server_handle.shutdown();
                    debug!("Validator grpc server shutdown triggered");

                    None
                }
            } else {
                // Was a fullnode this epoch: recreate the accumulator, and
                // promote to validator if the new committee includes us.
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                if self.state.is_committee_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_accumulator,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            cur_epoch_store.release_db_handles();

            // Simulation-only: exercise checkpoint pruning when retention for
            // checkpoints is a finite, non-zero number of epochs.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
2116
    /// Shuts down node subsystems: stops consensus (when running as a
    /// validator), then gracefully shuts down the gRPC server, logging any
    /// failure.
    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }

        // Taking the handle out of the mutex ensures shutdown runs only once.
        if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
            info!("Shutting down gRPC server");
            if let Err(e) = grpc_handle.shutdown().await {
                warn!("Failed to gracefully shutdown gRPC server: {e}");
            }
        }
    }
2130
    /// Advances the authority state to the next epoch.
    ///
    /// Loads the last checkpoint of the current epoch (which must carry
    /// end-of-epoch data), derives the epoch supply change from it, builds the
    /// next epoch's `EpochStartConfiguration`, and calls
    /// `AuthorityState::reconfigure`. Returns the new epoch store.
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        accumulator: Arc<StateAccumulator>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        // The final checkpoint of an epoch must include end-of-epoch data.
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        // Reconfiguration requires that every checkpoint of the epoch has
        // already been executed.
        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                accumulator,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        // Refresh epoch-flag metrics to reflect the old -> new flag transition.
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }
2196
    /// Returns a reference to this node's configuration.
    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }
2200
2201 async fn execute_transaction_immediately_at_zero_epoch(
2202 state: &Arc<AuthorityState>,
2203 epoch_store: &Arc<AuthorityPerEpochStore>,
2204 tx: &Transaction,
2205 span: tracing::Span,
2206 ) {
2207 let _guard = span.enter();
2208 let transaction =
2209 iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2210 iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2211 tx.data().clone(),
2212 iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2213 ),
2214 );
2215 state
2216 .try_execute_immediately(&transaction, None, epoch_store)
2217 .unwrap();
2218 }
2219
    /// Returns a clone of the handle to the randomness subsystem.
    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }
2223
2224 async fn send_signed_capability_notification_to_committee_with_retry(
2230 &self,
2231 epoch_store: &Arc<AuthorityPerEpochStore>,
2232 ) {
2233 const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
2234 const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
2235 const MAX_RETRY_INTERVAL_SECS: u64 = 300; let config = epoch_store.protocol_config();
2239 let binary_config = to_binary_config(config);
2240
2241 let capabilities = AuthorityCapabilitiesV1::new(
2243 self.state.name,
2244 epoch_store.get_chain_identifier().chain(),
2245 self.config
2246 .supported_protocol_versions
2247 .expect("Supported versions should be populated")
2248 .truncate_below(config.version),
2249 self.state
2250 .get_available_system_packages(&binary_config)
2251 .await,
2252 );
2253
2254 let signature = AuthoritySignature::new_secure(
2256 &IntentMessage::new(
2257 Intent::iota_app(IntentScope::AuthorityCapabilities),
2258 &capabilities,
2259 ),
2260 &epoch_store.epoch(),
2261 self.config.authority_key_pair(),
2262 );
2263
2264 let request = HandleCapabilityNotificationRequestV1 {
2265 message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
2266 };
2267
2268 let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);
2269
2270 loop {
2271 let auth_agg = self.auth_agg.load();
2272 match auth_agg
2273 .send_capability_notification_to_quorum(request.clone())
2274 .await
2275 {
2276 Ok(_) => {
2277 info!("Successfully sent capability notification to committee");
2278 break;
2279 }
2280 Err(err) => {
2281 match &err {
2282 AggregatorSendCapabilityNotificationError::RetryableNotification {
2283 errors,
2284 } => {
2285 warn!(
2286 "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
2287 retry_interval, errors
2288 );
2289 }
2290 AggregatorSendCapabilityNotificationError::NonRetryableNotification {
2291 errors,
2292 } => {
2293 error!(
2294 "Failed to send capability notification to committee (non-retryable error): {:?}",
2295 errors
2296 );
2297 break;
2298 }
2299 };
2300
2301 tokio::time::sleep(retry_interval).await;
2303
2304 retry_interval = std::cmp::min(
2306 retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
2307 Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
2308 );
2309 }
2310 }
2311 }
2312 }
2313}
2314
2315#[cfg(not(msim))]
2316impl IotaNode {
2317 async fn fetch_jwks(
2318 _authority: AuthorityName,
2319 provider: &OIDCProvider,
2320 ) -> IotaResult<Vec<(JwkId, JWK)>> {
2321 use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2322 let client = reqwest::Client::new();
2323 fetch_jwks(provider, &client)
2324 .await
2325 .map_err(|_| IotaError::JWKRetrieval)
2326 }
2327}
2328
#[cfg(msim)]
impl IotaNode {
    /// Returns the simulator node id of this node (msim builds only).
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    /// Records whether the simulation expects safe mode, so tests can
    /// distinguish expected from unexpected safe-mode entries.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    /// Simulator override of JWK fetching: delegates to the test-configurable
    /// injector instead of performing real network requests.
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}
2349
/// A server future that is spawned at most once: `start` consumes the
/// `Unstarted` future and transitions to `Started`.
enum SpawnOnce {
    // NOTE(review): the Mutex presumably exists only to make the boxed future
    // shareable (Sync) inside the enum — confirm; it is consumed via
    // `into_inner` and never locked.
    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
    #[allow(unused)]
    Started(iota_http::ServerHandle),
}
2356
2357impl SpawnOnce {
2358 pub fn new(
2359 future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
2360 ) -> Self {
2361 Self::Unstarted(Mutex::new(Box::pin(future)))
2362 }
2363
2364 pub async fn start(self) -> Self {
2365 match self {
2366 Self::Unstarted(future) => {
2367 let server = future
2368 .into_inner()
2369 .await
2370 .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
2371 let handle = server.handle().clone();
2372 tokio::spawn(async move {
2373 if let Err(err) = server.serve().await {
2374 info!("Server stopped: {err}");
2375 }
2376 info!("Server stopped");
2377 });
2378 Self::Started(handle)
2379 }
2380 Self::Started(_) => self,
2381 }
2382 }
2383
2384 pub fn shutdown(self) {
2385 if let SpawnOnce::Started(handle) = self {
2386 handle.trigger_shutdown();
2387 }
2388 }
2389}
2390
2391fn send_trusted_peer_change(
2394 config: &NodeConfig,
2395 sender: &watch::Sender<TrustedPeerChangeEvent>,
2396 new_epoch_start_state: &EpochStartSystemState,
2397) {
2398 let new_committee =
2399 new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2400
2401 sender.send_modify(|event| {
2402 core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2403 event.new_committee = new_committee;
2404 })
2405}
2406
2407fn build_kv_store(
2408 state: &Arc<AuthorityState>,
2409 config: &NodeConfig,
2410 registry: &Registry,
2411) -> Result<Arc<TransactionKeyValueStore>> {
2412 let metrics = KeyValueStoreMetrics::new(registry);
2413 let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2414
2415 let base_url = &config.transaction_kv_store_read_config.base_url;
2416
2417 if base_url.is_empty() {
2418 info!("no http kv store url provided, using local db only");
2419 return Ok(Arc::new(db_store));
2420 }
2421
2422 base_url.parse::<url::Url>().tap_err(|e| {
2423 error!(
2424 "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2425 base_url, e
2426 )
2427 })?;
2428
2429 let http_store = HttpKVStore::new_kv(
2430 base_url,
2431 config.transaction_kv_store_read_config.cache_size,
2432 metrics.clone(),
2433 )?;
2434 info!("using local key-value store with fallback to http key-value store");
2435 Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2436 db_store,
2437 http_store,
2438 metrics,
2439 "json_rpc_fallback",
2440 )))
2441}
2442
/// Builds and starts the standalone gRPC read API server.
///
/// Returns `Ok(None)` on validators (nodes with a consensus config) and when
/// the gRPC API is disabled; returns an error if the API is enabled without a
/// `grpc_api_config`.
async fn build_grpc_server(
    config: &NodeConfig,
    state: Arc<AuthorityState>,
    state_sync_store: RocksDbStore,
    executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>>,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
) -> Result<Option<GrpcServerHandle>> {
    // A consensus config marks this node as a validator; the gRPC read API is
    // only served on non-validators with the feature enabled.
    if config.consensus_config().is_some() || !config.enable_grpc_api {
        return Ok(None);
    }

    let Some(grpc_config) = &config.grpc_api_config else {
        return Err(anyhow!("gRPC API is enabled but no configuration provided"));
    };

    let chain_id = state.get_chain_identifier();

    let grpc_read_store = Arc::new(GrpcReadStore::new(state.clone(), state_sync_store));

    // NOTE(review): this token is created here but never triggered or returned;
    // shutdown appears to rely solely on the returned handle — confirm.
    let shutdown_token = CancellationToken::new();

    let grpc_reader = Arc::new(GrpcReader::new(
        grpc_read_store,
        Some(server_version.to_string()),
    ));

    let grpc_server_metrics = iota_grpc_server::GrpcServerMetrics::new(prometheus_registry);

    let handle = start_grpc_server(
        grpc_reader,
        executor,
        grpc_config.clone(),
        shutdown_token,
        chain_id,
        Some(grpc_server_metrics),
    )
    .await?;

    Ok(Some(handle))
}
2503
/// Builds and starts the JSON-RPC HTTP server for a non-validator node.
///
/// Returns `Ok(None)` on validators (nodes with a consensus config).
/// Registers the read, coin, governance, indexer and move-utils modules, plus
/// the transaction-builder module (unless running with a `run_with_range`
/// limit) and the execution module (when a transaction orchestrator exists),
/// then serves the router together with a `/health` endpoint on
/// `config.json_rpc_address`.
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
) -> Result<Option<iota_http::ServerHandle>> {
    // Validators have a consensus config; they do not expose JSON-RPC.
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        // Shared by the read and indexer modules below.
        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        // Transaction building is disabled when the node replays only a fixed
        // range (run_with_range).
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        // Execution requires an orchestrator to drive transactions to finality.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Fall back to the chain-specific default when no explicit IOTA-Names
        // config is provided.
        let iota_names_config = config
            .iota_names_config
            .clone()
            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // Health endpoint needs the authority state (see health_check_handler).
    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    let layers = ServiceBuilder::new()
        // Re-expose iota_http's connection info in the form axum extractors
        // expect.
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware));

    router = router.layer(layers);

    let handle = iota_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());

    Ok(Some(handle))
}
2613
/// Query parameters accepted by the `/health` endpoint.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    /// Maximum tolerated age, in seconds, of the highest executed checkpoint
    /// before the health check reports the node as down. If absent, only
    /// basic liveness is reported.
    pub threshold_seconds: Option<u32>,
}
2618
2619async fn health_check_handler(
2620 axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2621 axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2622) -> impl axum::response::IntoResponse {
2623 if let Some(threshold_seconds) = threshold_seconds {
2624 let summary = match state
2626 .get_checkpoint_store()
2627 .get_highest_executed_checkpoint()
2628 {
2629 Ok(Some(summary)) => summary,
2630 Ok(None) => {
2631 warn!("Highest executed checkpoint not found");
2632 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2633 }
2634 Err(err) => {
2635 warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2636 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2637 }
2638 };
2639
2640 let latest_chain_time = summary.timestamp();
2642 let threshold =
2643 std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2644
2645 if latest_chain_time < threshold {
2647 warn!(
2648 ?latest_chain_time,
2649 ?threshold,
2650 "failing health check due to checkpoint lag"
2651 );
2652 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2653 }
2654 }
2655 (axum::http::StatusCode::OK, "up")
2657}
2658
/// Maximum number of transactions per checkpoint, taken from the protocol
/// config.
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}
2663
/// Test override: force tiny (2-transaction) checkpoints so that
/// checkpoint-boundary logic is exercised frequently in tests.
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}