1#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8 collections::{BTreeSet, HashMap, HashSet},
9 fmt,
10 future::Future,
11 path::PathBuf,
12 str::FromStr,
13 sync::{Arc, Weak},
14 time::Duration,
15};
16
17use anemo::Network;
18use anemo_tower::{
19 callback::CallbackLayer,
20 trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
21};
22use anyhow::{Result, anyhow};
23use arc_swap::ArcSwap;
24use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
25use futures::future::BoxFuture;
26pub use handle::IotaNodeHandle;
27use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
28use iota_common::debug_fatal;
29use iota_config::{
30 ConsensusConfig, NodeConfig,
31 node::{DBCheckpointConfig, RunWithRange},
32 node_config_metrics::NodeConfigMetrics,
33 object_storage_config::{ObjectStoreConfig, ObjectStoreType},
34};
35use iota_core::{
36 authority::{
37 AuthorityState, AuthorityStore, RandomnessRoundReceiver,
38 authority_per_epoch_store::AuthorityPerEpochStore,
39 authority_store_pruner::ObjectsCompactionFilter,
40 authority_store_tables::{
41 AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
42 },
43 backpressure::BackpressureManager,
44 epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
45 },
46 authority_aggregator::{
47 AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
48 },
49 authority_client::NetworkAuthorityClient,
50 authority_server::{ValidatorService, ValidatorServiceMetrics},
51 checkpoints::{
52 CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
53 SubmitCheckpointToConsensus,
54 checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
55 },
56 connection_monitor::ConnectionMonitor,
57 consensus_adapter::{
58 CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
59 ConsensusClient,
60 },
61 consensus_handler::ConsensusHandlerInitializer,
62 consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
63 consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
64 db_checkpoint_handler::DBCheckpointHandler,
65 epoch::{
66 committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
67 epoch_metrics::EpochMetrics, randomness::RandomnessManager,
68 reconfiguration::ReconfigurationInitiator,
69 },
70 execution_cache::build_execution_cache,
71 jsonrpc_index::IndexStore,
72 module_cache_metrics::ResolverMetrics,
73 overload_monitor::overload_monitor,
74 rest_index::RestIndexStore,
75 safe_client::SafeClientMetricsBase,
76 signature_verifier::SignatureVerifierMetrics,
77 state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
78 storage::{RestReadStore, RocksDbStore},
79 traffic_controller::metrics::TrafficControllerMetrics,
80 transaction_orchestrator::TransactionOrchestrator,
81 validator_tx_finalizer::ValidatorTxFinalizer,
82};
83use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
84use iota_json_rpc::{
85 JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
86 indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
87 transaction_builder_api::TransactionBuilderApi,
88 transaction_execution_api::TransactionExecutionApi,
89};
90use iota_json_rpc_api::JsonRpcMetrics;
91use iota_macros::{fail_point, fail_point_async, replay_log};
92use iota_metrics::{
93 RegistryID, RegistryService,
94 hardware_metrics::register_hardware_metrics,
95 metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
96 server_timing_middleware, spawn_monitored_task,
97};
98use iota_names::config::IotaNamesConfig;
99use iota_network::{
100 api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
101};
102use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
103use iota_protocol_config::ProtocolConfig;
104use iota_rest_api::{RestMetrics, ServerVersion};
105use iota_sdk_types::crypto::{Intent, IntentMessage, IntentScope};
106use iota_snapshot::uploader::StateSnapshotUploader;
107use iota_storage::{
108 FileCompression, StorageFormat,
109 http_key_value_store::HttpKVStore,
110 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
111 key_value_store_metrics::KeyValueStoreMetrics,
112};
113use iota_types::{
114 base_types::{AuthorityName, ConciseableName, EpochId},
115 committee::Committee,
116 crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
117 digests::ChainIdentifier,
118 error::{IotaError, IotaResult},
119 executable_transaction::VerifiedExecutableTransaction,
120 execution_config_utils::to_binary_config,
121 full_checkpoint_content::CheckpointData,
122 iota_system_state::{
123 IotaSystemState, IotaSystemStateTrait,
124 epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
125 },
126 messages_consensus::{
127 AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
128 SignedAuthorityCapabilitiesV1, check_total_jwk_size,
129 },
130 messages_grpc::HandleCapabilityNotificationRequestV1,
131 quorum_driver_types::QuorumDriverEffectsQueueResult,
132 supported_protocol_versions::SupportedProtocolVersions,
133 transaction::{Transaction, VerifiedCertificate},
134};
135use prometheus::Registry;
136#[cfg(msim)]
137pub use simulator::set_jwk_injector;
138#[cfg(msim)]
139use simulator::*;
140use tap::tap::TapFallible;
141use tokio::{
142 sync::{Mutex, broadcast, mpsc, watch},
143 task::{JoinHandle, JoinSet},
144};
145use tokio_util::sync::CancellationToken;
146use tower::ServiceBuilder;
147use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
148use typed_store::{
149 DBMetrics,
150 rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
151};
152
153use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
154
// HTTP admin interface for the node.
pub mod admin;
// Defines `IotaNodeHandle` (re-exported above).
mod handle;
// Node metric types, including `IotaNodeMetrics` and `GrpcMetrics`.
pub mod metrics;
158
/// Services and handles that exist only while this node is acting as a
/// committee validator; dropped and rebuilt across reconfiguration.
pub struct ValidatorComponents {
    // Validator gRPC server, wrapped in `SpawnOnce` so it can be started
    // explicitly after construction (see `start_async`).
    validator_server_handle: SpawnOnce,
    // Background task watching for validator overload, if enabled.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    // Prunes old consensus storage.
    consensus_store_pruner: ConsensusStorePruner,
    // Entry point for submitting transactions to consensus.
    consensus_adapter: Arc<ConsensusAdapter>,
    // Tasks driving the checkpoint service; joined on teardown.
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    // Registry ID for the validator-specific metrics registry, so it can be
    // removed from the `RegistryService` when the components are torn down.
    validator_registry_id: RegistryID,
}
171
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    /// Per-node state that exists only under the `msim` simulator build.
    pub(super) struct SimState {
        // Handle to the simulator node this `IotaNode` runs on.
        pub sim_node: iota_simulator::runtime::NodeHandle,
        // Set by tests that expect the node to enter safe mode.
        pub sim_safe_mode_expected: AtomicBool,
        // Detects leaked nodes when the simulation tears down.
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                // Bind to the simulator node current at construction time.
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }

    /// Signature of a test-injectable JWK fetcher: given this authority and a
    /// provider, produce the provider's JWKs (or a retrieval error).
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default injector: parses the bundled default JWK bytes as if fetched
    /// from the Twitch provider, regardless of the provider requested.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        parse_jwks(
            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
        )
        .map_err(|_| IotaError::JWKRetrieval)
    }

    thread_local! {
        // Thread-local so that each simulated node (the simulator runs many
        // nodes per process) keeps an independent injector.
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns the JWK injector currently installed for this simulated node.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    /// Replaces the JWK injector; used by simulation tests to control the
    /// JWKs the node "fetches".
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}
224
/// A running IOTA node: owns the authority state, network handles, and all
/// background services started in [`IotaNode::start_async`].
pub struct IotaNode {
    config: NodeConfig,
    // Present only while this node is a committee validator; swapped across
    // reconfiguration, hence the Mutex<Option<...>>.
    validator_components: Mutex<Option<ValidatorComponents>>,
    // Kept alive for its side effect of serving HTTP; never read.
    _http_server: Option<iota_http::ServerHandle>,
    state: Arc<AuthorityState>,
    // Only set for full nodes (see `start_async`).
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    // Peer discovery handle; kept alive for its side effects.
    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    // Taken (Option) during reconfiguration; repopulated afterwards.
    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    // Broadcasts the system state produced at each epoch close.
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    // Notifies the p2p layer of committee/trusted-peer changes.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    // Kill switch for the db-checkpoint background task.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    // Kill switch for the state archival task.
    _state_archive_handle: Option<broadcast::Sender<()>>,

