1#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8 collections::{BTreeSet, HashMap, HashSet},
9 fmt,
10 net::SocketAddr,
11 path::PathBuf,
12 str::FromStr,
13 sync::{Arc, Weak},
14 time::Duration,
15};
16
17use anemo::Network;
18use anemo_tower::{
19 callback::CallbackLayer,
20 trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
21};
22use anyhow::{Result, anyhow};
23use arc_swap::ArcSwap;
24use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
25use futures::TryFutureExt;
26pub use handle::IotaNodeHandle;
27use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
28use iota_config::{
29 ConsensusConfig, NodeConfig,
30 node::{DBCheckpointConfig, RunWithRange},
31 node_config_metrics::NodeConfigMetrics,
32 object_storage_config::{ObjectStoreConfig, ObjectStoreType},
33};
34use iota_core::{
35 authority::{
36 AuthorityState, AuthorityStore, CHAIN_IDENTIFIER, RandomnessRoundReceiver,
37 authority_per_epoch_store::AuthorityPerEpochStore,
38 authority_store_tables::AuthorityPerpetualTables,
39 epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
40 },
41 authority_aggregator::{AuthAggMetrics, AuthorityAggregator},
42 authority_client::NetworkAuthorityClient,
43 authority_server::{ValidatorService, ValidatorServiceMetrics},
44 checkpoints::{
45 CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
46 SubmitCheckpointToConsensus,
47 checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
48 },
49 connection_monitor::ConnectionMonitor,
50 consensus_adapter::{
51 CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
52 ConsensusClient,
53 },
54 consensus_handler::ConsensusHandlerInitializer,
55 consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
56 consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
57 db_checkpoint_handler::DBCheckpointHandler,
58 epoch::{
59 committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
60 epoch_metrics::EpochMetrics, randomness::RandomnessManager,
61 reconfiguration::ReconfigurationInitiator,
62 },
63 execution_cache::build_execution_cache,
64 module_cache_metrics::ResolverMetrics,
65 overload_monitor::overload_monitor,
66 rest_index::RestIndexStore,
67 safe_client::SafeClientMetricsBase,
68 signature_verifier::SignatureVerifierMetrics,
69 state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
70 storage::{RestReadStore, RocksDbStore},
71 traffic_controller::metrics::TrafficControllerMetrics,
72 transaction_orchestrator::TransactionOrchestrator,
73 validator_tx_finalizer::ValidatorTxFinalizer,
74};
75use iota_json_rpc::{
76 JsonRpcServerBuilder, bridge_api::BridgeReadApi, coin_api::CoinReadApi,
77 governance_api::GovernanceReadApi, indexer_api::IndexerApi, move_utils::MoveUtils,
78 read_api::ReadApi, transaction_builder_api::TransactionBuilderApi,
79 transaction_execution_api::TransactionExecutionApi,
80};
81use iota_json_rpc_api::JsonRpcMetrics;
82use iota_macros::{fail_point, fail_point_async, replay_log};
83use iota_metrics::{
84 RegistryService,
85 hardware_metrics::register_hardware_metrics,
86 metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
87 server_timing_middleware, spawn_monitored_task,
88};
89use iota_network::{
90 api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
91};
92use iota_network_stack::server::ServerBuilder;
93use iota_protocol_config::ProtocolConfig;
94use iota_rest_api::RestMetrics;
95use iota_snapshot::uploader::StateSnapshotUploader;
96use iota_storage::{
97 FileCompression, IndexStore, StorageFormat,
98 http_key_value_store::HttpKVStore,
99 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
100 key_value_store_metrics::KeyValueStoreMetrics,
101};
102use iota_types::{
103 base_types::{AuthorityName, ConciseableName, EpochId},
104 committee::Committee,
105 crypto::{KeypairTraits, RandomnessRound},
106 digests::ChainIdentifier,
107 error::{IotaError, IotaResult},
108 execution_config_utils::to_binary_config,
109 iota_system_state::{
110 IotaSystemState, IotaSystemStateTrait,
111 epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
112 },
113 messages_consensus::{AuthorityCapabilitiesV1, ConsensusTransaction, check_total_jwk_size},
114 quorum_driver_types::QuorumDriverEffectsQueueResult,
115 supported_protocol_versions::SupportedProtocolVersions,
116 transaction::Transaction,
117};
118use prometheus::Registry;
119#[cfg(msim)]
120pub use simulator::set_jwk_injector;
121#[cfg(msim)]
122use simulator::*;
123use tap::tap::TapFallible;
124use tokio::{
125 runtime::Handle,
126 sync::{Mutex, broadcast, mpsc, watch},
127 task::{JoinHandle, JoinSet},
128};
129use tower::ServiceBuilder;
130use tracing::{Instrument, debug, error, error_span, info, warn};
131use typed_store::{DBMetrics, rocks::default_db_options};
132
133use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
134
135pub mod admin;
136mod handle;
137pub mod metrics;
138
/// Handles and shared objects for the pieces of the node that are only
/// constructed when the node is a validator for the current epoch.
pub struct ValidatorComponents {
    /// Join handle of the validator server task; the task resolves to a
    /// `Result`.
    validator_server_handle: JoinHandle<Result<()>>,
    /// Join handle of the overload monitor task, when one was spawned.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    /// Shared adapter through which transactions (recovered transactions,
    /// fetched JWKs, end-of-epoch messages) are handed to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    /// Set of tasks belonging to the checkpoint service.
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
}
150
#[cfg(msim)]
mod simulator {
    use std::cell::RefCell;
    use std::sync::atomic::AtomicBool;

    use super::*;

    /// Per-node bookkeeping that only exists in simulator (`msim`) builds.
    pub(super) struct SimState {
        /// Handle of the simulator node this `IotaNode` was created on.
        pub sim_node: iota_simulator::runtime::NodeHandle,
        /// Flag recording whether safe mode is expected; starts out `false`.
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        /// Captures the current simulator node handle, clears the safe-mode
        /// flag and installs a fresh leak detector.
        fn default() -> Self {
            let sim_node = iota_simulator::runtime::NodeHandle::current();
            let sim_safe_mode_expected = AtomicBool::new(false);
            let _leak_detector = iota_simulator::NodeLeakDetector::new();
            Self {
                sim_node,
                sim_safe_mode_expected,
                _leak_detector,
            }
        }
    }

    /// Callback that produces the JWKs for an authority / provider pair.
    type JwkInjector =
        dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>> + Send + Sync + 'static;

    /// Default injector: parses the built-in `DEFAULT_JWK_BYTES` as Twitch
    /// keys, ignoring both arguments. Any parse failure is reported as
    /// `IotaError::JWKRetrieval`.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;

