#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::{
    collections::{BTreeSet, HashMap, HashSet},
    fmt,
    net::SocketAddr,
    path::PathBuf,
    str::FromStr,
    sync::{Arc, Weak},
    time::Duration,
};

use anemo::Network;
use anemo_tower::{
    callback::CallbackLayer,
    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
};
use anyhow::{Result, anyhow};
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
use futures::TryFutureExt;
pub use handle::IotaNodeHandle;
use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
use iota_config::{
    ConsensusConfig, NodeConfig,
    node::{DBCheckpointConfig, RunWithRange},
    node_config_metrics::NodeConfigMetrics,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_core::{
    authority::{
        AuthorityState, AuthorityStore, CHAIN_IDENTIFIER, RandomnessRoundReceiver,
        authority_per_epoch_store::AuthorityPerEpochStore,
        authority_store_tables::{AuthorityPerpetualTables, AuthorityPerpetualTablesOptions},
        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
    },
    authority_aggregator::{AuthAggMetrics, AuthorityAggregator},
    authority_client::NetworkAuthorityClient,
    authority_server::{ValidatorService, ValidatorServiceMetrics},
    checkpoints::{
        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
        SubmitCheckpointToConsensus,
        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
    },
    connection_monitor::ConnectionMonitor,
    consensus_adapter::{
        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
        ConsensusClient,
    },
    consensus_handler::ConsensusHandlerInitializer,
    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
    db_checkpoint_handler::DBCheckpointHandler,
    epoch::{
        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
        reconfiguration::ReconfigurationInitiator,
    },
    execution_cache::build_execution_cache,
    module_cache_metrics::ResolverMetrics,
    overload_monitor::overload_monitor,
    rest_index::RestIndexStore,
    safe_client::SafeClientMetricsBase,
    signature_verifier::SignatureVerifierMetrics,
    state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
    storage::{RestReadStore, RocksDbStore},
    traffic_controller::metrics::TrafficControllerMetrics,
    transaction_orchestrator::TransactionOrchestrator,
    validator_tx_finalizer::ValidatorTxFinalizer,
};
use iota_json_rpc::{
    JsonRpcServerBuilder, bridge_api::BridgeReadApi, coin_api::CoinReadApi,
    governance_api::GovernanceReadApi, indexer_api::IndexerApi, move_utils::MoveUtils,
    read_api::ReadApi, transaction_builder_api::TransactionBuilderApi,
    transaction_execution_api::TransactionExecutionApi,
};
use iota_json_rpc_api::JsonRpcMetrics;
use iota_macros::{fail_point, fail_point_async, replay_log};
use iota_metrics::{
    RegistryID, RegistryService,
    hardware_metrics::register_hardware_metrics,
    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
    server_timing_middleware, spawn_monitored_task,
};
use iota_network::{
    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
};
use iota_network_stack::server::ServerBuilder;
use iota_protocol_config::ProtocolConfig;
use iota_rest_api::RestMetrics;
use iota_snapshot::uploader::StateSnapshotUploader;
use iota_storage::{
    FileCompression, IndexStore, StorageFormat,
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use iota_types::{
    base_types::{AuthorityName, ConciseableName, EpochId},
    committee::Committee,
    crypto::{KeypairTraits, RandomnessRound},
    digests::ChainIdentifier,
    error::{IotaError, IotaResult},
    execution_config_utils::to_binary_config,
    iota_system_state::{
        IotaSystemState, IotaSystemStateTrait,
        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
    },
    messages_consensus::{AuthorityCapabilitiesV1, ConsensusTransaction, check_total_jwk_size},
    quorum_driver_types::QuorumDriverEffectsQueueResult,
    supported_protocol_versions::SupportedProtocolVersions,
    transaction::Transaction,
};
use prometheus::Registry;
#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use tap::tap::TapFallible;
use tokio::{
    runtime::Handle,
    sync::{Mutex, broadcast, mpsc, watch},
    task::{JoinHandle, JoinSet},
};
use tower::ServiceBuilder;
use tracing::{Instrument, debug, error, error_span, info, warn};
use typed_store::{DBMetrics, rocks::default_db_options};

use crate::metrics::{GrpcMetrics, IotaNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

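/// Handles, metrics, and services that exist only while the node is acting as
/// a validator. They are torn down (and selectively reused) on each epoch
/// change; see `monitor_reconfiguration`.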
pub struct ValidatorComponents {
    validator_server_handle: JoinHandle<Result<()>>,
    validator_server_cancel_handle: tokio::sync::oneshot::Sender<()>,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    validator_registry_id: RegistryID,
}

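/// Test-only plumbing for `msim` simulator builds: per-node simulator state
/// plus a thread-local, injectable JWK fetcher so tests can stub out network
/// access.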
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    pub(super) struct SimState {
        pub sim_node: iota_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        parse_jwks(
            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
        )
        .map_err(|_| IotaError::JWKRetrieval)
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> =
            std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

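    /// Replaces the JWK fetcher used by `IotaNode::fetch_jwks` in simulator
    /// tests.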
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

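/// A running IOTA node, either a validator or a fullnode. Fields prefixed
/// with `_` are held only so the corresponding background task or server is
/// not dropped.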
pub struct IotaNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,
    _http_server: Option<tokio::task::JoinHandle<()>>,
    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_archive_handle: Option<broadcast::Sender<()>>,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}

impl fmt::Debug for IotaNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("IotaNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

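/// Cap on how many JWKs are accepted from a single provider fetch; any excess
/// keys are dropped before submission to consensus.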
static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl IotaNode {
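    /// Starts the node with the software version reported as "unknown"; see
    /// [`Self::start_async`] for the full entry point.
    ///
    /// A minimal call sketch (config and registry construction are assumed to
    /// happen elsewhere; names here are illustrative):
    ///
    /// ```ignore
    /// let node = IotaNode::start(config, registry_service, None).await?;
    /// let mut epoch_rx = node.subscribe_to_epoch_change();
    /// ```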
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
        custom_rpc_runtime: Option<Handle>,
    ) -> Result<Arc<IotaNode>> {
        Self::start_async(config, registry_service, custom_rpc_runtime, "unknown").await
    }

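    /// Spawns one background task per supported OIDC provider that
    /// periodically fetches JWKs, filters out invalid, oversized, already
    /// active, or already seen keys, and submits the remainder to consensus.
    /// Each task lives at most as long as the current epoch.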
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<IotaNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        fn validate_jwk(
            metrics: &Arc<IotaNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }

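    /// Full node startup: opens the stores, replays genesis (and any
    /// migration data) at epoch 0, wires up the P2P network, the optional
    /// archival, snapshot, and DB checkpoint tasks, the HTTP server for
    /// fullnodes, and validator components when a consensus config is
    /// present, then spawns `monitor_reconfiguration`.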
    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        custom_rpc_runtime: Option<Handle>,
        software_version: &'static str,
    ) -> Result<Arc<IotaNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(node =? config.authority_public_key(),
            "Initializing iota-node listening on {}", config.network_address
        );

        let genesis = config.genesis()?.clone();

        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
        let _ = CHAIN_IDENTIFIER.set(chain_identifier);
        info!("IOTA chain identifier: {chain_identifier}");

        DBMetrics::init(&prometheus_registry);

        iota_metrics::init_metrics(&prometheus_registry);
        #[cfg(not(msim))]
        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();

        register_hardware_metrics(&registry_service, &config.db_path)
            .expect("Failed registering hardware metrics");
        prometheus_registry
            .register(iota_metrics::uptime_metric(
                if is_validator {
                    "validator"
                } else {
                    "fullnode"
                },
                software_version,
                &chain_identifier.to_string(),
            ))
            .expect("Failed registering uptime metric");

        let migration_tx_data = if genesis.contains_migrations() {
            Some(config.load_migration_tx_data()?)
        } else {
            None
        };

        let secret = Arc::pin(config.authority_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions { enable_write_stall };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");
        let store = AuthorityStore::open(
            perpetual_tables,
            &genesis,
            &config,
            &prometheus_registry,
            migration_tx_data.as_ref(),
        )
        .await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits =
            build_execution_cache(&epoch_start_configuration, &prometheus_registry, &store);

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                    auth_agg_metrics,
                ),
            )))
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.authority_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            ChainIdentifier::from(*genesis.checkpoint().digest()),
        );

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        if is_genesis {
            info!("checking IOTA conservation at genesis");
            cache_traits
                .reconfig_api
                .expensive_check_iota_conservation(&epoch_store, None)
                .expect("IOTA conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        info!("creating checkpoint store");

        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
                config.remove_deprecated_tables,
            )))
        } else {
            None
        };

        let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
        {
            Some(Arc::new(RestIndexStore::new(
                config.db_path().join("rest_index"),
                &store,
                &checkpoint_store,
                &epoch_store,
                &cache_traits.backing_package_store,
            )))
        } else {
            None
        };

        info!("creating archive reader");
        let archive_readers =
            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
            Self::create_p2p_network(
                &config,
                state_sync_store.clone(),
                chain_identifier,
                trusted_peer_change_rx,
                archive_readers.clone(),
                randomness_tx,
                &prometheus_registry,
            )?;

        send_trusted_peer_change(
            &config,
            &trusted_peer_change_tx,
            epoch_store.epoch_start_state(),
        );

        info!("start state archival");
        let state_archive_handle =
            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
                .await?;

        info!("start snapshot upload");
        let state_snapshot_handle =
            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;

        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        let mut genesis_objects = genesis.objects().to_vec();
        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
            genesis_objects.extend(migration_tx_data.get_objects());
        }

        let authority_name = config.authority_public_key();
        let validator_tx_finalizer =
            config
                .enable_validator_tx_finalizer
                .then_some(Arc::new(ValidatorTxFinalizer::new(
                    auth_agg.clone(),
                    authority_name,
                    &prometheus_registry,
                )));

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            rest_index,
            checkpoint_store.clone(),
            &prometheus_registry,
            &genesis_objects,
            &db_checkpoint_config,
            config.clone(),
            config.indirect_objects_threshold,
            archive_readers,
            validator_tx_finalizer,
        )
        .await;

        if epoch_store.epoch() == 0 {
            let genesis_tx = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
            Self::execute_transaction_immediately_at_zero_epoch(
                &state,
                &epoch_store,
                genesis_tx,
                span,
            )
            .await;

            if let Some(migration_tx_data) = migration_tx_data {
                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
                    Self::execute_transaction_immediately_at_zero_epoch(
                        &state,
                        &epoch_store,
                        tx,
                        span,
                    )
                    .await;
                }
            }
        }

        checkpoint_store
            .reexecute_local_checkpoints(&state, &epoch_store)
            .await;

        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
        {
            if let Some(indexes) = state.indexes.clone() {
                iota_core::verify_indexes::verify_indexes(
                    state.get_accumulator_store().as_ref(),
                    indexes,
                )
                .expect("secondary indexes are inconsistent");
            }
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
            )))
        } else {
            None
        };

        let http_server = build_http_server(
            state.clone(),
            state_sync_store,
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
            custom_rpc_runtime,
            software_version,
        )
        .await?;

        let accumulator = Arc::new(StateAccumulator::new(
            cache_traits.accumulator_store.clone(),
            StateAccumulatorMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics =
            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
            p2p_network.downgrade(),
            network_connection_metrics,
            HashMap::new(),
            None,
        );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses,
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let iota_node_metrics =
            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));

        let validator_components = if state.is_validator(&epoch_store) {
            let components = Self::construct_validator_components(
                config.clone(),
                state.clone(),
                committee,
                epoch_store.clone(),
                checkpoint_store.clone(),
                state_sync_handle.clone(),
                randomness_handle.clone(),
                Arc::downgrade(&accumulator),
                connection_monitor_status.clone(),
                &registry_service,
                iota_node_metrics.clone(),
            )
            .await?;
            components.consensus_adapter.submit_recovered(&epoch_store);

            Some(components)
        } else {
            None
        };

        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            _http_server: http_server,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: iota_node_metrics,

            _discovery: discovery_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            accumulator: Mutex::new(Some(accumulator)),
            end_of_epoch_channel,
            connection_monitor_status,
            trusted_peer_change_tx,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_archive_handle: state_archive_handle,
            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            auth_agg,
        };

        info!("IotaNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

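    /// Asks the consensus adapter to close the current epoch; fails if this
    /// node is not currently a validator.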
    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| IotaError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> IotaResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    pub async fn close_epoch_for_testing(&self) -> IotaResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

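    /// Starts the state archive writer when a remote object store is
    /// configured; returns the writer's kill switch, or `None` when archiving
    /// is disabled.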
    async fn start_state_archival(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_sync_store: RocksDbStore,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
            let local_store_config = ObjectStoreConfig {
                object_store: Some(ObjectStoreType::File),
                directory: Some(config.archive_path()),
                ..Default::default()
            };
            let archive_writer = ArchiveWriter::new(
                local_store_config,
                remote_store_config.clone(),
                FileCompression::Zstd,
                StorageFormat::Blob,
                Duration::from_secs(600),
                256 * 1024 * 1024,
                prometheus_registry,
            )
            .await?;
            Ok(Some(archive_writer.start(state_sync_store).await?))
        } else {
            Ok(None)
        }
    }

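    /// Starts the state snapshot uploader when a remote object store is
    /// configured; returns its kill switch, or `None` when snapshots are
    /// disabled.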
    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

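    /// Resolves the effective DB checkpoint config and, unless neither an
    /// upload store nor state snapshots require it, starts the DB checkpoint
    /// handler.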
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.indirect_objects_threshold,
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

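    /// Builds the anemo P2P network with discovery, state sync, and
    /// randomness services, applying generous QUIC defaults wherever the
    /// config leaves a knob unset.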
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            anemo_config.max_frame_size = Some(1 << 30);

            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("iota-{}", chain_identifier);
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2P network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle = discovery.start(p2p_network.clone());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }

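    /// Creates the validator-only components (consensus adapter and manager,
    /// store pruner, gRPC service, optional overload monitor) under a
    /// dedicated metrics registry, then defers to
    /// `start_epoch_specific_validator_components`.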
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        accumulator: Weak<StateAccumulator>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        iota_node_metrics: Arc<IotaNodeMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        let (validator_server_handle, validator_server_cancel_handle) =
            Self::start_grpc_validator_service(
                &config,
                state.clone(),
                consensus_adapter.clone(),
                &validator_registry,
            )
            .await?;

        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            accumulator,
            validator_server_handle,
            validator_server_cancel_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_node_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }

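    /// Starts the components that must be rebuilt every epoch: checkpoint
    /// service, randomness manager, consensus, and, when the authenticator
    /// state is enabled, the JWK updater.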
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: ConsensusManager,
        consensus_store_pruner: ConsensusStorePruner,
        accumulator: Weak<StateAccumulator>,
        validator_server_handle: JoinHandle<Result<()>>,
        validator_server_cancel_handle: tokio::sync::oneshot::Sender<()>,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_node_metrics: Arc<IotaNodeMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::start_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store,
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            accumulator,
            checkpoint_metrics.clone(),
        );

        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
        );

        consensus_manager
            .start(
                config,
                epoch_store.clone(),
                consensus_handler_initializer,
                IotaTxValidator::new(
                    epoch_store.clone(),
                    checkpoint_service.clone(),
                    state.transaction_manager().clone(),
                    iota_tx_validator_metrics.clone(),
                ),
            )
            .await;

        let checkpoint_service_tasks = checkpoint_service.spawn().await;

        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                iota_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_server_cancel_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }

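    /// Assembles the checkpoint service, which submits locally built
    /// checkpoints to consensus and forwards certified ones to state sync.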
    fn start_checkpoint_service(
        config: &NodeConfig,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state: Arc<AuthorityState>,
        state_sync_handle: state_sync::Handle,
        accumulator: Weak<StateAccumulator>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Arc<CheckpointService> {
        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

        debug!(
            "Starting checkpoint service with epoch start timestamp {} and epoch duration {}",
            epoch_start_timestamp_ms, epoch_duration_ms
        );

        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
            sender: consensus_adapter,
            signer: state.secret.clone(),
            authority: config.authority_public_key(),
            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
                .checked_add(epoch_duration_ms)
                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
            metrics: checkpoint_metrics.clone(),
        });

        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
        let max_checkpoint_size_bytes =
            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;

        CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store,
            state.get_transaction_cache_reader().clone(),
            accumulator,
            checkpoint_output,
            Box::new(certified_checkpoint_output),
            checkpoint_metrics,
            max_tx_per_checkpoint,
            max_checkpoint_size_bytes,
        )
    }

    fn construct_consensus_adapter(
        committee: &Committee,
        consensus_config: &ConsensusConfig,
        authority: AuthorityName,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        prometheus_registry: &Registry,
        consensus_client: Arc<dyn ConsensusClient>,
    ) -> ConsensusAdapter {
        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
        ConsensusAdapter::new(
            consensus_client,
            authority,
            connection_monitor_status,
            consensus_config.max_pending_transactions(),
            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
            consensus_config.max_submit_position,
            consensus_config.submit_delay_step_override(),
            ca_metrics,
        )
    }

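    /// Binds the validator gRPC service on the configured network address;
    /// returns the server task plus a oneshot handle used to cancel the
    /// server if the node stops being a validator.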
    async fn start_grpc_validator_service(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        prometheus_registry: &Registry,
    ) -> Result<(
        tokio::task::JoinHandle<Result<()>>,
        tokio::sync::oneshot::Sender<()>,
    )> {
        let validator_service = ValidatorService::new(
            state.clone(),
            consensus_adapter,
            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
            TrafficControllerMetrics::new(prometheus_registry),
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let mut server_conf = iota_network_stack::config::Config::new();
        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
        server_conf.load_shed = config.grpc_load_shed;
        let mut server_builder =
            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));

        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));

        let mut server = server_builder
            .bind(config.network_address())
            .await
            .map_err(|err| anyhow!(err.to_string()))?;
        let cancel_handle = server
            .take_cancel_handle()
            .expect("GRPC server should still have a cancel handle");
        let local_addr = server.local_addr();
        info!("Listening to traffic on {local_addr}");
        let grpc_server = spawn_monitored_task!(server.serve().map_err(Into::into));

        Ok((grpc_server, cancel_handle))
    }

    pub fn state(&self) -> Arc<AuthorityState> {
        self.state.clone()
    }

    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }

    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }

    pub fn clone_authority_aggregator(
        &self,
    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.clone_authority_aggregator())
    }

    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }

    pub fn subscribe_to_transaction_orchestrator_effects(
        &self,
    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.subscribe_to_effects_queue())
            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
    }

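    /// Drives the node across epoch boundaries: executes all checkpoints of
    /// the current epoch, then reconfigures state, swaps the accumulator and
    /// authority aggregator, and starts or tears down validator components
    /// depending on the node's role in the new committee. Runs until a
    /// `RunWithRange` stop condition is met.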
    pub async fn monitor_reconfiguration(self: Arc<Self>) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            let mut accumulator_guard = self.accumulator.lock().await;
            let accumulator = accumulator_guard.take().unwrap();
            let mut checkpoint_executor = CheckpointExecutor::new(
                self.state_sync_handle.subscribe_to_synced_checkpoints(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                accumulator.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            if let Some(components) = &*self.validator_components.lock().await {
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let binary_config = to_binary_config(config);
                let transaction = ConsensusTransaction::new_capability_notification_v1(
                    AuthorityCapabilitiesV1::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        self.config
                            .supported_protocol_versions
                            .expect("Supported versions should be populated")
                            .truncate_below(config.version),
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components
                    .consensus_adapter
                    .submit(transaction, None, &cur_epoch_store)?;
            }

            let stop_condition = checkpoint_executor
                .run_epoch(cur_epoch_store.clone(), run_with_range)
                .await;
            drop(checkpoint_executor);

            if stop_condition == StopReason::RunWithRangeCondition {
                IotaNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .get_iota_system_state_object_unsafe()
                .expect("Read IOTA System State object cannot fail");

            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
                if self.state.is_fullnode(&cur_epoch_store) {
                    warn!(
                        "Failed to send end of epoch notification to subscriber: {:?}",
                        err
                    );
                }
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            send_trusted_peer_change(
                &self.config,
                &self.trusted_peer_change_tx,
                &new_epoch_start_state,
            );

            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_server_cancel_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = self.validator_components.lock().await.take()
            {
                info!("Reconfiguring the validator.");
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                let new_epoch_store = self
                    .reconfigure_state(
                        &self.state,
                        &cur_epoch_store,
                        next_epoch_committee.clone(),
                        new_epoch_start_state,
                        accumulator.clone(),
                    )
                    .await?;
                info!("Epoch store finished reconfiguration.");

                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_validator(&new_epoch_store) {
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_accumulator,
                            validator_server_handle,
                            validator_server_cancel_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    if validator_server_cancel_handle.send(()).is_ok() {
                        debug!("Validator grpc server cancelled");
                    } else {
                        warn!("Failed to cancel validator grpc server");
                    }

                    None
                }
            } else {
                let new_epoch_store = self
                    .reconfigure_state(
                        &self.state,
                        &cur_epoch_store,
                        next_epoch_committee.clone(),
                        new_epoch_start_state,
                        accumulator.clone(),
                    )
                    .await?;

                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                if self.state.is_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    Some(
                        Self::construct_validator_components(
                            self.config.clone(),
                            self.state.clone(),
                            Arc::new(next_epoch_committee.clone()),
                            new_epoch_store.clone(),
                            self.checkpoint_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            weak_accumulator,
                            self.connection_monitor_status.clone(),
                            &self.registry_service,
                            self.metrics.clone(),
                        )
                        .await?,
                    )
                } else {
                    None
                }
            };
            *self.validator_components.lock().await = new_validator_components;

            cur_epoch_store.release_db_handles();

            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            info!("Reconfiguration finished");
        }
    }

    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }
    }

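    /// Builds the `EpochStartConfiguration` for the next epoch from the last
    /// checkpoint of the current one and reconfigures the authority state.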
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        accumulator: Arc<StateAccumulator>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                accumulator,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }

    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }

    async fn execute_transaction_immediately_at_zero_epoch(
        state: &Arc<AuthorityState>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx: &Transaction,
        span: tracing::Span,
    ) {
        let transaction =
            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                    tx.data().clone(),
                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                ),
            );
        state
            .try_execute_immediately(&transaction, None, epoch_store)
            .instrument(span)
            .await
            .unwrap();
    }

    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }
}

#[cfg(not(msim))]
impl IotaNode {
    async fn fetch_jwks(
        _authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
        let client = reqwest::Client::new();
        fetch_jwks(provider, &client)
            .await
            .map_err(|_| IotaError::JWKRetrieval)
    }
}

#[cfg(msim)]
impl IotaNode {
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}

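/// Publishes the validator set of the new epoch as trusted P2P peers, keeping
/// the previous committee as `old_committee` so discovery can overlap both
/// during the transition.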
fn send_trusted_peer_change(
    config: &NodeConfig,
    sender: &watch::Sender<TrustedPeerChangeEvent>,
    new_epoch_start_state: &EpochStartSystemState,
) {
    let new_committee =
        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());

    sender.send_modify(|event| {
        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
        event.new_committee = new_committee;
    })
}

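/// Builds the transaction key-value store for JSON-RPC reads: local RocksDB
/// only, or RocksDB with an HTTP fallback when a base URL is configured.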
fn build_kv_store(
    state: &Arc<AuthorityState>,
    config: &NodeConfig,
    registry: &Registry,
) -> Result<Arc<TransactionKeyValueStore>> {
    let metrics = KeyValueStoreMetrics::new(registry);
    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());

    let base_url = &config.transaction_kv_store_read_config.base_url;

    if base_url.is_empty() {
        info!("no http kv store url provided, using local db only");
        return Ok(Arc::new(db_store));
    }

    base_url.parse::<url::Url>().tap_err(|e| {
        error!(
            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
            base_url, e
        )
    })?;

    let http_store = HttpKVStore::new_kv(
        base_url,
        config.transaction_kv_store_read_config.cache_size,
        metrics.clone(),
    )?;
    info!("using local key-value store with fallback to http key-value store");
    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
        db_store,
        http_store,
        metrics,
        "json_rpc_fallback",
    )))
}

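/// Builds the JSON-RPC (and, when enabled, REST) HTTP server. Returns `None`
/// for validators, which do not serve public RPC.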
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    _custom_runtime: Option<Handle>,
    software_version: &'static str,
) -> Result<Option<tokio::task::JoinHandle<()>>> {
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
        server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        let iota_names_config = config.iota_names_config.clone().unwrap_or_default();

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    if config.enable_rest_api {
        let mut rest_service = iota_rest_api::RestService::new(
            Arc::new(RestReadStore::new(state, store)),
            software_version,
        );

        rest_service.with_metrics(RestMetrics::new(prometheus_registry));

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rest_service.with_executor(transaction_orchestrator.clone())
        }

        router = router.merge(rest_service.into_router());
    }

    let listener = tokio::net::TcpListener::bind(&config.json_rpc_address)
        .await
        .unwrap();
    let addr = listener.local_addr().unwrap();

    router = router.layer(axum::middleware::from_fn(server_timing_middleware));

    let handle = tokio::spawn(async move {
        axum::serve(
            listener,
            router.into_make_service_with_connect_info::<SocketAddr>(),
        )
        .await
        .unwrap()
    });

    info!(local_addr =? addr, "IOTA JSON-RPC server listening on {addr}");

    Ok(Some(handle))
}

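// Maximum transactions per checkpoint: taken from the protocol config in
// production builds, pinned to 2 under `#[cfg(test)]` so tests exercise
// checkpoint boundaries frequently.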
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}

#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}