    // Kill switch for the state snapshot uploader.
    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Broadcasts the configured run range (if any) on shutdown.
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    // Swappable aggregator over the current validator committee.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}
273
274impl fmt::Debug for IotaNode {
275 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
276 f.debug_struct("IotaNode")
277 .field("name", &self.state.name.concise())
278 .finish()
279 }
280}
281
/// Upper bound on the number of JWKs accepted from a single provider fetch;
/// keys beyond this limit are dropped (see `start_jwk_updater`).
// A compile-time value with no interior mutability should be `const`, not
// `static`; usages (value reads and comparisons) are unaffected.
const MAX_JWK_KEYS_PER_FETCH: usize = 100;
283
284impl IotaNode {
285 pub async fn start(
286 config: NodeConfig,
287 registry_service: RegistryService,
288 ) -> Result<Arc<IotaNode>> {
289 Self::start_async(
290 config,
291 registry_service,
292 ServerVersion::new("iota-node", "unknown"),
293 )
294 .await
295 }
296
    /// Spawns one background task per configured zklogin OIDC provider that
    /// periodically fetches JWKs and submits new ones to consensus.
    ///
    /// Each task is scoped to the given `epoch_store`'s epoch via
    /// `within_alive_epoch`, so it terminates automatically at epoch change.
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<IotaNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        // Providers are configured per chain; an unconfigured chain yields
        // the empty set. An unparsable provider string is a config bug.
        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        // Returns false (and bumps the invalid-JWK counter for `provider`)
        // when the key's issuer is not a known provider, does not match the
        // provider it was fetched from, or the key exceeds the size limit.
        fn validate_jwk(
            metrics: &Arc<IotaNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // (id, jwk) pairs already submitted during this task's
                    // lifetime; avoids resubmitting duplicates.
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // Back off briefly on fetch errors before retrying.
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // Keep only valid keys that are neither already
                                // active this epoch nor previously submitted.
                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // Cap per-fetch submissions to bound consensus load.
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    // Submission failures are logged and dropped;
                                    // the key will be retried on a later fetch if
                                    // still absent from the epoch store.
                                    consensus_adapter.submit(txn, None, &epoch_store)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }
435
436 pub async fn start_async(
437 config: NodeConfig,
438 registry_service: RegistryService,
439 server_version: ServerVersion,
440 ) -> Result<Arc<IotaNode>> {
441 NodeConfigMetrics::new(®istry_service.default_registry()).record_metrics(&config);
442 let mut config = config.clone();
443 if config.supported_protocol_versions.is_none() {
444 info!(
445 "populating config.supported_protocol_versions with default {:?}",
446 SupportedProtocolVersions::SYSTEM_DEFAULT
447 );
448 config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
449 }
450
451 let run_with_range = config.run_with_range;
452 let is_validator = config.consensus_config().is_some();
453 let is_full_node = !is_validator;
454 let prometheus_registry = registry_service.default_registry();
455
456 info!(node =? config.authority_public_key(),
457 "Initializing iota-node listening on {}", config.network_address
458 );
459
460 let genesis = config.genesis()?.clone();
461
462 let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
463 info!("IOTA chain identifier: {chain_identifier}");
464
465 let db_corrupted_path = &config.db_path().join("status");
467 if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
468 panic!("Failed to check database corruption: {err}");
469 }
470
471 DBMetrics::init(&prometheus_registry);
473
474 iota_metrics::init_metrics(&prometheus_registry);
476 #[cfg(not(msim))]
479 iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
480
481 register_hardware_metrics(®istry_service, &config.db_path)
483 .expect("Failed registering hardware metrics");
484 prometheus_registry
486 .register(iota_metrics::uptime_metric(
487 if is_validator {
488 "validator"
489 } else {
490 "fullnode"
491 },
492 server_version.version,
493 &chain_identifier.to_string(),
494 ))
495 .expect("Failed registering uptime metric");
496
497 let migration_tx_data = if genesis.contains_migrations() {
500 Some(config.load_migration_tx_data()?)
503 } else {
504 None
505 };
506
507 let secret = Arc::pin(config.authority_key_pair().copy());
508 let genesis_committee = genesis.committee()?;
509 let committee_store = Arc::new(CommitteeStore::new(
510 config.db_path().join("epochs"),
511 &genesis_committee,
512 None,
513 ));
514
515 let mut pruner_db = None;
516 if config
517 .authority_store_pruning_config
518 .enable_compaction_filter
519 {
520 pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
521 &config.db_path().join("store"),
522 )));
523 }
524 let compaction_filter = pruner_db
525 .clone()
526 .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));
527
528 let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
530 let perpetual_tables_options = AuthorityPerpetualTablesOptions {
531 enable_write_stall,
532 compaction_filter,
533 };
534 let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
535 &config.db_path().join("store"),
536 Some(perpetual_tables_options),
537 ));
538 let is_genesis = perpetual_tables
539 .database_is_empty()
540 .expect("Database read should not fail at init.");
541 let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
542 let backpressure_manager =
543 BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
544
545 let store = AuthorityStore::open(
546 perpetual_tables,
547 &genesis,
548 &config,
549 &prometheus_registry,
550 migration_tx_data.as_ref(),
551 )
552 .await?;
553
554 let cur_epoch = store.get_recovery_epoch_at_restart()?;
555 let committee = committee_store
556 .get_committee(&cur_epoch)?
557 .expect("Committee of the current epoch must exist");
558 let epoch_start_configuration = store
559 .get_epoch_start_configuration()?
560 .expect("EpochStartConfiguration of the current epoch must exist");
561 let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
562 let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
563
564 let cache_traits = build_execution_cache(
565 &config.execution_cache_config,
566 &epoch_start_configuration,
567 &prometheus_registry,
568 &store,
569 backpressure_manager.clone(),
570 );
571
572 let auth_agg = {
573 let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
574 let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
575 Arc::new(ArcSwap::new(Arc::new(
576 AuthorityAggregator::new_from_epoch_start_state(
577 epoch_start_configuration.epoch_start_state(),
578 &committee_store,
579 safe_client_metrics_base,
580 auth_agg_metrics,
581 ),
582 )))
583 };
584
585 let chain = match config.chain_override_for_testing {
586 Some(chain) => chain,
587 None => chain_identifier.chain(),
588 };
589
590 let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
591 let epoch_store = AuthorityPerEpochStore::new(
592 config.authority_public_key(),
593 committee.clone(),
594 &config.db_path().join("store"),
595 Some(epoch_options.options),
596 EpochMetrics::new(®istry_service.default_registry()),
597 epoch_start_configuration,
598 cache_traits.backing_package_store.clone(),
599 cache_traits.object_store.clone(),
600 cache_metrics,
601 signature_verifier_metrics,
602 &config.expensive_safety_check_config,
603 (chain_identifier, chain),
604 checkpoint_store
605 .get_highest_executed_checkpoint_seq_number()
606 .expect("checkpoint store read cannot fail")
607 .unwrap_or(0),
608 )?;
609
610 info!("created epoch store");
611
612 replay_log!(
613 "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
614 epoch_store.epoch(),
615 epoch_store.protocol_config()
616 );
617
618 if is_genesis {
620 info!("checking IOTA conservation at genesis");
621 cache_traits
626 .reconfig_api
627 .try_expensive_check_iota_conservation(&epoch_store, None)
628 .expect("IOTA conservation check cannot fail at genesis");
629 }
630
631 let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
632 let default_buffer_stake = epoch_store
633 .protocol_config()
634 .buffer_stake_for_protocol_upgrade_bps();
635 if effective_buffer_stake != default_buffer_stake {
636 warn!(
637 ?effective_buffer_stake,
638 ?default_buffer_stake,
639 "buffer_stake_for_protocol_upgrade_bps is currently overridden"
640 );
641 }
642
643 checkpoint_store.insert_genesis_checkpoint(
644 genesis.checkpoint(),
645 genesis.checkpoint_contents().clone(),
646 &epoch_store,
647 );
648
649 unmark_db_corruption(db_corrupted_path)?;
651
652 info!("creating state sync store");
653 let state_sync_store = RocksDbStore::new(
654 cache_traits.clone(),
655 committee_store.clone(),
656 checkpoint_store.clone(),
657 );
658
659 let index_store = if is_full_node && config.enable_index_processing {
660 info!("creating index store");
661 Some(Arc::new(IndexStore::new(
662 config.db_path().join("indexes"),
663 &prometheus_registry,
664 epoch_store
665 .protocol_config()
666 .max_move_identifier_len_as_option(),
667 )))
668 } else {
669 None
670 };
671
672 let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
673 {
674 Some(Arc::new(
675 RestIndexStore::new(
676 config.db_path().join("rest_index"),
677 &store,
678 &checkpoint_store,
679 &epoch_store,
680 &cache_traits.backing_package_store,
681 )
682 .await,
683 ))
684 } else {
685 None
686 };
687
688 info!("creating archive reader");
689 let archive_readers =
694 ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
695 let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
696 let (randomness_tx, randomness_rx) = mpsc::channel(
697 config
698 .p2p_config
699 .randomness
700 .clone()
701 .unwrap_or_default()
702 .mailbox_capacity(),
703 );
704 let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
705 Self::create_p2p_network(
706 &config,
707 state_sync_store.clone(),
708 chain_identifier,
709 trusted_peer_change_rx,
710 archive_readers.clone(),
711 randomness_tx,
712 &prometheus_registry,
713 )?;
714
715 send_trusted_peer_change(
718 &config,
719 &trusted_peer_change_tx,
720 epoch_store.epoch_start_state(),
721 );
722
723 info!("start state archival");
724 let state_archive_handle =
726 Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
727 .await?;
728
729 info!("start snapshot upload");
730 let state_snapshot_handle =
732 Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
733
734 info!("start db checkpoint");
736 let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
737 &config,
738 &prometheus_registry,
739 state_snapshot_handle.is_some(),
740 )?;
741
742 let mut genesis_objects = genesis.objects().to_vec();
743 if let Some(migration_tx_data) = migration_tx_data.as_ref() {
744 genesis_objects.extend(migration_tx_data.get_objects());
745 }
746
747 let authority_name = config.authority_public_key();
748 let validator_tx_finalizer =
749 config
750 .enable_validator_tx_finalizer
751 .then_some(Arc::new(ValidatorTxFinalizer::new(
752 auth_agg.clone(),
753 authority_name,
754 &prometheus_registry,
755 )));
756
757 info!("create authority state");
758 let state = AuthorityState::new(
759 authority_name,
760 secret,
761 config.supported_protocol_versions.unwrap(),
762 store.clone(),
763 cache_traits.clone(),
764 epoch_store.clone(),
765 committee_store.clone(),
766 index_store.clone(),
767 rest_index,
768 checkpoint_store.clone(),
769 &prometheus_registry,
770 &genesis_objects,
771 &db_checkpoint_config,
772 config.clone(),
773 archive_readers,
774 validator_tx_finalizer,
775 chain_identifier,
776 pruner_db,
777 )
778 .await;
779
780 if epoch_store.epoch() == 0 {
782 let genesis_tx = &genesis.transaction();
783 let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
784 Self::execute_transaction_immediately_at_zero_epoch(
786 &state,
787 &epoch_store,
788 genesis_tx,
789 span,
790 )
791 .await;
792
793 if let Some(migration_tx_data) = migration_tx_data {
795 for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
796 let span = error_span!("migration_txn", tx_digest = ?tx_digest);
797 Self::execute_transaction_immediately_at_zero_epoch(
798 &state,
799 &epoch_store,
800 tx,
801 span,
802 )
803 .await;
804 }
805 }
806 }
807
808 RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
811
812 if config
813 .expensive_safety_check_config
814 .enable_secondary_index_checks()
815 {
816 if let Some(indexes) = state.indexes.clone() {
817 iota_core::verify_indexes::verify_indexes(
818 state.get_accumulator_store().as_ref(),
819 indexes,
820 )
821 .expect("secondary indexes are inconsistent");
822 }
823 }
824
825 let (end_of_epoch_channel, end_of_epoch_receiver) =
826 broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
827
828 let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
829 Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
830 auth_agg.load_full(),
831 state.clone(),
832 end_of_epoch_receiver,
833 &config.db_path(),
834 &prometheus_registry,
835 )))
836 } else {
837 None
838 };
839
840 let http_server = build_http_server(
841 state.clone(),
842 state_sync_store.clone(),
843 &transaction_orchestrator.clone(),
844 &config,
845 &prometheus_registry,
846 server_version.clone(),
847 )
848 .await?;
849
850 let accumulator = Arc::new(StateAccumulator::new(
851 cache_traits.accumulator_store.clone(),
852 StateAccumulatorMetrics::new(&prometheus_registry),
853 ));
854
855 let authority_names_to_peer_ids = epoch_store
856 .epoch_start_state()
857 .get_authority_names_to_peer_ids();
858
859 let network_connection_metrics =
860 NetworkConnectionMetrics::new("iota", ®istry_service.default_registry());
861
862 let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
863
864 let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
865 p2p_network.downgrade(),
866 network_connection_metrics,
867 HashMap::new(),
868 None,
869 );
870
871 let connection_monitor_status = ConnectionMonitorStatus {
872 connection_statuses,
873 authority_names_to_peer_ids,
874 };
875
876 let connection_monitor_status = Arc::new(connection_monitor_status);
877 let iota_node_metrics =
878 Arc::new(IotaNodeMetrics::new(®istry_service.default_registry()));
879
880 let executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>> =
884 transaction_orchestrator
885 .clone()
886 .map(|o| o as Arc<dyn iota_types::transaction_executor::TransactionExecutor>);
887
888 let grpc_server_handle = build_grpc_server(
889 &config,
890 state.clone(),
891 state_sync_store.clone(),
892 executor,
893 ®istry_service.default_registry(),
894 server_version,
895 )
896 .await?;
897
898 let validator_components = if state.is_committee_validator(&epoch_store) {
899 let (components, _) = futures::join!(
900 Self::construct_validator_components(
901 config.clone(),
902 state.clone(),
903 committee,
904 epoch_store.clone(),
905 checkpoint_store.clone(),
906 state_sync_handle.clone(),
907 randomness_handle.clone(),
908 Arc::downgrade(&accumulator),
909 backpressure_manager.clone(),
910 connection_monitor_status.clone(),
911 ®istry_service,
912 iota_node_metrics.clone(),
913 ),
914 Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
915 );
916 let mut components = components?;
917
918 components.consensus_adapter.submit_recovered(&epoch_store);
919
920 components.validator_server_handle = components.validator_server_handle.start().await;
922
923 Some(components)
924 } else {
925 None
926 };
927
928 let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
930
931 let node = Self {
932 config,
933 validator_components: Mutex::new(validator_components),
934 _http_server: http_server,
935 state,
936 transaction_orchestrator,
937 registry_service,
938 metrics: iota_node_metrics,
939
940 _discovery: discovery_handle,
941 state_sync_handle,
942 randomness_handle,
943 checkpoint_store,
944 accumulator: Mutex::new(Some(accumulator)),
945 end_of_epoch_channel,
946 connection_monitor_status,
947 trusted_peer_change_tx,
948 backpressure_manager,
949
950 _db_checkpoint_handle: db_checkpoint_handle,
951
952 #[cfg(msim)]
953 sim_state: Default::default(),
954
955 _state_archive_handle: state_archive_handle,
956 _state_snapshot_uploader_handle: state_snapshot_handle,
957 shutdown_channel_tx: shutdown_channel,
958
959 grpc_server_handle: Mutex::new(grpc_server_handle),
960
961 auth_agg,
962 };
963
964 info!("IotaNode started!");
965 let node = Arc::new(node);
966 let node_copy = node.clone();
967 spawn_monitored_task!(async move {
968 let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
969 if let Err(error) = result {
970 warn!("Reconfiguration finished with error {:?}", error);
971 }
972 });
973
974 Ok(node)
975 }
976
    /// Subscribe to end-of-epoch notifications; each epoch close broadcasts
    /// the resulting [`IotaSystemState`].
    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
        self.end_of_epoch_channel.subscribe()
    }