        let parsed = parse_jwks(
            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
        );
        parsed.map_err(|_| IotaError::JWKRetrieval)
    }

    thread_local! {
        // Thread-local slot holding the injector currently in effect.
        static JWK_INJECTOR: RefCell<Arc<JwkInjector>> = RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns a clone of the injector installed on the current thread.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|slot| Arc::clone(&*slot.borrow()))
    }

    /// Replaces the injector installed on the current thread.
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|slot| {
            slot.replace(injector);
        });
    }
}
203
/// A running IOTA node (validator or full node) together with the handles
/// that keep its background services alive.
pub struct IotaNode {
    /// Effective node configuration (with `supported_protocol_versions`
    /// populated by `start_async` if it was absent).
    config: NodeConfig,
    /// `Some` only when the node was a validator at startup; guarded by an
    /// async mutex.
    validator_components: Mutex<Option<ValidatorComponents>>,
    /// Handle returned by `build_http_server`; kept only so it is owned by
    /// the node.
    _http_server: Option<tokio::task::JoinHandle<()>>,
    state: Arc<AuthorityState>,
    /// Created only on full nodes that were started without `run_with_range`.
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    /// Sender side of the broadcast channel exposed through
    /// `subscribe_to_epoch_change`.
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    /// Sender side of the watch channel whose receiver is handed to the
    /// discovery builder.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_archive_handle: Option<broadcast::Sender<()>>,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    /// Sender side of the broadcast channel exposed through
    /// `subscribe_to_shutdown_channel`.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    /// Swappable authority aggregator shared with the validator transaction
    /// finalizer.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}
247
248impl fmt::Debug for IotaNode {
249 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
250 f.debug_struct("IotaNode")
251 .field("name", &self.state.name.concise())
252 .finish()
253 }
254}
255
/// Upper bound on how many new JWKs from a single provider fetch are
/// submitted to consensus; the JWK updater truncates anything beyond it.
static MAX_JWK_KEYS_PER_FETCH: usize = 100;
257
258impl IotaNode {
259 pub async fn start(
260 config: NodeConfig,
261 registry_service: RegistryService,
262 custom_rpc_runtime: Option<Handle>,
263 ) -> Result<Arc<IotaNode>> {
264 Self::start_async(config, registry_service, custom_rpc_runtime, "unknown").await
265 }
266
    /// Spawns one monitored background task per supported OIDC provider.
    ///
    /// Each task loops: fetch the provider's JWKs, drop the invalid / already
    /// active / already seen ones, submit the remainder to consensus, then
    /// sleep for `config.jwk_fetch_interval_seconds`. The task future is
    /// wrapped in `epoch_store.within_alive_epoch(..)` (presumably so it ends
    /// with the epoch — confirm against `AuthorityPerEpochStore`).
    ///
    /// # Panics
    ///
    /// Panics if a configured provider string cannot be parsed into an
    /// `OIDCProvider`.
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<IotaNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        // Providers are configured per chain; a chain without an entry yields
        // an empty list and therefore no updater tasks.
        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        /// Returns `true` when the JWK's issuer parses to a provider, that
        /// provider equals the one the key was fetched from, and the key
        /// passes `check_total_jwk_size`. Every rejection is logged and
        /// counted in the `invalid_jwks` metric for `provider`.
        fn validate_jwk(
            metrics: &Arc<IotaNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        for p in supported_providers.into_iter() {
            // Per-task copies of everything the spawned future captures.
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // (id, jwk) pairs this task has already accepted once.
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // Failed fetches retry after a fixed 30 seconds
                                // rather than after `fetch_interval`.
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // Keep only keys that are valid, not yet active
                                // in this epoch, and new to this task. `&&`
                                // short-circuits, so only keys passing the
                                // first two checks are recorded in `seen`.
                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // NOTE(review): keys cut off here were already
                                // added to `seen` above, so this task will not
                                // submit them on a later fetch — confirm this
                                // is intended.
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    // Submission failures are logged and then
                                    // discarded; the loop carries on.
                                    consensus_adapter.submit(txn, None, &epoch_store)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }
405
406 pub async fn start_async(
407 config: NodeConfig,
408 registry_service: RegistryService,
409 custom_rpc_runtime: Option<Handle>,
410 software_version: &'static str,
411 ) -> Result<Arc<IotaNode>> {
412 NodeConfigMetrics::new(®istry_service.default_registry()).record_metrics(&config);
413 let mut config = config.clone();
414 if config.supported_protocol_versions.is_none() {
415 info!(
416 "populating config.supported_protocol_versions with default {:?}",
417 SupportedProtocolVersions::SYSTEM_DEFAULT
418 );
419 config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
420 }
421
422 let run_with_range = config.run_with_range;
423 let is_validator = config.consensus_config().is_some();
424 let is_full_node = !is_validator;
425 let prometheus_registry = registry_service.default_registry();
426
427 info!(node =? config.authority_public_key(),
428 "Initializing iota-node listening on {}", config.network_address
429 );
430
431 let genesis = config.genesis()?.clone();
432
433 let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
434 let _ = CHAIN_IDENTIFIER.set(chain_identifier);
436 info!("IOTA chain identifier: {chain_identifier}");
437
438 DBMetrics::init(&prometheus_registry);
440
441 iota_metrics::init_metrics(&prometheus_registry);
443 #[cfg(not(msim))]
446 iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
447
448 register_hardware_metrics(®istry_service, &config.db_path)
450 .expect("Failed registering hardware metrics");
451 prometheus_registry
453 .register(iota_metrics::uptime_metric(
454 if is_validator {
455 "validator"
456 } else {
457 "fullnode"
458 },
459 software_version,
460 &chain_identifier.to_string(),
461 ))
462 .expect("Failed registering uptime metric");
463
464 let migration_tx_data = if genesis.contains_migrations() {
467 Some(config.load_migration_tx_data()?)
470 } else {
471 None
472 };
473
474 let secret = Arc::pin(config.authority_key_pair().copy());
475 let genesis_committee = genesis.committee()?;
476 let committee_store = Arc::new(CommitteeStore::new(
477 config.db_path().join("epochs"),
478 &genesis_committee,
479 None,
480 ));
481
482 let perpetual_options = default_db_options().optimize_db_for_write_throughput(4);
483 let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
484 &config.db_path().join("store"),
485 Some(perpetual_options.options),
486 ));
487 let is_genesis = perpetual_tables
488 .database_is_empty()
489 .expect("Database read should not fail at init.");
490 let store = AuthorityStore::open(
491 perpetual_tables,
492 &genesis,
493 &config,
494 &prometheus_registry,
495 migration_tx_data.as_ref(),
496 )
497 .await?;
498
499 let cur_epoch = store.get_recovery_epoch_at_restart()?;
500 let committee = committee_store
501 .get_committee(&cur_epoch)?
502 .expect("Committee of the current epoch must exist");
503 let epoch_start_configuration = store
504 .get_epoch_start_configuration()?
505 .expect("EpochStartConfiguration of the current epoch must exist");
506 let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
507 let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
508
509 let cache_traits =
510 build_execution_cache(&epoch_start_configuration, &prometheus_registry, &store);
511
512 let auth_agg = {
513 let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
514 let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
515 Arc::new(ArcSwap::new(Arc::new(
516 AuthorityAggregator::new_from_epoch_start_state(
517 epoch_start_configuration.epoch_start_state(),
518 &committee_store,
519 safe_client_metrics_base,
520 auth_agg_metrics,
521 ),
522 )))
523 };
524
525 let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
526 let epoch_store = AuthorityPerEpochStore::new(
527 config.authority_public_key(),
528 committee.clone(),
529 &config.db_path().join("store"),
530 Some(epoch_options.options),
531 EpochMetrics::new(®istry_service.default_registry()),
532 epoch_start_configuration,
533 cache_traits.backing_package_store.clone(),
534 cache_traits.object_store.clone(),
535 cache_metrics,
536 signature_verifier_metrics,
537 &config.expensive_safety_check_config,
538 ChainIdentifier::from(*genesis.checkpoint().digest()),
539 );
540
541 info!("created epoch store");
542
543 replay_log!(
544 "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
545 epoch_store.epoch(),
546 epoch_store.protocol_config()
547 );
548
549 if is_genesis {
551 info!("checking IOTA conservation at genesis");
552 cache_traits
557 .reconfig_api
558 .expensive_check_iota_conservation(&epoch_store, None)
559 .expect("IOTA conservation check cannot fail at genesis");
560 }
561
562 let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
563 let default_buffer_stake = epoch_store
564 .protocol_config()
565 .buffer_stake_for_protocol_upgrade_bps();
566 if effective_buffer_stake != default_buffer_stake {
567 warn!(
568 ?effective_buffer_stake,
569 ?default_buffer_stake,
570 "buffer_stake_for_protocol_upgrade_bps is currently overridden"
571 );
572 }
573
574 info!("creating checkpoint store");
575
576 let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
577 checkpoint_store.insert_genesis_checkpoint(
578 genesis.checkpoint(),
579 genesis.checkpoint_contents().clone(),
580 &epoch_store,
581 );
582
583 info!("creating state sync store");
584 let state_sync_store = RocksDbStore::new(
585 cache_traits.clone(),
586 committee_store.clone(),
587 checkpoint_store.clone(),
588 );
589
590 let index_store = if is_full_node && config.enable_index_processing {
591 info!("creating index store");
592 Some(Arc::new(IndexStore::new(
593 config.db_path().join("indexes"),
594 &prometheus_registry,
595 epoch_store
596 .protocol_config()
597 .max_move_identifier_len_as_option(),
598 config.remove_deprecated_tables,
599 )))
600 } else {
601 None
602 };
603
604 let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
605 {
606 Some(Arc::new(RestIndexStore::new(
607 config.db_path().join("rest_index"),
608 &store,
609 &checkpoint_store,
610 &epoch_store,
611 &cache_traits.backing_package_store,
612 )))
613 } else {
614 None
615 };
616
617 info!("creating archive reader");
618 let archive_readers =
623 ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
624 let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
625 let (randomness_tx, randomness_rx) = mpsc::channel(
626 config
627 .p2p_config
628 .randomness
629 .clone()
630 .unwrap_or_default()
631 .mailbox_capacity(),
632 );
633 let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
634 Self::create_p2p_network(
635 &config,
636 state_sync_store.clone(),
637 chain_identifier,
638 trusted_peer_change_rx,
639 archive_readers.clone(),
640 randomness_tx,
641 &prometheus_registry,
642 )?;
643
644 send_trusted_peer_change(
647 &config,
648 &trusted_peer_change_tx,
649 epoch_store.epoch_start_state(),
650 );
651
652 info!("start state archival");
653 let state_archive_handle =
655 Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
656 .await?;
657
658 info!("start snapshot upload");
659 let state_snapshot_handle =
661 Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
662
663 info!("start db checkpoint");
665 let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
666 &config,
667 &prometheus_registry,
668 state_snapshot_handle.is_some(),
669 )?;
670
671 let mut genesis_objects = genesis.objects().to_vec();
672 if let Some(migration_tx_data) = migration_tx_data.as_ref() {
673 genesis_objects.extend(migration_tx_data.get_objects());
674 }
675
676 let authority_name = config.authority_public_key();
677 let validator_tx_finalizer =
678 config
679 .enable_validator_tx_finalizer
680 .then_some(Arc::new(ValidatorTxFinalizer::new(
681 auth_agg.clone(),
682 authority_name,
683 &prometheus_registry,
684 )));
685
686 info!("create authority state");
687 let state = AuthorityState::new(
688 authority_name,
689 secret,
690 config.supported_protocol_versions.unwrap(),
691 store.clone(),
692 cache_traits.clone(),
693 epoch_store.clone(),
694 committee_store.clone(),
695 index_store.clone(),
696 rest_index,
697 checkpoint_store.clone(),
698 &prometheus_registry,
699 &genesis_objects,
700 &db_checkpoint_config,
701 config.clone(),
702 config.indirect_objects_threshold,
703 archive_readers,
704 validator_tx_finalizer,
705 )
706 .await;
707
708 if epoch_store.epoch() == 0 {
710 let genesis_tx = &genesis.transaction();
711 let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
712 Self::execute_transaction_immediately_at_zero_epoch(
714 &state,
715 &epoch_store,
716 genesis_tx,
717 span,
718 )
719 .await;
720
721 if let Some(migration_tx_data) = migration_tx_data {
723 for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
724 let span = error_span!("migration_txn", tx_digest = ?tx_digest);
725 Self::execute_transaction_immediately_at_zero_epoch(
726 &state,
727 &epoch_store,
728 tx,
729 span,
730 )
731 .await;
732 }
733 }
734 }
735
736 checkpoint_store
737 .reexecute_local_checkpoints(&state, &epoch_store)
738 .await;
739
740 RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
743
744 if config
745 .expensive_safety_check_config
746 .enable_secondary_index_checks()
747 {
748 if let Some(indexes) = state.indexes.clone() {
749 iota_core::verify_indexes::verify_indexes(
750 state.get_accumulator_store().as_ref(),
751 indexes,
752 )
753 .expect("secondary indexes are inconsistent");
754 }
755 }
756
757 let (end_of_epoch_channel, end_of_epoch_receiver) =
758 broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
759
760 let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
761 Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
762 auth_agg.load_full(),
763 state.clone(),
764 end_of_epoch_receiver,
765 &config.db_path(),
766 &prometheus_registry,
767 )))
768 } else {
769 None
770 };
771
772 let http_server = build_http_server(
773 state.clone(),
774 state_sync_store,
775 &transaction_orchestrator.clone(),
776 &config,
777 &prometheus_registry,
778 custom_rpc_runtime,
779 software_version,
780 )
781 .await?;
782
783 let accumulator = Arc::new(StateAccumulator::new(
784 cache_traits.accumulator_store.clone(),
785 StateAccumulatorMetrics::new(&prometheus_registry),
786 ));
787
788 let authority_names_to_peer_ids = epoch_store
789 .epoch_start_state()
790 .get_authority_names_to_peer_ids();
791
792 let network_connection_metrics =
793 NetworkConnectionMetrics::new("iota", ®istry_service.default_registry());
794
795 let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
796
797 let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
798 p2p_network.downgrade(),
799 network_connection_metrics,
800 HashMap::new(),
801 None,
802 );
803
804 let connection_monitor_status = ConnectionMonitorStatus {
805 connection_statuses,
806 authority_names_to_peer_ids,
807 };
808
809 let connection_monitor_status = Arc::new(connection_monitor_status);
810 let iota_node_metrics =
811 Arc::new(IotaNodeMetrics::new(®istry_service.default_registry()));
812
813 let validator_components = if state.is_validator(&epoch_store) {
814 let components = Self::construct_validator_components(
815 config.clone(),
816 state.clone(),
817 committee,
818 epoch_store.clone(),
819 checkpoint_store.clone(),
820 state_sync_handle.clone(),
821 randomness_handle.clone(),
822 Arc::downgrade(&accumulator),
823 connection_monitor_status.clone(),
824 ®istry_service,
825 iota_node_metrics.clone(),
826 )
827 .await?;
828 components.consensus_adapter.submit_recovered(&epoch_store);
830
831 Some(components)
832 } else {
833 None
834 };
835
836 let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
838
839 let node = Self {
840 config,
841 validator_components: Mutex::new(validator_components),
842 _http_server: http_server,
843 state,
844 transaction_orchestrator,
845 registry_service,
846 metrics: iota_node_metrics,
847
848 _discovery: discovery_handle,
849 state_sync_handle,
850 randomness_handle,
851 checkpoint_store,
852 accumulator: Mutex::new(Some(accumulator)),
853 end_of_epoch_channel,
854 connection_monitor_status,
855 trusted_peer_change_tx,
856
857 _db_checkpoint_handle: db_checkpoint_handle,
858
859 #[cfg(msim)]
860 sim_state: Default::default(),
861
862 _state_archive_handle: state_archive_handle,
863 _state_snapshot_uploader_handle: state_snapshot_handle,
864 shutdown_channel_tx: shutdown_channel,
865
866 auth_agg,
867 };
868
869 info!("IotaNode started!");
870 let node = Arc::new(node);
871 let node_copy = node.clone();
872 spawn_monitored_task!(async move {
873 let result = Self::monitor_reconfiguration(node_copy).await;
874 if let Err(error) = result {
875 warn!("Reconfiguration finished with error {:?}", error);
876 }
877 });
878
879 Ok(node)
880 }
881
882 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
883 self.end_of_epoch_channel.subscribe()
884 }
885
886 pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
887 self.shutdown_channel_tx.subscribe()
888 }
889
890 pub fn current_epoch_for_testing(&self) -> EpochId {
891 self.state.current_epoch_for_testing()
892 }
893
894 pub fn db_checkpoint_path(&self) -> PathBuf {
895 self.config.db_checkpoint_path()
896 }
897
898 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
900 info!("close_epoch (current epoch = {})", epoch_store.epoch());
901 self.validator_components
902 .lock()
903 .await
904 .as_ref()
905 .ok_or_else(|| IotaError::from("Node is not a validator"))?
906 .consensus_adapter
907 .close_epoch(epoch_store);
908 Ok(())
909 }
910
911 pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
912 self.state
913 .clear_override_protocol_upgrade_buffer_stake(epoch)
914 }
915
916 pub fn set_override_protocol_upgrade_buffer_stake(
917 &self,
918 epoch: EpochId,
919 buffer_stake_bps: u64,
920 ) -> IotaResult {
921 self.state
922 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
923 }
924
925 pub async fn close_epoch_for_testing(&self) -> IotaResult {
928 let epoch_store = self.state.epoch_store_for_testing();
929 self.close_epoch(&epoch_store).await
930 }
931
932 async fn start_state_archival(
933 config: &NodeConfig,
934 prometheus_registry: &Registry,
935 state_sync_store: RocksDbStore,
936 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
937 if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
938 let local_store_config = ObjectStoreConfig {
939 object_store: Some(ObjectStoreType::File),
940 directory: Some(config.archive_path()),
941 ..Default::default()
942 };
943 let archive_writer = ArchiveWriter::new(
944 local_store_config,
945 remote_store_config.clone(),
946 FileCompression::Zstd,
947 StorageFormat::Blob,
948 Duration::from_secs(600),
949 256 * 1024 * 1024,
950 prometheus_registry,
951 )
952 .await?;
953 Ok(Some(archive_writer.start(state_sync_store).await?))
954 } else {
955 Ok(None)
956 }
957 }
958
959 fn start_state_snapshot(
962 config: &NodeConfig,
963 prometheus_registry: &Registry,
964 checkpoint_store: Arc<CheckpointStore>,
965 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
966 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
967 let snapshot_uploader = StateSnapshotUploader::new(
968 &config.db_checkpoint_path(),
969 &config.snapshot_path(),
970 remote_store_config.clone(),
971 60,
972 prometheus_registry,
973 checkpoint_store,
974 )?;
975 Ok(Some(snapshot_uploader.start()))
976 } else {
977 Ok(None)
978 }
979 }
980
981 fn start_db_checkpoint(
982 config: &NodeConfig,
983 prometheus_registry: &Registry,
984 state_snapshot_enabled: bool,
985 ) -> Result<(
986 DBCheckpointConfig,
987 Option<tokio::sync::broadcast::Sender<()>>,
988 )> {
989 let checkpoint_path = Some(
990 config
991 .db_checkpoint_config
992 .checkpoint_path
993 .clone()
994 .unwrap_or_else(|| config.db_checkpoint_path()),
995 );
996 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
997 DBCheckpointConfig {
998 checkpoint_path,
999 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1000 true
1001 } else {
1002 config
1003 .db_checkpoint_config
1004 .perform_db_checkpoints_at_epoch_end
1005 },
1006 ..config.db_checkpoint_config.clone()
1007 }
1008 } else {
1009 config.db_checkpoint_config.clone()
1010 };
1011
1012 match (
1013 db_checkpoint_config.object_store_config.as_ref(),
1014 state_snapshot_enabled,
1015 ) {
1016 (None, false) => Ok((db_checkpoint_config, None)),
1021 (_, _) => {
1022 let handler = DBCheckpointHandler::new(
1023 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1024 db_checkpoint_config.object_store_config.as_ref(),
1025 60,
1026 db_checkpoint_config
1027 .prune_and_compact_before_upload
1028 .unwrap_or(true),
1029 config.indirect_objects_threshold,
1030 config.authority_store_pruning_config.clone(),
1031 prometheus_registry,
1032 state_snapshot_enabled,
1033 )?;
1034 Ok((
1035 db_checkpoint_config,
1036 Some(DBCheckpointHandler::start(handler)),
1037 ))
1038 }
1039 }
1040 }
1041
1042 fn create_p2p_network(
1043 config: &NodeConfig,
1044 state_sync_store: RocksDbStore,
1045 chain_identifier: ChainIdentifier,
1046 trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
1047 archive_readers: ArchiveReaderBalancer,
1048 randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
1049 prometheus_registry: &Registry,
1050 ) -> Result<(
1051 Network,
1052 discovery::Handle,
1053 state_sync::Handle,
1054 randomness::Handle,
1055 )> {
1056 let (state_sync, state_sync_server) = state_sync::Builder::new()
1057 .config(config.p2p_config.state_sync.clone().unwrap_or_default())
1058 .store(state_sync_store)
1059 .archive_readers(archive_readers)
1060 .with_metrics(prometheus_registry)
1061 .build();
1062
1063 let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
1064 .config(config.p2p_config.clone())
1065 .build();
1066
1067 let (randomness, randomness_router) =
1068 randomness::Builder::new(config.authority_public_key(), randomness_tx)
1069 .config(config.p2p_config.randomness.clone().unwrap_or_default())
1070 .with_metrics(prometheus_registry)
1071 .build();
1072
1073 let p2p_network = {
1074 let routes = anemo::Router::new()
1075 .add_rpc_service(discovery_server)
1076 .add_rpc_service(state_sync_server);
1077 let routes = routes.merge(randomness_router);
1078
1079 let inbound_network_metrics =
1080 NetworkMetrics::new("iota", "inbound", prometheus_registry);
1081 let outbound_network_metrics =
1082 NetworkMetrics::new("iota", "outbound", prometheus_registry);
1083
1084 let service = ServiceBuilder::new()
1085 .layer(
1086 TraceLayer::new_for_server_errors()
1087 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1088 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1089 )
1090 .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1091 Arc::new(inbound_network_metrics),
1092 config.p2p_config.excessive_message_size(),
1093 )))
1094 .service(routes);
1095
1096 let outbound_layer = ServiceBuilder::new()
1097 .layer(
1098 TraceLayer::new_for_client_and_server_errors()
1099 .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
1100 .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
1101 )
1102 .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
1103 Arc::new(outbound_network_metrics),
1104 config.p2p_config.excessive_message_size(),
1105 )))
1106 .into_inner();
1107
1108 let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
1109 anemo_config.max_frame_size = Some(1 << 30);
1112
1113 let mut quic_config = anemo_config.quic.unwrap_or_default();
1116 if quic_config.socket_send_buffer_size.is_none() {
1117 quic_config.socket_send_buffer_size = Some(20 << 20);
1118 }
1119 if quic_config.socket_receive_buffer_size.is_none() {
1120 quic_config.socket_receive_buffer_size = Some(20 << 20);
1121 }
1122 quic_config.allow_failed_socket_buffer_size_setting = true;
1123
1124 if quic_config.max_concurrent_bidi_streams.is_none() {
1127 quic_config.max_concurrent_bidi_streams = Some(500);
1128 }
1129 if quic_config.max_concurrent_uni_streams.is_none() {
1130 quic_config.max_concurrent_uni_streams = Some(500);
1131 }
1132 if quic_config.stream_receive_window.is_none() {
1133 quic_config.stream_receive_window = Some(100 << 20);
1134 }
1135 if quic_config.receive_window.is_none() {
1136 quic_config.receive_window = Some(200 << 20);
1137 }
1138 if quic_config.send_window.is_none() {
1139 quic_config.send_window = Some(200 << 20);
1140 }
1141 if quic_config.crypto_buffer_size.is_none() {
1142 quic_config.crypto_buffer_size = Some(1 << 20);
1143 }
1144 if quic_config.max_idle_timeout_ms.is_none() {
1145 quic_config.max_idle_timeout_ms = Some(30_000);
1146 }
1147 if quic_config.keep_alive_interval_ms.is_none() {
1148 quic_config.keep_alive_interval_ms = Some(5_000);
1149 }
1150 anemo_config.quic = Some(quic_config);
1151
1152 let server_name = format!("iota-{}", chain_identifier);
1153 let network = Network::bind(config.p2p_config.listen_address)
1154 .server_name(&server_name)
1155 .private_key(config.network_key_pair().copy().private().0.to_bytes())
1156 .config(anemo_config)
1157 .outbound_request_layer(outbound_layer)
1158 .start(service)?;
1159 info!(
1160 server_name = server_name,
1161 "P2p network started on {}",
1162 network.local_addr()
1163 );
1164
1165 network
1166 };
1167
1168 let discovery_handle = discovery.start(p2p_network.clone());
1169 let state_sync_handle = state_sync.start(p2p_network.clone());
1170 let randomness_handle = randomness.start(p2p_network.clone());
1171
1172 Ok((
1173 p2p_network,
1174 discovery_handle,
1175 state_sync_handle,
1176 randomness_handle,
1177 ))
1178 }
1179
1180 async fn construct_validator_components(
1183 config: NodeConfig,
1184 state: Arc<AuthorityState>,
1185 committee: Arc<Committee>,
1186 epoch_store: Arc<AuthorityPerEpochStore>,
1187 checkpoint_store: Arc<CheckpointStore>,
1188 state_sync_handle: state_sync::Handle,
1189 randomness_handle: randomness::Handle,
1190 accumulator: Weak<StateAccumulator>,
1191 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1192 registry_service: &RegistryService,
1193 iota_node_metrics: Arc<IotaNodeMetrics>,
1194 ) -> Result<ValidatorComponents> {
1195 let mut config_clone = config.clone();
1196 let consensus_config = config_clone
1197 .consensus_config
1198 .as_mut()
1199 .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
1200
1201 let client = Arc::new(UpdatableConsensusClient::new());
1202 let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
1203 &committee,
1204 consensus_config,
1205 state.name,
1206 connection_monitor_status.clone(),
1207 ®istry_service.default_registry(),
1208 client.clone(),
1209 ));
1210 let consensus_manager =
1211 ConsensusManager::new(&config, consensus_config, registry_service, client);
1212
1213 let consensus_store_pruner = ConsensusStorePruner::new(
1216 consensus_manager.get_storage_base_path(),
1217 consensus_config.db_retention_epochs(),
1218 consensus_config.db_pruner_period(),
1219 ®istry_service.default_registry(),
1220 );
1221
1222 let checkpoint_metrics = CheckpointMetrics::new(®istry_service.default_registry());
1223 let iota_tx_validator_metrics =
1224 IotaTxValidatorMetrics::new(®istry_service.default_registry());
1225
1226 let validator_server_handle = Self::start_grpc_validator_service(
1227 &config,
1228 state.clone(),
1229 consensus_adapter.clone(),
1230 ®istry_service.default_registry(),
1231 )
1232 .await?;
1233
1234 let validator_overload_monitor_handle = if config
1237 .authority_overload_config
1238 .max_load_shedding_percentage
1239 > 0
1240 {
1241 let authority_state = Arc::downgrade(&state);
1242 let overload_config = config.authority_overload_config.clone();
1243 fail_point!("starting_overload_monitor");
1244 Some(spawn_monitored_task!(overload_monitor(
1245 authority_state,
1246 overload_config,
1247 )))
1248 } else {
1249 None
1250 };
1251
1252 Self::start_epoch_specific_validator_components(
1253 &config,
1254 state.clone(),
1255 consensus_adapter,
1256 checkpoint_store,
1257 epoch_store,
1258 state_sync_handle,
1259 randomness_handle,
1260 consensus_manager,
1261 consensus_store_pruner,
1262 accumulator,
1263 validator_server_handle,
1264 validator_overload_monitor_handle,
1265 checkpoint_metrics,
1266 iota_node_metrics,
1267 iota_tx_validator_metrics,
1268 )
1269 .await
1270 }
1271
1272 async fn start_epoch_specific_validator_components(
1275 config: &NodeConfig,
1276 state: Arc<AuthorityState>,
1277 consensus_adapter: Arc<ConsensusAdapter>,
1278 checkpoint_store: Arc<CheckpointStore>,
1279 epoch_store: Arc<AuthorityPerEpochStore>,
1280 state_sync_handle: state_sync::Handle,
1281 randomness_handle: randomness::Handle,
1282 consensus_manager: ConsensusManager,
1283 consensus_store_pruner: ConsensusStorePruner,
1284 accumulator: Weak<StateAccumulator>,
1285 validator_server_handle: JoinHandle<Result<()>>,
1286 validator_overload_monitor_handle: Option<JoinHandle<()>>,
1287 checkpoint_metrics: Arc<CheckpointMetrics>,
1288 iota_node_metrics: Arc<IotaNodeMetrics>,
1289 iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
1290 ) -> Result<ValidatorComponents> {
1291 let (checkpoint_service, checkpoint_service_tasks) = Self::start_checkpoint_service(
1292 config,
1293 consensus_adapter.clone(),
1294 checkpoint_store,
1295 epoch_store.clone(),
1296 state.clone(),
1297 state_sync_handle,
1298 accumulator,
1299 checkpoint_metrics.clone(),
1300 );
1301
1302 let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));
1307
1308 consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());
1309
1310 let randomness_manager = RandomnessManager::try_new(
1311 Arc::downgrade(&epoch_store),
1312 Box::new(consensus_adapter.clone()),
1313 randomness_handle,
1314 config.authority_key_pair(),
1315 )
1316 .await;
1317 if let Some(randomness_manager) = randomness_manager {
1318 epoch_store
1319 .set_randomness_manager(randomness_manager)
1320 .await?;
1321 }
1322
1323 let consensus_handler_initializer = ConsensusHandlerInitializer::new(
1324 state.clone(),
1325 checkpoint_service.clone(),
1326 epoch_store.clone(),
1327 low_scoring_authorities,
1328 );
1329
1330 consensus_manager
1331 .start(
1332 config,
1333 epoch_store.clone(),
1334 consensus_handler_initializer,
1335 IotaTxValidator::new(
1336 epoch_store.clone(),
1337 checkpoint_service.clone(),
1338 state.transaction_manager().clone(),
1339 iota_tx_validator_metrics.clone(),
1340 ),
1341 )
1342 .await;
1343
1344 if epoch_store.authenticator_state_enabled() {
1345 Self::start_jwk_updater(
1346 config,
1347 iota_node_metrics,
1348 state.name,
1349 epoch_store.clone(),
1350 consensus_adapter.clone(),
1351 );
1352 }
1353
1354 Ok(ValidatorComponents {
1355 validator_server_handle,
1356 validator_overload_monitor_handle,
1357 consensus_manager,
1358 consensus_store_pruner,
1359 consensus_adapter,
1360 checkpoint_service_tasks,
1361 checkpoint_metrics,
1362 iota_tx_validator_metrics,
1363 })
1364 }
1365
1366 fn start_checkpoint_service(
1373 config: &NodeConfig,
1374 consensus_adapter: Arc<ConsensusAdapter>,
1375 checkpoint_store: Arc<CheckpointStore>,
1376 epoch_store: Arc<AuthorityPerEpochStore>,
1377 state: Arc<AuthorityState>,
1378 state_sync_handle: state_sync::Handle,
1379 accumulator: Weak<StateAccumulator>,
1380 checkpoint_metrics: Arc<CheckpointMetrics>,
1381 ) -> (Arc<CheckpointService>, JoinSet<()>) {
1382 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1383 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1384
1385 debug!(
1386 "Starting checkpoint service with epoch start timestamp {}
1387 and epoch duration {}",
1388 epoch_start_timestamp_ms, epoch_duration_ms
1389 );
1390
1391 let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1392 sender: consensus_adapter,
1393 signer: state.secret.clone(),
1394 authority: config.authority_public_key(),
1395 next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1396 .checked_add(epoch_duration_ms)
1397 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1398 metrics: checkpoint_metrics.clone(),
1399 });
1400
1401 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1402 let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1403 let max_checkpoint_size_bytes =
1404 epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1405
1406 CheckpointService::spawn(
1407 state.clone(),
1408 checkpoint_store,
1409 epoch_store,
1410 state.get_transaction_cache_reader().clone(),
1411 accumulator,
1412 checkpoint_output,
1413 Box::new(certified_checkpoint_output),
1414 checkpoint_metrics,
1415 max_tx_per_checkpoint,
1416 max_checkpoint_size_bytes,
1417 )
1418 }
1419
1420 fn construct_consensus_adapter(
1421 committee: &Committee,
1422 consensus_config: &ConsensusConfig,
1423 authority: AuthorityName,
1424 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1425 prometheus_registry: &Registry,
1426 consensus_client: Arc<dyn ConsensusClient>,
1427 ) -> ConsensusAdapter {
1428 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1429 ConsensusAdapter::new(
1433 consensus_client,
1434 authority,
1435 connection_monitor_status,
1436 consensus_config.max_pending_transactions(),
1437 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1438 consensus_config.max_submit_position,
1439 consensus_config.submit_delay_step_override(),
1440 ca_metrics,
1441 )
1442 }
1443
1444 async fn start_grpc_validator_service(
1445 config: &NodeConfig,
1446 state: Arc<AuthorityState>,
1447 consensus_adapter: Arc<ConsensusAdapter>,
1448 prometheus_registry: &Registry,
1449 ) -> Result<tokio::task::JoinHandle<Result<()>>> {
1450 let validator_service = ValidatorService::new(
1451 state.clone(),
1452 consensus_adapter,
1453 Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1454 TrafficControllerMetrics::new(prometheus_registry),
1455 config.policy_config.clone(),
1456 config.firewall_config.clone(),
1457 );
1458
1459 let mut server_conf = iota_network_stack::config::Config::new();
1460 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1461 server_conf.load_shed = config.grpc_load_shed;
1462 let mut server_builder =
1463 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));
1464
1465 server_builder = server_builder.add_service(ValidatorServer::new(validator_service));
1466
1467 let server = server_builder
1468 .bind(config.network_address())
1469 .await
1470 .map_err(|err| anyhow!(err.to_string()))?;
1471 let local_addr = server.local_addr();
1472 info!("Listening to traffic on {local_addr}");
1473 let grpc_server = spawn_monitored_task!(server.serve().map_err(Into::into));
1474
1475 Ok(grpc_server)
1476 }
1477
1478 pub fn state(&self) -> Arc<AuthorityState> {
1479 self.state.clone()
1480 }
1481
1482 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1484 self.state.reference_gas_price_for_testing()
1485 }
1486
1487 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1488 self.state.committee_store().clone()
1489 }
1490
1491 pub fn clone_authority_aggregator(
1501 &self,
1502 ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1503 self.transaction_orchestrator
1504 .as_ref()
1505 .map(|to| to.clone_authority_aggregator())
1506 }
1507
1508 pub fn transaction_orchestrator(
1509 &self,
1510 ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1511 self.transaction_orchestrator.clone()
1512 }
1513
1514 pub fn subscribe_to_transaction_orchestrator_effects(
1515 &self,
1516 ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1517 self.transaction_orchestrator
1518 .as_ref()
1519 .map(|to| to.subscribe_to_effects_queue())
1520 .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1521 }
1522
/// Drives the node through epoch changes, one loop iteration per epoch.
///
/// Each iteration:
/// 1. takes the current state accumulator and builds a checkpoint executor;
/// 2. if validator components are present, submits this node's capabilities
///    to consensus;
/// 3. runs the checkpoint executor for the current epoch;
/// 4. reads the latest system state, notifies end-of-epoch subscribers, and
///    refreshes the authority aggregator, the connection monitor mapping and
///    the trusted peer set for the next epoch;
/// 5. reconfigures the authority state and, depending on whether the node
///    is a validator before and after the change, restarts, tears down or
///    newly constructs the validator components.
///
/// Returns `Ok(())` only when the checkpoint executor stops with
/// `StopReason::RunWithRangeCondition`; otherwise the loop never exits
/// normally.
///
/// # Errors
///
/// Propagates errors from submitting the capability notification, from
/// `reconfigure_state`, from (re)starting validator components, and from
/// checkpoint pruning in simulator builds.
///
/// # Panics
///
/// Panics if the accumulator slot is empty, if supported protocol versions
/// are not populated, if the system state object cannot be read, if the next
/// epoch is not exactly the current epoch plus one, if the accumulator still
/// has other strong references at reconfiguration time, or if the shutdown
/// message cannot be sent. A panic inside a checkpoint service task is
/// re-raised here.
pub async fn monitor_reconfiguration(self: Arc<Self>) -> Result<()> {
    // Created once and cloned into every per-epoch checkpoint executor.
    let checkpoint_executor_metrics =
        CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

    loop {
        // The guard stays alive for the whole iteration; the slot is refilled
        // with a new accumulator further down.
        let mut accumulator_guard = self.accumulator.lock().await;
        let accumulator = accumulator_guard.take().unwrap();
        let mut checkpoint_executor = CheckpointExecutor::new(
            self.state_sync_handle.subscribe_to_synced_checkpoints(),
            self.checkpoint_store.clone(),
            self.state.clone(),
            accumulator.clone(),
            self.config.checkpoint_executor_config.clone(),
            checkpoint_executor_metrics.clone(),
        );

        let run_with_range = self.config.run_with_range;

        let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

        // Validator components are present: announce this node's supported
        // protocol versions and available system packages through consensus.
        if let Some(components) = &*self.validator_components.lock().await {
            // NOTE(review): the reason for this 1 ms sleep is not visible in
            // this code; confirm before removing it.
            tokio::time::sleep(Duration::from_millis(1)).await;

            let config = cur_epoch_store.protocol_config();
            let binary_config = to_binary_config(config);
            let transaction = ConsensusTransaction::new_capability_notification_v1(
                AuthorityCapabilitiesV1::new(
                    self.state.name,
                    cur_epoch_store.get_chain_identifier().chain(),
                    self.config
                        .supported_protocol_versions
                        .expect("Supported versions should be populated")
                        .truncate_below(config.version),
                    self.state
                        .get_available_system_packages(&binary_config)
                        .await,
                ),
            );
            info!(?transaction, "submitting capabilities to consensus");
            components
                .consensus_adapter
                .submit(transaction, None, &cur_epoch_store)?;
        }

        // Execute checkpoints until the executor reports why it stopped.
        let stop_condition = checkpoint_executor
            .run_epoch(cur_epoch_store.clone(), run_with_range)
            .await;
        drop(checkpoint_executor);

        // The configured run-with-range condition was reached: shut down and
        // tell the owner of the shutdown channel, then leave the loop.
        if stop_condition == StopReason::RunWithRangeCondition {
            IotaNode::shutdown(&self).await;
            self.shutdown_channel_tx
                .send(run_with_range)
                .expect("RunWithRangeCondition met but failed to send shutdown message");
            return Ok(());
        }

        let latest_system_state = self
            .state
            .get_object_cache_reader()
            .get_iota_system_state_object_unsafe()
            .expect("Read IOTA System State object cannot fail");

        // In simulator builds, safe mode is tolerated only when a test has
        // explicitly flagged it as expected.
        #[cfg(msim)]
        if !self
            .sim_state
            .sim_safe_mode_expected
            .load(Ordering::Relaxed)
        {
            debug_assert!(!latest_system_state.safe_mode());
        }

        #[cfg(not(msim))]
        debug_assert!(!latest_system_state.safe_mode());

        // A failed send is only logged, and only when running as a fullnode.
        if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
            if self.state.is_fullnode(&cur_epoch_store) {
                warn!(
                    "Failed to send end of epoch notification to subscriber: {:?}",
                    err
                );
            }
        }

        cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
        let new_epoch_start_state = latest_system_state.into_epoch_start_state();

        // Replace the authority aggregator with one built for the new epoch.
        self.auth_agg.store(Arc::new(
            self.auth_agg
                .load()
                .recreate_with_new_epoch_start_state(&new_epoch_start_state),
        ));

        let next_epoch_committee = new_epoch_start_state.get_iota_committee();
        let next_epoch = next_epoch_committee.epoch();
        // Epochs must advance by exactly one.
        assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

        info!(
            next_epoch,
            "Finished executing all checkpoints in epoch. About to reconfigure the system."
        );

        fail_point_async!("reconfig_delay");

        let authority_names_to_peer_ids =
            new_epoch_start_state.get_authority_names_to_peer_ids();
        self.connection_monitor_status
            .update_mapping_for_epoch(authority_names_to_peer_ids);

        cur_epoch_store.record_epoch_reconfig_start_time_metric();

        send_trusted_peer_change(
            &self.config,
            &self.trusted_peer_change_tx,
            &new_epoch_start_state,
        );

        // Taking the components out of the mutex leaves `None` behind until
        // the new value is stored after this expression.
        let new_validator_components = if let Some(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            mut checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
        }) = self.validator_components.lock().await.take()
        {
            info!("Reconfiguring the validator.");
            // Abort the checkpoint service tasks and wait for each to finish.
            // A panic inside a task is re-raised; other join errors (such as
            // the abort itself) are only logged.
            checkpoint_service_tasks.abort_all();
            while let Some(result) = checkpoint_service_tasks.join_next().await {
                if let Err(err) = result {
                    if err.is_panic() {
                        std::panic::resume_unwind(err.into_panic());
                    }
                    warn!("Error in checkpoint service task: {:?}", err);
                }
            }
            info!("Checkpoint service has shut down.");

            consensus_manager.shutdown().await;
            info!("Consensus has shut down.");

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    accumulator.clone(),
                )
                .await?;
            info!("Epoch store finished reconfiguration.");

            // `Arc::into_inner` yields the accumulator only when this is the
            // last strong reference; its metrics are carried over to the new
            // accumulator.
            let accumulator_metrics = Arc::into_inner(accumulator)
                .expect("Accumulator should have no other references at this point")
                .metrics();
            let new_accumulator = Arc::new(StateAccumulator::new(
                self.state.get_accumulator_store().clone(),
                accumulator_metrics,
            ));
            let weak_accumulator = Arc::downgrade(&new_accumulator);
            *accumulator_guard = Some(new_accumulator);

            consensus_store_pruner.prune(next_epoch).await;

            if self.state.is_validator(&new_epoch_store) {
                // Still a validator: restart only the per-epoch components,
                // reusing the long-lived handles taken out above.
                Some(
                    Self::start_epoch_specific_validator_components(
                        &self.config,
                        self.state.clone(),
                        consensus_adapter,
                        self.checkpoint_store.clone(),
                        new_epoch_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        consensus_manager,
                        consensus_store_pruner,
                        weak_accumulator,
                        validator_server_handle,
                        validator_overload_monitor_handle,
                        checkpoint_metrics,
                        self.metrics.clone(),
                        iota_tx_validator_metrics,
                    )
                    .await?,
                )
            } else {
                info!("This node is no longer a validator after reconfiguration");

                consensus_adapter.unregister_consensus_adapter_metrics(
                    &self.registry_service.default_registry(),
                );
                debug!("Unregistered consensus adapter metrics");
                None
            }
        } else {
            // No validator components were running during the old epoch.
            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    accumulator.clone(),
                )
                .await?;

            // Same accumulator hand-over as in the validator branch.
            let accumulator_metrics = Arc::into_inner(accumulator)
                .expect("Accumulator should have no other references at this point")
                .metrics();
            let new_accumulator = Arc::new(StateAccumulator::new(
                self.state.get_accumulator_store().clone(),
                accumulator_metrics,
            ));
            let weak_accumulator = Arc::downgrade(&new_accumulator);
            *accumulator_guard = Some(new_accumulator);

            if self.state.is_validator(&new_epoch_store) {
                info!("Promoting the node from fullnode to validator, starting grpc server");

                Some(
                    Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_accumulator,
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                    )
                    .await?,
                )
            } else {
                None
            }
        };
        *self.validator_components.lock().await = new_validator_components;

        cur_epoch_store.release_db_handles();

        // Simulator builds only: prune checkpoints right away when a finite,
        // non-zero retention is configured.
        if cfg!(msim)
            && !matches!(
                self.config
                    .authority_store_pruning_config
                    .num_epochs_to_retain_for_checkpoints(),
                None | Some(u64::MAX) | Some(0)
            )
        {
            self.state
                .prune_checkpoints_for_eligible_epochs_for_testing(
                    self.config.clone(),
                    iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                )
                .await?;
        }

        info!("Reconfiguration finished");
    }
}
1814
1815 async fn shutdown(&self) {
1816 if let Some(validator_components) = &*self.validator_components.lock().await {
1817 validator_components.consensus_manager.shutdown().await;
1818 }
1819 }
1820
1821 async fn reconfigure_state(
1824 &self,
1825 state: &Arc<AuthorityState>,
1826 cur_epoch_store: &AuthorityPerEpochStore,
1827 next_epoch_committee: Committee,
1828 next_epoch_start_system_state: EpochStartSystemState,
1829 accumulator: Arc<StateAccumulator>,
1830 ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
1831 let next_epoch = next_epoch_committee.epoch();
1832
1833 let last_checkpoint = self
1834 .checkpoint_store
1835 .get_epoch_last_checkpoint(cur_epoch_store.epoch())
1836 .expect("Error loading last checkpoint for current epoch")
1837 .expect("Could not load last checkpoint for current epoch");
1838 let epoch_supply_change = last_checkpoint
1839 .end_of_epoch_data
1840 .as_ref()
1841 .ok_or_else(|| {
1842 IotaError::from("last checkpoint in epoch should contain end of epoch data")
1843 })?
1844 .epoch_supply_change;
1845
1846 let epoch_start_configuration = EpochStartConfiguration::new(
1847 next_epoch_start_system_state,
1848 *last_checkpoint.digest(),
1849 state.get_object_store().as_ref(),
1850 EpochFlag::default_flags_for_new_epoch(&state.config),
1851 )
1852 .expect("EpochStartConfiguration construction cannot fail");
1853
1854 let new_epoch_store = self
1855 .state
1856 .reconfigure(
1857 cur_epoch_store,
1858 self.config.supported_protocol_versions.unwrap(),
1859 next_epoch_committee,
1860 epoch_start_configuration,
1861 accumulator,
1862 &self.config.expensive_safety_check_config,
1863 epoch_supply_change,
1864 )
1865 .await
1866 .expect("Reconfigure authority state cannot fail");
1867 info!(next_epoch, "Node State has been reconfigured");
1868 assert_eq!(next_epoch, new_epoch_store.epoch());
1869 self.state.get_reconfig_api().update_epoch_flags_metrics(
1870 cur_epoch_store.epoch_start_config().flags(),
1871 new_epoch_store.epoch_start_config().flags(),
1872 );
1873
1874 Ok(new_epoch_store)
1875 }
1876
1877 pub fn get_config(&self) -> &NodeConfig {
1878 &self.config
1879 }
1880
1881 async fn execute_transaction_immediately_at_zero_epoch(
1882 state: &Arc<AuthorityState>,
1883 epoch_store: &Arc<AuthorityPerEpochStore>,
1884 tx: &Transaction,
1885 span: tracing::Span,
1886 ) {
1887 let transaction =
1888 iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
1889 iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
1890 tx.data().clone(),
1891 iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
1892 ),
1893 );
1894 state
1895 .try_execute_immediately(&transaction, None, epoch_store)
1896 .instrument(span)
1897 .await
1898 .unwrap();
1899 }
1900
1901 pub fn randomness_handle(&self) -> randomness::Handle {
1902 self.randomness_handle.clone()
1903 }
1904}
1905
1906#[cfg(not(msim))]
1907impl IotaNode {
1908 async fn fetch_jwks(
1909 _authority: AuthorityName,
1910 provider: &OIDCProvider,
1911 ) -> IotaResult<Vec<(JwkId, JWK)>> {
1912 use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
1913 let client = reqwest::Client::new();
1914 fetch_jwks(provider, &client)
1915 .await
1916 .map_err(|_| IotaError::JWKRetrieval)
1917 }
1918}
1919
#[cfg(msim)]
impl IotaNode {
    /// Returns the simulator node id this node runs on.
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        let sim_node = &self.sim_state.sim_node;
        sim_node.id()
    }

    /// Records whether the simulation expects the network to enter safe
    /// mode.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        let expected_flag = &self.sim_state.sim_safe_mode_expected;
        expected_flag.store(new_value, Ordering::Relaxed);
    }

    /// Simulator variant of the JWK fetch: delegates to the JWK injector
    /// returned by `get_jwk_injector()`.
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        let injector = get_jwk_injector();
        injector(authority, provider)
    }
}
1940
1941fn send_trusted_peer_change(
1944 config: &NodeConfig,
1945 sender: &watch::Sender<TrustedPeerChangeEvent>,
1946 new_epoch_start_state: &EpochStartSystemState,
1947) {
1948 let new_committee =
1949 new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
1950
1951 sender.send_modify(|event| {
1952 core::mem::swap(&mut event.new_committee, &mut event.old_committee);
1953 event.new_committee = new_committee;
1954 })
1955}
1956
1957fn build_kv_store(
1958 state: &Arc<AuthorityState>,
1959 config: &NodeConfig,
1960 registry: &Registry,
1961) -> Result<Arc<TransactionKeyValueStore>> {
1962 let metrics = KeyValueStoreMetrics::new(registry);
1963 let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
1964
1965 let base_url = &config.transaction_kv_store_read_config.base_url;
1966
1967 if base_url.is_empty() {
1968 info!("no http kv store url provided, using local db only");
1969 return Ok(Arc::new(db_store));
1970 }
1971
1972 base_url.parse::<url::Url>().tap_err(|e| {
1973 error!(
1974 "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
1975 base_url, e
1976 )
1977 })?;
1978
1979 let http_store = HttpKVStore::new_kv(base_url, metrics.clone())?;
1980 info!("using local key-value store with fallback to http key-value store");
1981 Ok(Arc::new(FallbackTransactionKVStore::new_kv(
1982 db_store,
1983 http_store,
1984 metrics,
1985 "json_rpc_fallback",
1986 )))
1987}
1988
1989pub async fn build_http_server(
2006 state: Arc<AuthorityState>,
2007 store: RocksDbStore,
2008 transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
2009 config: &NodeConfig,
2010 prometheus_registry: &Registry,
2011 _custom_runtime: Option<Handle>,
2012 software_version: &'static str,
2013) -> Result<Option<tokio::task::JoinHandle<()>>> {
2014 if config.consensus_config().is_some() {
2016 return Ok(None);
2017 }
2018
2019 let mut router = axum::Router::new();
2020
2021 let json_rpc_router = {
2022 let mut server = JsonRpcServerBuilder::new(
2023 env!("CARGO_PKG_VERSION"),
2024 prometheus_registry,
2025 config.policy_config.clone(),
2026 config.firewall_config.clone(),
2027 );
2028
2029 let kv_store = build_kv_store(&state, config, prometheus_registry)?;
2030
2031 let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
2032 server.register_module(ReadApi::new(
2033 state.clone(),
2034 kv_store.clone(),
2035 metrics.clone(),
2036 ))?;
2037 server.register_module(CoinReadApi::new(
2038 state.clone(),
2039 kv_store.clone(),
2040 metrics.clone(),
2041 )?)?;
2042
2043 if config.run_with_range.is_none() {
2046 server.register_module(TransactionBuilderApi::new(state.clone()))?;
2047 }
2048 server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;
2049 server.register_module(BridgeReadApi::new(state.clone(), metrics.clone()))?;
2050
2051 if let Some(transaction_orchestrator) = transaction_orchestrator {
2052 server.register_module(TransactionExecutionApi::new(
2053 state.clone(),
2054 transaction_orchestrator.clone(),
2055 metrics.clone(),
2056 ))?;
2057 }
2058
2059 let iota_names_config = config.iota_names_config.clone().unwrap_or_default();
2062
2063 server.register_module(IndexerApi::new(
2064 state.clone(),
2065 ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
2066 kv_store,
2067 metrics,
2068 iota_names_config,
2069 config.indexer_max_subscriptions,
2070 ))?;
2071 server.register_module(MoveUtils::new(state.clone()))?;
2072
2073 let server_type = config.jsonrpc_server_type();
2074
2075 server.to_router(server_type).await?
2076 };
2077
2078 router = router.merge(json_rpc_router);
2079
2080 if config.enable_rest_api {
2081 let mut rest_service = iota_rest_api::RestService::new(
2082 Arc::new(RestReadStore::new(state, store)),
2083 software_version,
2084 );
2085
2086 rest_service.with_metrics(RestMetrics::new(prometheus_registry));
2087
2088 if let Some(transaction_orchestrator) = transaction_orchestrator {
2089 rest_service.with_executor(transaction_orchestrator.clone())
2090 }
2091
2092 router = router.merge(rest_service.into_router());
2093 }
2094
2095 let listener = tokio::net::TcpListener::bind(&config.json_rpc_address)
2096 .await
2097 .unwrap();
2098 let addr = listener.local_addr().unwrap();
2099
2100 router = router.layer(axum::middleware::from_fn(server_timing_middleware));
2101
2102 let handle = tokio::spawn(async move {
2103 axum::serve(
2104 listener,
2105 router.into_make_service_with_connect_info::<SocketAddr>(),
2106 )
2107 .await
2108 .unwrap()
2109 });
2110
2111 info!(local_addr =? addr, "IOTA JSON-RPC server listening on {addr}");
2112
2113 Ok(Some(handle))
2114}
2115
2116#[cfg(not(test))]
2117fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
2118 protocol_config.max_transactions_per_checkpoint() as usize
2119}
2120
/// Test-build override: ignores the protocol config and always allows two
/// transactions per checkpoint.
#[cfg(test)]
fn max_tx_per_checkpoint(_protocol_config: &ProtocolConfig) -> usize {
    const TEST_MAX_TX_PER_CHECKPOINT: usize = 2;
    TEST_MAX_TX_PER_CHECKPOINT
}