980
    /// Subscribe to the shutdown broadcast; the payload is the configured
    /// `RunWithRange`, if any.
    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }
984
    /// Current epoch of the authority state (test-only accessor).
    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }
988
    /// Path where database checkpoints are written, per the node config.
    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }
992
993 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
995 info!("close_epoch (current epoch = {})", epoch_store.epoch());
996 self.validator_components
997 .lock()
998 .await
999 .as_ref()
1000 .ok_or_else(|| IotaError::from("Node is not a validator"))?
1001 .consensus_adapter
1002 .close_epoch(epoch_store);
1003 Ok(())
1004 }
1005
    /// Clear any buffer-stake override for protocol upgrades in `epoch`,
    /// delegating to the authority state.
    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }
1010
    /// Override the buffer stake (in basis points) required for a protocol
    /// upgrade in `epoch`, delegating to the authority state.
    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> IotaResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }
1019
1020 pub async fn close_epoch_for_testing(&self) -> IotaResult {
1023 let epoch_store = self.state.epoch_store_for_testing();
1024 self.close_epoch(&epoch_store).await
1025 }
1026
1027 async fn start_state_archival(
1028 config: &NodeConfig,
1029 prometheus_registry: &Registry,
1030 state_sync_store: RocksDbStore,
1031 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1032 if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
1033 let local_store_config = ObjectStoreConfig {
1034 object_store: Some(ObjectStoreType::File),
1035 directory: Some(config.archive_path()),
1036 ..Default::default()
1037 };
1038 let archive_writer = ArchiveWriter::new(
1039 local_store_config,
1040 remote_store_config.clone(),
1041 FileCompression::Zstd,
1042 StorageFormat::Blob,
1043 Duration::from_secs(600),
1044 256 * 1024 * 1024,
1045 prometheus_registry,
1046 )
1047 .await?;
1048 Ok(Some(archive_writer.start(state_sync_store).await?))
1049 } else {
1050 Ok(None)
1051 }
1052 }
1053
1054 fn start_state_snapshot(
1057 config: &NodeConfig,
1058 prometheus_registry: &Registry,
1059 checkpoint_store: Arc<CheckpointStore>,
1060 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1061 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1062 let snapshot_uploader = StateSnapshotUploader::new(
1063 &config.db_checkpoint_path(),
1064 &config.snapshot_path(),
1065 remote_store_config.clone(),
1066 60,
1067 prometheus_registry,
1068 checkpoint_store,
1069 )?;
1070 Ok(Some(snapshot_uploader.start()))
1071 } else {
1072 Ok(None)
1073 }
1074 }
1075
1076 fn start_db_checkpoint(
1077 config: &NodeConfig,
1078 prometheus_registry: &Registry,
1079 state_snapshot_enabled: bool,
1080 ) -> Result<(
1081 DBCheckpointConfig,
1082 Option<tokio::sync::broadcast::Sender<()>>,
1083 )> {
1084 let checkpoint_path = Some(
1085 config
1086 .db_checkpoint_config
1087 .checkpoint_path
1088 .clone()
1089 .unwrap_or_else(|| config.db_checkpoint_path()),
1090 );
1091 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1092 DBCheckpointConfig {
1093 checkpoint_path,
1094 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1095 true
1096 } else {
1097 config
1098 .db_checkpoint_config
1099 .perform_db_checkpoints_at_epoch_end
1100 },
1101 ..config.db_checkpoint_config.clone()
1102 }
1103 } else {
1104 config.db_checkpoint_config.clone()
1105 };
1106
1107 match (
1108 db_checkpoint_config.object_store_config.as_ref(),
1109 state_snapshot_enabled,
1110 ) {
1111 (None, false) => Ok((db_checkpoint_config, None)),
1116 (_, _) => {
1117 let handler = DBCheckpointHandler::new(
1118 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1119 db_checkpoint_config.object_store_config.as_ref(),
1120 60,
1121 db_checkpoint_config
1122 .prune_and_compact_before_upload
1123 .unwrap_or(true),
1124 config.authority_store_pruning_config.clone(),
1125 prometheus_registry,
1126 state_snapshot_enabled,
1127 )?;
1128 Ok((
1129 db_checkpoint_config,
1130 Some(DBCheckpointHandler::start(handler)),
1131 ))
1132 }
1133 }
1134 }
1135
    /// Creates the anemo-based p2p network and starts its three RPC services:
    /// peer discovery, checkpoint state-sync, and distributed randomness.
    ///
    /// Returns the bound [`Network`] plus one handle per service so callers
    /// can interact with and shut down each component independently.
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        // State-sync serves checkpoints from the local store and can fall back
        // to the archive readers for history no longer held locally.
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        // Discovery is re-seeded whenever the trusted peer set changes
        // (delivered through `trusted_peer_change_rx`).
        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        // Randomness messages received from peers are forwarded to the node
        // through `randomness_tx`.
        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            // Inbound stack: trace server errors + record per-request metrics.
            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            // Outbound stack mirrors the inbound one with its own metrics.
            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Allow frames up to 1 GiB — presumably to accommodate large
            // state-sync payloads; TODO confirm sizing rationale.
            anemo_config.max_frame_size = Some(1 << 30);

            // Fill in QUIC defaults only for knobs the operator left unset.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            // Don't fail startup if the OS rejects the buffer sizes above.
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            // The server name embeds the chain identifier so peers on other
            // chains cannot connect.
            let server_name = format!("iota-{chain_identifier}");
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }
1274
    /// Constructs all validator-specific components — consensus adapter and
    /// manager, consensus store pruner, validator gRPC service, and the
    /// optional overload monitor — then starts the per-epoch parts via
    /// [`Self::start_epoch_specific_validator_components`].
    ///
    /// Validator metrics live in their own [`Registry`] so they can be removed
    /// wholesale (via the returned `validator_registry_id`) if the node later
    /// leaves the committee.
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        iota_node_metrics: Arc<IotaNodeMetrics>,
    ) -> Result<ValidatorComponents> {
        // NOTE(review): the consensus config is borrowed mutably from a local
        // clone, so any downstream mutation stays local to this call — confirm
        // that is the intent.
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        // Dedicated registry for validator-only metrics, removable on demotion.
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        // The consensus client is "updatable" so it can be re-pointed at a new
        // consensus instance after reconfiguration.
        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        // Prunes old consensus databases according to configured retention.
        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        // Prepares (but does not yet bind) the validator gRPC endpoint.
        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &validator_registry,
        )
        .await?;

        // Only spawn the overload monitor when load shedding is enabled.
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            accumulator,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_node_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }
1376
    /// Starts the validator components that must be rebuilt every epoch:
    /// checkpoint service, randomness manager, consensus, and (optionally)
    /// the JWK updater.
    ///
    /// Ordering matters here: the randomness manager is installed on the epoch
    /// store before consensus starts, and consensus is started before the
    /// checkpoint service tasks are spawned.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: ConsensusManager,
        consensus_store_pruner: ConsensusStorePruner,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_node_metrics: Arc<IotaNodeMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            accumulator,
            checkpoint_metrics.clone(),
        );

        // Shared, hot-swappable map of low-scoring authorities; consulted by
        // the consensus adapter when submitting transactions.
        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        // `try_new` returns None when randomness is not active this epoch.
        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
            backpressure_manager,
        );

        info!("Starting consensus manager");

        consensus_manager
            .start(
                config,
                epoch_store.clone(),
                consensus_handler_initializer,
                IotaTxValidator::new(
                    epoch_store.clone(),
                    checkpoint_service.clone(),
                    state.transaction_manager().clone(),
                    iota_tx_validator_metrics.clone(),
                ),
            )
            .await;

        info!("Spawning checkpoint service");
        let checkpoint_service_tasks = checkpoint_service.spawn().await;

        // Keep on-chain JWKs fresh when authenticator state is enabled.
        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                iota_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }
1479
1480 fn build_checkpoint_service(
1487 config: &NodeConfig,
1488 consensus_adapter: Arc<ConsensusAdapter>,
1489 checkpoint_store: Arc<CheckpointStore>,
1490 epoch_store: Arc<AuthorityPerEpochStore>,
1491 state: Arc<AuthorityState>,
1492 state_sync_handle: state_sync::Handle,
1493 accumulator: Weak<StateAccumulator>,
1494 checkpoint_metrics: Arc<CheckpointMetrics>,
1495 ) -> Arc<CheckpointService> {
1496 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1497 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1498
1499 debug!(
1500 "Starting checkpoint service with epoch start timestamp {}
1501 and epoch duration {}",
1502 epoch_start_timestamp_ms, epoch_duration_ms
1503 );
1504
1505 let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1506 sender: consensus_adapter,
1507 signer: state.secret.clone(),
1508 authority: config.authority_public_key(),
1509 next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1510 .checked_add(epoch_duration_ms)
1511 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1512 metrics: checkpoint_metrics.clone(),
1513 });
1514
1515 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1516 let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1517 let max_checkpoint_size_bytes =
1518 epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1519
1520 CheckpointService::build(
1521 state.clone(),
1522 checkpoint_store,
1523 epoch_store,
1524 state.get_transaction_cache_reader().clone(),
1525 accumulator,
1526 checkpoint_output,
1527 Box::new(certified_checkpoint_output),
1528 checkpoint_metrics,
1529 max_tx_per_checkpoint,
1530 max_checkpoint_size_bytes,
1531 )
1532 }
1533
1534 fn construct_consensus_adapter(
1535 committee: &Committee,
1536 consensus_config: &ConsensusConfig,
1537 authority: AuthorityName,
1538 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1539 prometheus_registry: &Registry,
1540 consensus_client: Arc<dyn ConsensusClient>,
1541 checkpoint_store: Arc<CheckpointStore>,
1542 ) -> ConsensusAdapter {
1543 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1544 ConsensusAdapter::new(
1548 consensus_client,
1549 checkpoint_store,
1550 authority,
1551 connection_monitor_status,
1552 consensus_config.max_pending_transactions(),
1553 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1554 consensus_config.max_submit_position,
1555 consensus_config.submit_delay_step_override(),
1556 ca_metrics,
1557 )
1558 }
1559
1560 async fn start_grpc_validator_service(
1561 config: &NodeConfig,
1562 state: Arc<AuthorityState>,
1563 consensus_adapter: Arc<ConsensusAdapter>,
1564 prometheus_registry: &Registry,
1565 ) -> Result<SpawnOnce> {
1566 let validator_service = ValidatorService::new(
1567 state,
1568 consensus_adapter,
1569 Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1570 TrafficControllerMetrics::new(prometheus_registry),
1571 config.policy_config.clone(),
1572 config.firewall_config.clone(),
1573 );
1574
1575 let mut server_conf = iota_network_stack::config::Config::new();
1576 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1577 server_conf.load_shed = config.grpc_load_shed;
1578 let server_builder =
1579 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
1580 .add_service(ValidatorServer::new(validator_service));
1581
1582 let tls_config = iota_tls::create_rustls_server_config(
1583 config.network_key_pair().copy().private(),
1584 IOTA_TLS_SERVER_NAME.to_string(),
1585 );
1586
1587 let network_address = config.network_address().clone();
1588
1589 let bind_future = async move {
1590 let server = server_builder
1591 .bind(&network_address, Some(tls_config))
1592 .await
1593 .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;
1594
1595 let local_addr = server.local_addr();
1596 info!("Listening to traffic on {local_addr}");
1597
1598 Ok(server)
1599 };
1600
1601 Ok(SpawnOnce::new(bind_future))
1602 }
1603
    /// Re-enqueues certified transactions that were pending in consensus when
    /// the node went down, so their execution completes after a restart.
    ///
    /// Certificates for which we already hold a signed-effects digest are
    /// re-executed pinned to that digest; the rest are enqueued normally.
    /// Shared-object transactions are skipped — presumably because they must
    /// be re-sequenced by consensus (TODO confirm).
    async fn reexecute_pending_consensus_certs(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        state: &Arc<AuthorityState>,
    ) {
        let mut pending_consensus_certificates = Vec::new();
        let mut additional_certs = Vec::new();

        for tx in epoch_store.get_all_pending_consensus_transactions() {
            match tx.kind {
                // Only owned-object certified transactions are handled here.
                ConsensusTransactionKind::CertifiedTransaction(tx)
                    if !tx.contains_shared_object() =>
                {
                    let tx = *tx;
                    let tx = VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(tx),
                    );
                    // If effects were already signed for this tx, pin its
                    // re-execution to that digest; otherwise enqueue plainly.
                    if let Some(fx_digest) = epoch_store
                        .get_signed_effects_digest(tx.digest())
                        .expect("db error")
                    {
                        pending_consensus_certificates.push((tx, fx_digest));
                    } else {
                        additional_certs.push(tx);
                    }
                }
                _ => (),
            }
        }

        let digests = pending_consensus_certificates
            .iter()
            .map(|(tx, _)| *tx.digest())
            .collect::<Vec<_>>();

        info!(
            "reexecuting {} pending consensus certificates: {:?}",
            digests.len(),
            digests
        );

        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
        state.enqueue_transactions_for_execution(additional_certs, epoch_store);

        // The simulator runs slower, so allow a longer wait there.
        let timeout = if cfg!(msim) { 120 } else { 60 };
        if tokio::time::timeout(
            std::time::Duration::from_secs(timeout),
            state
                .get_transaction_cache_reader()
                .try_notify_read_executed_effects_digests(&digests),
        )
        .await
        .is_err()
        {
            // On timeout, report exactly which digests never executed so the
            // failure is diagnosable.
            if let Ok(executed_effects_digests) = state
                .get_transaction_cache_reader()
                .try_multi_get_executed_effects_digests(&digests)
            {
                let pending_digests = digests
                    .iter()
                    .zip(executed_effects_digests.iter())
                    .filter_map(|(digest, executed_effects_digest)| {
                        if executed_effects_digest.is_none() {
                            Some(digest)
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>();
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed: {:?}",
                    pending_digests
                );
            } else {
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed, digests not found"
                );
            }
        }
    }
1712
1713 pub fn state(&self) -> Arc<AuthorityState> {
1714 self.state.clone()
1715 }
1716
1717 pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
1719 self.state.reference_gas_price_for_testing()
1720 }
1721
1722 pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
1723 self.state.committee_store().clone()
1724 }
1725
1726 pub fn clone_authority_aggregator(
1736 &self,
1737 ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
1738 self.transaction_orchestrator
1739 .as_ref()
1740 .map(|to| to.clone_authority_aggregator())
1741 }
1742
1743 pub fn transaction_orchestrator(
1744 &self,
1745 ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
1746 self.transaction_orchestrator.clone()
1747 }
1748
1749 pub fn subscribe_to_transaction_orchestrator_effects(
1750 &self,
1751 ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1752 self.transaction_orchestrator
1753 .as_ref()
1754 .map(|to| to.subscribe_to_effects_queue())
1755 .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1756 }
1757
    /// Drives the node across epoch boundaries for its entire lifetime.
    ///
    /// Each loop iteration executes all checkpoints of the current epoch with
    /// a fresh `CheckpointExecutor`, then reconfigures: recreates the
    /// authority aggregator, rebuilds the per-epoch store, and starts/stops
    /// validator components depending on whether this node belongs to the
    /// next committee. Returns only when a `RunWithRange` stop condition is
    /// met.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // Take the accumulator for this epoch; a fresh one is put back
            // into the guard before the next iteration.
            let mut accumulator_guard = self.accumulator.lock().await;
            let accumulator = accumulator_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );

            // If the gRPC server is running, forward executed checkpoint data
            // to its broadcast channel; `try_lock` avoids blocking here.
            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
                guard.as_ref().map(|handle| {
                    let tx = handle.checkpoint_data_broadcaster().clone();
                    Box::new(move |data: &CheckpointData| {
                        tx.send_traced(data);
                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
                })
            } else {
                None
            };

            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                accumulator.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                data_sender,
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            // Committee validators advertise their capabilities (supported
            // protocol versions, available system packages) via consensus.
            if let Some(components) = &*self.validator_components.lock().await {
                // Brief yield before submitting — NOTE(review): confirm why
                // this 1ms sleep is needed.
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let binary_config = to_binary_config(config);
                let transaction = ConsensusTransaction::new_capability_notification_v1(
                    AuthorityCapabilitiesV1::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        self.config
                            .supported_protocol_versions
                            .expect("Supported versions should be populated")
                            .truncate_below(config.version),
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components
                    .consensus_adapter
                    .submit(transaction, None, &cur_epoch_store)?;
            } else if self.state.is_active_validator(&cur_epoch_store)
                && cur_epoch_store
                    .protocol_config()
                    .track_non_committee_eligible_validators()
            {
                // Active but non-committee validators send a signed capability
                // notification directly to the committee instead; the task is
                // bounded by the lifetime of the current epoch.
                let epoch_store = cur_epoch_store.clone();
                let node_clone = self.clone();
                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
                    node_clone
                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
                        .instrument(trace_span!(
                            "send_signed_capability_notification_to_committee_with_retry"
                        ))
                        .await;
                }));
            }

            // Blocks until the epoch's checkpoints are fully executed (or the
            // run-with-range condition stops execution early).
            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                IotaNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .try_get_iota_system_state_object_unsafe()
                .expect("Read IOTA System State object cannot fail");

            // In simulation, safe mode may be deliberately induced; only
            // assert "not safe mode" when the test did not expect it.
            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // Broadcast end-of-epoch state; only noteworthy on fullnodes since
            // validators may legitimately have no subscribers.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
                if self.state.is_fullnode(&cur_epoch_store) {
                    warn!(
                        "Failed to send end of epoch notification to subscriber: {:?}",
                        err
                    );
                }
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Point the authority aggregator at the next epoch's committee.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            // Refresh the authority-name → peer-id mapping used to classify
            // network connections for the new committee.
            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            // Let p2p discovery know about the next committee's trusted peers.
            send_trusted_peer_change(
                &self.config,
                &self.trusted_peer_change_tx,
                &new_epoch_start_state,
            );

            // Hold the components lock across reconfiguration so no one
            // observes a half-reconfigured validator.
            let mut validator_components_lock_guard = self.validator_components.lock().await;

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    accumulator.clone(),
                )
                .await?;

            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");
                // Stop checkpoint building first; propagate panics, tolerate
                // plain task errors (expected when aborting).
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // The old accumulator must be uniquely owned here so its
                // metrics can be moved into the next epoch's accumulator.
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_committee_validator(&new_epoch_store) {
                    // Still a committee member: restart the per-epoch
                    // components, reusing the long-lived ones.
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_accumulator,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    // Demoted to fullnode: drop validator metrics and stop the
                    // validator gRPC server.
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    validator_server_handle.shutdown();
                    debug!("Validator grpc server shutdown triggered");

                    None
                }
            } else {
                // Fullnode path: still rotate the accumulator for the new
                // epoch.
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                if self.state.is_committee_validator(&new_epoch_store) {
                    // Promotion: build the full validator stack from scratch
                    // and actually bind the gRPC server.
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_accumulator,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                    )
                    .await?;

                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            // Allow the old epoch's database handles to be released.
            cur_epoch_store.release_db_handles();

            // Simulation-only: exercise checkpoint pruning when a finite
            // retention is configured.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                .prune_checkpoints_for_eligible_epochs_for_testing(
                    self.config.clone(),
                    iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                )
                .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
2089
2090 async fn shutdown(&self) {
2091 if let Some(validator_components) = &*self.validator_components.lock().await {
2092 validator_components.consensus_manager.shutdown().await;
2093 }
2094
2095 if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
2097 info!("Shutting down gRPC server");
2098 if let Err(e) = grpc_handle.shutdown().await {
2099 warn!("Failed to gracefully shutdown gRPC server: {e}");
2100 }
2101 }
2102 }
2103
    /// Transitions the authority state from the current epoch to the next,
    /// returning the new per-epoch store.
    ///
    /// Invariants (asserted/expected): the current epoch's final checkpoint
    /// exists, has been executed, and carries end-of-epoch data, whose supply
    /// change is passed into `AuthorityState::reconfigure`.
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        accumulator: Arc<StateAccumulator>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        // The epoch's final checkpoint must carry end-of-epoch data, which
        // includes the total supply change applied during the epoch.
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        // Reconfiguration must only happen after every checkpoint of the
        // epoch has been executed.
        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                accumulator,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        // Refresh epoch-flag metrics to reflect the old -> new transition.
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }
2169
    /// Returns a reference to this node's configuration.
    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }
2173
2174 async fn execute_transaction_immediately_at_zero_epoch(
2175 state: &Arc<AuthorityState>,
2176 epoch_store: &Arc<AuthorityPerEpochStore>,
2177 tx: &Transaction,
2178 span: tracing::Span,
2179 ) {
2180 let _guard = span.enter();
2181 let transaction =
2182 iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2183 iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2184 tx.data().clone(),
2185 iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2186 ),
2187 );
2188 state
2189 .try_execute_immediately(&transaction, None, epoch_store)
2190 .unwrap();
2191 }
2192
2193 pub fn randomness_handle(&self) -> randomness::Handle {
2194 self.randomness_handle.clone()
2195 }
2196
    /// Builds, signs, and broadcasts this validator's capability notification
    /// (supported protocol versions and available system packages) to the
    /// committee, retrying until a quorum acknowledges it or a non-retryable
    /// error is returned.
    async fn send_signed_capability_notification_to_committee_with_retry(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        // Linear backoff: start at 5s, add 5s per failed attempt, cap at 5min.
        const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
        const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
        const MAX_RETRY_INTERVAL_SECS: u64 = 300; let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);

        let capabilities = AuthorityCapabilitiesV1::new(
            self.state.name,
            epoch_store.get_chain_identifier().chain(),
            // Only advertise versions at or above the running protocol version.
            self.config
                .supported_protocol_versions
                .expect("Supported versions should be populated")
                .truncate_below(config.version),
            self.state
                .get_available_system_packages(&binary_config)
                .await,
        );

        // Sign the capabilities under the dedicated AuthorityCapabilities
        // intent scope with this authority's key pair.
        let signature = AuthoritySignature::new_secure(
            &IntentMessage::new(
                Intent::iota_app(IntentScope::AuthorityCapabilities),
                &capabilities,
            ),
            &epoch_store.epoch(),
            self.config.authority_key_pair(),
        );

        let request = HandleCapabilityNotificationRequestV1 {
            message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
        };

        let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);

        loop {
            // `auth_agg` is a swappable handle; loading inside the loop picks
            // up any aggregator replacement that happens between retries.
            let auth_agg = self.auth_agg.load();
            match auth_agg
                .send_capability_notification_to_quorum(request.clone())
                .await
            {
                Ok(_) => {
                    info!("Successfully sent capability notification to committee");
                    break;
                }
                Err(err) => {
                    match &err {
                        AggregatorSendCapabilityNotificationError::RetryableNotification {
                            errors,
                        } => {
                            warn!(
                                "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
                                retry_interval, errors
                            );
                        }
                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
                            errors,
                        } => {
                            error!(
                                "Failed to send capability notification to committee (non-retryable error): {:?}",
                                errors
                            );
                            // Give up immediately; retrying cannot succeed.
                            break;
                        }
                    };

                    tokio::time::sleep(retry_interval).await;

                    // Grow the backoff interval, clamped to the maximum.
                    retry_interval = std::cmp::min(
                        retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
                        Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
                    );
                }
            }
        }
    }
2286}
2287
2288#[cfg(not(msim))]
2289impl IotaNode {
2290 async fn fetch_jwks(
2291 _authority: AuthorityName,
2292 provider: &OIDCProvider,
2293 ) -> IotaResult<Vec<(JwkId, JWK)>> {
2294 use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2295 let client = reqwest::Client::new();
2296 fetch_jwks(provider, &client)
2297 .await
2298 .map_err(|_| IotaError::JWKRetrieval)
2299 }
2300}
2301
#[cfg(msim)]
impl IotaNode {
    /// Returns the simulator node id this node runs on (msim builds only).
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    /// Records whether simulation tests expect the network to be in safe mode.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    /// Simulator override of JWK fetching: delegates to the test-injected
    /// JWK provider instead of performing real HTTP requests.
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}
2322
/// A lazily started server: constructed as a pending future and only spawned
/// onto the runtime by `SpawnOnce::start`.
enum SpawnOnce {
    // The future is boxed and `Send`; the Mutex wrapper presumably exists to
    // make the variant `Sync` as well — confirm if this type is shared.
    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
    #[allow(unused)]
    Started(iota_http::ServerHandle),
}
2329
2330impl SpawnOnce {
2331 pub fn new(
2332 future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
2333 ) -> Self {
2334 Self::Unstarted(Mutex::new(Box::pin(future)))
2335 }
2336
2337 pub async fn start(self) -> Self {
2338 match self {
2339 Self::Unstarted(future) => {
2340 let server = future
2341 .into_inner()
2342 .await
2343 .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
2344 let handle = server.handle().clone();
2345 tokio::spawn(async move {
2346 if let Err(err) = server.serve().await {
2347 info!("Server stopped: {err}");
2348 }
2349 info!("Server stopped");
2350 });
2351 Self::Started(handle)
2352 }
2353 Self::Started(_) => self,
2354 }
2355 }
2356
2357 pub fn shutdown(self) {
2358 if let SpawnOnce::Started(handle) = self {
2359 handle.trigger_shutdown();
2360 }
2361 }
2362}
2363
2364fn send_trusted_peer_change(
2367 config: &NodeConfig,
2368 sender: &watch::Sender<TrustedPeerChangeEvent>,
2369 new_epoch_start_state: &EpochStartSystemState,
2370) {
2371 let new_committee =
2372 new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2373
2374 sender.send_modify(|event| {
2375 core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2376 event.new_committee = new_committee;
2377 })
2378}
2379
2380fn build_kv_store(
2381 state: &Arc<AuthorityState>,
2382 config: &NodeConfig,
2383 registry: &Registry,
2384) -> Result<Arc<TransactionKeyValueStore>> {
2385 let metrics = KeyValueStoreMetrics::new(registry);
2386 let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2387
2388 let base_url = &config.transaction_kv_store_read_config.base_url;
2389
2390 if base_url.is_empty() {
2391 info!("no http kv store url provided, using local db only");
2392 return Ok(Arc::new(db_store));
2393 }
2394
2395 base_url.parse::<url::Url>().tap_err(|e| {
2396 error!(
2397 "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2398 base_url, e
2399 )
2400 })?;
2401
2402 let http_store = HttpKVStore::new_kv(
2403 base_url,
2404 config.transaction_kv_store_read_config.cache_size,
2405 metrics.clone(),
2406 )?;
2407 info!("using local key-value store with fallback to http key-value store");
2408 Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2409 db_store,
2410 http_store,
2411 metrics,
2412 "json_rpc_fallback",
2413 )))
2414}
2415
2416async fn build_grpc_server(
2431 config: &NodeConfig,
2432 state: Arc<AuthorityState>,
2433 state_sync_store: RocksDbStore,
2434 executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>>,
2435 prometheus_registry: &Registry,
2436 server_version: ServerVersion,
2437) -> Result<Option<GrpcServerHandle>> {
2438 if config.consensus_config().is_some() || !config.enable_grpc_api {
2440 return Ok(None);
2441 }
2442
2443 let Some(grpc_config) = &config.grpc_api_config else {
2444 return Err(anyhow!("gRPC API is enabled but no configuration provided"));
2445 };
2446
2447 let chain_id = state.get_chain_identifier();
2449
2450 let rest_read_store = Arc::new(RestReadStore::new(state.clone(), state_sync_store));
2451
2452 let shutdown_token = CancellationToken::new();
2454
2455 let grpc_reader = Arc::new(GrpcReader::from_rest_state_reader(
2457 rest_read_store,
2458 Some(server_version.to_string()),
2459 ));
2460
2461 let grpc_server_metrics = iota_grpc_server::GrpcServerMetrics::new(prometheus_registry);
2463
2464 let handle = start_grpc_server(
2467 grpc_reader,
2468 executor,
2469 grpc_config.clone(),
2470 shutdown_token,
2471 chain_id,
2472 Some(grpc_server_metrics),
2473 )
2474 .await?;
2475
2476 Ok(Some(handle))
2477}
2478
/// Builds and starts the HTTP server hosting the JSON-RPC API, the optional
/// REST API, and the `/health` endpoint, bound to `config.json_rpc_address`.
///
/// Returns `Ok(None)` for validators (nodes with a consensus config), which
/// do not expose these public APIs. On success, returns the server handle.
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    server_version: ServerVersion,
) -> Result<Option<iota_http::ServerHandle>> {
    // Validators do not run the public HTTP APIs.
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    // Assemble the JSON-RPC modules into their own router.
    let json_rpc_router = {
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        // Transaction building is disabled when running against a bounded
        // range (e.g. replay/debug runs).
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        // Execution endpoints are only available when an orchestrator exists
        // to drive transactions through the network.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Fall back to the chain-specific default IOTA-Names config when none
        // is set explicitly.
        let iota_names_config = config
            .iota_names_config
            .clone()
            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // Optionally mount the REST API alongside JSON-RPC.
    if config.enable_rest_api {
        let mut rest_service =
            iota_rest_api::RestService::new(Arc::new(RestReadStore::new(state.clone(), store)));
        rest_service.with_server_version(server_version);

        if let Some(config) = config.rest.clone() {
            rest_service.with_config(config);
        }

        rest_service.with_metrics(RestMetrics::new(prometheus_registry));

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rest_service.with_executor(transaction_orchestrator.clone())
        }

        router = router.merge(rest_service.into_router());
    }

    // Health endpoint; the handler pulls AuthorityState from the extension.
    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    let layers = ServiceBuilder::new()
        // Bridge iota_http's connection info into axum's ConnectInfo
        // extension so extractors downstream can read the remote address.
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware));

    router = router.layer(layers);

    let handle = iota_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());

    Ok(Some(handle))
}
2612
/// Query parameters accepted by the `/health` endpoint.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    /// Maximum allowed age, in seconds, of the highest executed checkpoint.
    /// When absent, the health check only confirms the process responds.
    pub threshold_seconds: Option<u32>,
}
2617
2618async fn health_check_handler(
2619 axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2620 axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2621) -> impl axum::response::IntoResponse {
2622 if let Some(threshold_seconds) = threshold_seconds {
2623 let summary = match state
2625 .get_checkpoint_store()
2626 .get_highest_executed_checkpoint()
2627 {
2628 Ok(Some(summary)) => summary,
2629 Ok(None) => {
2630 warn!("Highest executed checkpoint not found");
2631 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2632 }
2633 Err(err) => {
2634 warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2635 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2636 }
2637 };
2638
2639 let latest_chain_time = summary.timestamp();
2641 let threshold =
2642 std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2643
2644 if latest_chain_time < threshold {
2646 warn!(
2647 ?latest_chain_time,
2648 ?threshold,
2649 "failing health check due to checkpoint lag"
2650 );
2651 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2652 }
2653 }
2654 (axum::http::StatusCode::OK, "up")
2656}
2657
/// Maximum number of transactions per checkpoint, taken from the protocol
/// config. The `as usize` cast is lossless on 64-bit targets.
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}
2662
/// Test override: a tiny per-checkpoint limit so tests exercise checkpoint
/// splitting without needing thousands of transactions.
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}