1#[cfg(msim)]
6use std::sync::atomic::Ordering;
7use std::{
8 collections::{BTreeSet, HashMap, HashSet},
9 fmt,
10 future::Future,
11 path::PathBuf,
12 str::FromStr,
13 sync::{Arc, Weak},
14 time::Duration,
15};
16
17use anemo::Network;
18use anemo_tower::{
19 callback::CallbackLayer,
20 trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
21};
22use anyhow::{Result, anyhow};
23use arc_swap::ArcSwap;
24use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
25use futures::future::BoxFuture;
26pub use handle::IotaNodeHandle;
27use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
28use iota_common::debug_fatal;
29use iota_config::{
30 ConsensusConfig, NodeConfig,
31 node::{DBCheckpointConfig, RunWithRange},
32 node_config_metrics::NodeConfigMetrics,
33 object_storage_config::{ObjectStoreConfig, ObjectStoreType},
34};
35use iota_core::{
36 authority::{
37 AuthorityState, AuthorityStore, RandomnessRoundReceiver,
38 authority_per_epoch_store::AuthorityPerEpochStore,
39 authority_store_pruner::ObjectsCompactionFilter,
40 authority_store_tables::{
41 AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
42 },
43 backpressure::BackpressureManager,
44 epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
45 },
46 authority_aggregator::{
47 AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
48 },
49 authority_client::NetworkAuthorityClient,
50 authority_server::{ValidatorService, ValidatorServiceMetrics},
51 checkpoints::{
52 CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
53 SubmitCheckpointToConsensus,
54 checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
55 },
56 connection_monitor::ConnectionMonitor,
57 consensus_adapter::{
58 CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
59 ConsensusClient,
60 },
61 consensus_handler::ConsensusHandlerInitializer,
62 consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
63 consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
64 db_checkpoint_handler::DBCheckpointHandler,
65 epoch::{
66 committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
67 epoch_metrics::EpochMetrics, randomness::RandomnessManager,
68 reconfiguration::ReconfigurationInitiator,
69 },
70 execution_cache::build_execution_cache,
71 jsonrpc_index::IndexStore,
72 module_cache_metrics::ResolverMetrics,
73 overload_monitor::overload_monitor,
74 rest_index::RestIndexStore,
75 safe_client::SafeClientMetricsBase,
76 signature_verifier::SignatureVerifierMetrics,
77 state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
78 storage::{RestReadStore, RocksDbStore},
79 traffic_controller::metrics::TrafficControllerMetrics,
80 transaction_orchestrator::TransactionOrchestrator,
81 validator_tx_finalizer::ValidatorTxFinalizer,
82};
83use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
84use iota_json_rpc::{
85 JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
86 indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
87 transaction_builder_api::TransactionBuilderApi,
88 transaction_execution_api::TransactionExecutionApi,
89};
90use iota_json_rpc_api::JsonRpcMetrics;
91use iota_macros::{fail_point, fail_point_async, replay_log};
92use iota_metrics::{
93 RegistryID, RegistryService,
94 hardware_metrics::register_hardware_metrics,
95 metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
96 server_timing_middleware, spawn_monitored_task,
97};
98use iota_names::config::IotaNamesConfig;
99use iota_network::{
100 api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
101};
102use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
103use iota_protocol_config::ProtocolConfig;
104use iota_rest_api::RestMetrics;
105use iota_sdk_types::crypto::{Intent, IntentMessage, IntentScope};
106use iota_snapshot::uploader::StateSnapshotUploader;
107use iota_storage::{
108 FileCompression, StorageFormat,
109 http_key_value_store::HttpKVStore,
110 key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
111 key_value_store_metrics::KeyValueStoreMetrics,
112};
113use iota_types::{
114 base_types::{AuthorityName, ConciseableName, EpochId},
115 committee::Committee,
116 crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
117 digests::ChainIdentifier,
118 error::{IotaError, IotaResult},
119 executable_transaction::VerifiedExecutableTransaction,
120 execution_config_utils::to_binary_config,
121 full_checkpoint_content::CheckpointData,
122 iota_system_state::{
123 IotaSystemState, IotaSystemStateTrait,
124 epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
125 },
126 messages_consensus::{
127 AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
128 SignedAuthorityCapabilitiesV1, check_total_jwk_size,
129 },
130 messages_grpc::HandleCapabilityNotificationRequestV1,
131 quorum_driver_types::QuorumDriverEffectsQueueResult,
132 supported_protocol_versions::SupportedProtocolVersions,
133 transaction::{Transaction, VerifiedCertificate},
134};
135use prometheus::Registry;
136#[cfg(msim)]
137pub use simulator::set_jwk_injector;
138#[cfg(msim)]
139use simulator::*;
140use tap::tap::TapFallible;
141use tokio::{
142 runtime::Handle,
143 sync::{Mutex, broadcast, mpsc, watch},
144 task::{JoinHandle, JoinSet},
145};
146use tokio_util::sync::CancellationToken;
147use tower::ServiceBuilder;
148use tracing::{Instrument, debug, error, error_span, info, trace_span, warn};
149use typed_store::{
150 DBMetrics,
151 rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
152};
153
154use crate::metrics::{GrpcMetrics, IotaNodeMetrics};
155
156pub mod admin;
157mod handle;
158pub mod metrics;
159
/// Subsystems that run only while this node is an active committee validator.
///
/// Built on startup/reconfiguration when the node is in the committee and torn
/// down when it leaves; see the registry id used for metric cleanup below.
pub struct ValidatorComponents {
    // Validator network service; wrapped in `SpawnOnce` so it can be
    // constructed first and started later (see `start_async`).
    validator_server_handle: SpawnOnce,
    // Background overload monitor; None when it was not spawned — confirm
    // against the construction site.
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    // Shared with background tasks (e.g. the JWK updater) via `Arc`.
    consensus_adapter: Arc<ConsensusAdapter>,
    // Tasks owned by the checkpoint service; aborted/joined on teardown.
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    // Id of the validator-specific metrics registry, so it can be
    // unregistered from the `RegistryService` when the components are dropped.
    validator_registry_id: RegistryID,
}
172
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    /// Per-node state that only exists in simulator (`msim`) builds.
    pub(super) struct SimState {
        // Handle of the simulator node this `IotaNode` is running on.
        pub sim_node: iota_simulator::runtime::NodeHandle,
        // Set by tests that expect the node to enter safe mode.
        pub sim_safe_mode_expected: AtomicBool,
        // Held only for its Drop side effect: reports leaked nodes.
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }

    /// Signature of a test-injectable JWK fetch function.
    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    /// Default injector: parses the bundled static JWK bytes (as the Twitch
    /// provider) instead of performing a real network fetch.
    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        parse_jwks(
            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
        )
        .map_err(|_| IotaError::JWKRetrieval)
    }

    thread_local! {
        // Thread-local so each simulated node (one thread per node in msim)
        // gets its own injector.
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    /// Returns the JWK injector for the current (simulated node's) thread.
    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    /// Test hook: replaces the JWK fetch function for the current thread.
    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}
225
/// A running IOTA node: storage, p2p, RPC servers, and (when in the
/// committee) validator components, plus the handles needed to reconfigure
/// and shut everything down.
pub struct IotaNode {
    config: NodeConfig,
    // Some(_) only while the node is an active validator; swapped across
    // epochs by the reconfiguration loop.
    validator_components: Mutex<Option<ValidatorComponents>>,
    // Kept alive for its Drop side effect (server shutdown).
    _http_server: Option<iota_http::ServerHandle>,
    state: Arc<AuthorityState>,
    // Present on full nodes only (see `start_async`).
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    // Taken (set to None) during reconfiguration — confirm against
    // `monitor_reconfiguration`.
    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    // Broadcasts the end-of-epoch system state to subscribers.
    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    // Notifies the discovery layer of committee/trusted-peer changes.
    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    // Kill switch for the DB checkpoint handler, if one was started.
    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    // Kill switches for the archive writer / snapshot uploader, if started.
    _state_archive_handle: Option<broadcast::Sender<()>>,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    // Broadcasts the shutdown signal (with the optional run-with-range).
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    // Hot-swappable view of the validator committee for client-side calls.
    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}
274
275impl fmt::Debug for IotaNode {
276 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
277 f.debug_struct("IotaNode")
278 .field("name", &self.state.name.concise())
279 .finish()
280 }
281}
282
283static MAX_JWK_KEYS_PER_FETCH: usize = 100;
284
285impl IotaNode {
286 pub async fn start(
287 config: NodeConfig,
288 registry_service: RegistryService,
289 custom_rpc_runtime: Option<Handle>,
290 ) -> Result<Arc<IotaNode>> {
291 Self::start_async(config, registry_service, custom_rpc_runtime, "unknown").await
292 }
293
    /// Spawns one background task per supported zkLogin OIDC provider that
    /// periodically fetches JWKs and submits previously-unseen, valid keys to
    /// consensus. Each task is scoped to the current epoch via
    /// `within_alive_epoch`, so it terminates at reconfiguration.
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<IotaNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        // Providers are configured per chain; a chain with no entry yields an
        // empty provider list (and thus no updater tasks).
        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        // Rejects (and counts in `invalid_jwks`) keys whose `iss` is unknown,
        // whose `iss` maps to a different provider than the one fetched from,
        // or whose encoded size exceeds the protocol limit.
        fn validate_jwk(
            metrics: &Arc<IotaNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // (id, jwk) pairs already submitted by this task; prevents
                    // re-submitting the same key to consensus.
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // Retry sooner than the regular interval.
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // Keep only valid keys that are neither active
                                // in the current epoch nor already submitted.
                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                // Cap per-fetch submissions to bound the load
                                // placed on consensus.
                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }
432
433 pub async fn start_async(
434 config: NodeConfig,
435 registry_service: RegistryService,
436 custom_rpc_runtime: Option<Handle>,
437 software_version: &'static str,
438 ) -> Result<Arc<IotaNode>> {
439 NodeConfigMetrics::new(®istry_service.default_registry()).record_metrics(&config);
440 let mut config = config.clone();
441 if config.supported_protocol_versions.is_none() {
442 info!(
443 "populating config.supported_protocol_versions with default {:?}",
444 SupportedProtocolVersions::SYSTEM_DEFAULT
445 );
446 config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
447 }
448
449 let run_with_range = config.run_with_range;
450 let is_validator = config.consensus_config().is_some();
451 let is_full_node = !is_validator;
452 let prometheus_registry = registry_service.default_registry();
453
454 info!(node =? config.authority_public_key(),
455 "Initializing iota-node listening on {}", config.network_address
456 );
457
458 let genesis = config.genesis()?.clone();
459
460 let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
461 info!("IOTA chain identifier: {chain_identifier}");
462
463 let db_corrupted_path = &config.db_path().join("status");
465 if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
466 panic!("Failed to check database corruption: {err}");
467 }
468
469 DBMetrics::init(&prometheus_registry);
471
472 iota_metrics::init_metrics(&prometheus_registry);
474 #[cfg(not(msim))]
477 iota_metrics::thread_stall_monitor::start_thread_stall_monitor();
478
479 register_hardware_metrics(®istry_service, &config.db_path)
481 .expect("Failed registering hardware metrics");
482 prometheus_registry
484 .register(iota_metrics::uptime_metric(
485 if is_validator {
486 "validator"
487 } else {
488 "fullnode"
489 },
490 software_version,
491 &chain_identifier.to_string(),
492 ))
493 .expect("Failed registering uptime metric");
494
495 let migration_tx_data = if genesis.contains_migrations() {
498 Some(config.load_migration_tx_data()?)
501 } else {
502 None
503 };
504
505 let secret = Arc::pin(config.authority_key_pair().copy());
506 let genesis_committee = genesis.committee()?;
507 let committee_store = Arc::new(CommitteeStore::new(
508 config.db_path().join("epochs"),
509 &genesis_committee,
510 None,
511 ));
512
513 let mut pruner_db = None;
514 if config
515 .authority_store_pruning_config
516 .enable_compaction_filter
517 {
518 pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
519 &config.db_path().join("store"),
520 )));
521 }
522 let compaction_filter = pruner_db
523 .clone()
524 .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));
525
526 let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
528 let perpetual_tables_options = AuthorityPerpetualTablesOptions {
529 enable_write_stall,
530 compaction_filter,
531 };
532 let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
533 &config.db_path().join("store"),
534 Some(perpetual_tables_options),
535 ));
536 let is_genesis = perpetual_tables
537 .database_is_empty()
538 .expect("Database read should not fail at init.");
539 let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
540 let backpressure_manager =
541 BackpressureManager::new_from_checkpoint_store(&checkpoint_store);
542
543 let store = AuthorityStore::open(
544 perpetual_tables,
545 &genesis,
546 &config,
547 &prometheus_registry,
548 migration_tx_data.as_ref(),
549 )
550 .await?;
551
552 let cur_epoch = store.get_recovery_epoch_at_restart()?;
553 let committee = committee_store
554 .get_committee(&cur_epoch)?
555 .expect("Committee of the current epoch must exist");
556 let epoch_start_configuration = store
557 .get_epoch_start_configuration()?
558 .expect("EpochStartConfiguration of the current epoch must exist");
559 let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
560 let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);
561
562 let cache_traits = build_execution_cache(
563 &config.execution_cache_config,
564 &epoch_start_configuration,
565 &prometheus_registry,
566 &store,
567 backpressure_manager.clone(),
568 );
569
570 let auth_agg = {
571 let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
572 let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
573 Arc::new(ArcSwap::new(Arc::new(
574 AuthorityAggregator::new_from_epoch_start_state(
575 epoch_start_configuration.epoch_start_state(),
576 &committee_store,
577 safe_client_metrics_base,
578 auth_agg_metrics,
579 ),
580 )))
581 };
582
583 let chain = match config.chain_override_for_testing {
584 Some(chain) => chain,
585 None => chain_identifier.chain(),
586 };
587
588 let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
589 let epoch_store = AuthorityPerEpochStore::new(
590 config.authority_public_key(),
591 committee.clone(),
592 &config.db_path().join("store"),
593 Some(epoch_options.options),
594 EpochMetrics::new(®istry_service.default_registry()),
595 epoch_start_configuration,
596 cache_traits.backing_package_store.clone(),
597 cache_traits.object_store.clone(),
598 cache_metrics,
599 signature_verifier_metrics,
600 &config.expensive_safety_check_config,
601 (chain_identifier, chain),
602 checkpoint_store
603 .get_highest_executed_checkpoint_seq_number()
604 .expect("checkpoint store read cannot fail")
605 .unwrap_or(0),
606 )?;
607
608 info!("created epoch store");
609
610 replay_log!(
611 "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
612 epoch_store.epoch(),
613 epoch_store.protocol_config()
614 );
615
616 if is_genesis {
618 info!("checking IOTA conservation at genesis");
619 cache_traits
624 .reconfig_api
625 .try_expensive_check_iota_conservation(&epoch_store, None)
626 .expect("IOTA conservation check cannot fail at genesis");
627 }
628
629 let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
630 let default_buffer_stake = epoch_store
631 .protocol_config()
632 .buffer_stake_for_protocol_upgrade_bps();
633 if effective_buffer_stake != default_buffer_stake {
634 warn!(
635 ?effective_buffer_stake,
636 ?default_buffer_stake,
637 "buffer_stake_for_protocol_upgrade_bps is currently overridden"
638 );
639 }
640
641 checkpoint_store.insert_genesis_checkpoint(
642 genesis.checkpoint(),
643 genesis.checkpoint_contents().clone(),
644 &epoch_store,
645 );
646
647 unmark_db_corruption(db_corrupted_path)?;
649
650 info!("creating state sync store");
651 let state_sync_store = RocksDbStore::new(
652 cache_traits.clone(),
653 committee_store.clone(),
654 checkpoint_store.clone(),
655 );
656
657 let index_store = if is_full_node && config.enable_index_processing {
658 info!("creating index store");
659 Some(Arc::new(IndexStore::new(
660 config.db_path().join("indexes"),
661 &prometheus_registry,
662 epoch_store
663 .protocol_config()
664 .max_move_identifier_len_as_option(),
665 )))
666 } else {
667 None
668 };
669
670 let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
671 {
672 Some(Arc::new(
673 RestIndexStore::new(
674 config.db_path().join("rest_index"),
675 &store,
676 &checkpoint_store,
677 &epoch_store,
678 &cache_traits.backing_package_store,
679 )
680 .await,
681 ))
682 } else {
683 None
684 };
685
686 info!("creating archive reader");
687 let archive_readers =
692 ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
693 let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
694 let (randomness_tx, randomness_rx) = mpsc::channel(
695 config
696 .p2p_config
697 .randomness
698 .clone()
699 .unwrap_or_default()
700 .mailbox_capacity(),
701 );
702 let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
703 Self::create_p2p_network(
704 &config,
705 state_sync_store.clone(),
706 chain_identifier,
707 trusted_peer_change_rx,
708 archive_readers.clone(),
709 randomness_tx,
710 &prometheus_registry,
711 )?;
712
713 send_trusted_peer_change(
716 &config,
717 &trusted_peer_change_tx,
718 epoch_store.epoch_start_state(),
719 );
720
721 info!("start state archival");
722 let state_archive_handle =
724 Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
725 .await?;
726
727 info!("start snapshot upload");
728 let state_snapshot_handle =
730 Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
731
732 info!("start db checkpoint");
734 let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
735 &config,
736 &prometheus_registry,
737 state_snapshot_handle.is_some(),
738 )?;
739
740 let mut genesis_objects = genesis.objects().to_vec();
741 if let Some(migration_tx_data) = migration_tx_data.as_ref() {
742 genesis_objects.extend(migration_tx_data.get_objects());
743 }
744
745 let authority_name = config.authority_public_key();
746 let validator_tx_finalizer =
747 config
748 .enable_validator_tx_finalizer
749 .then_some(Arc::new(ValidatorTxFinalizer::new(
750 auth_agg.clone(),
751 authority_name,
752 &prometheus_registry,
753 )));
754
755 info!("create authority state");
756 let state = AuthorityState::new(
757 authority_name,
758 secret,
759 config.supported_protocol_versions.unwrap(),
760 store.clone(),
761 cache_traits.clone(),
762 epoch_store.clone(),
763 committee_store.clone(),
764 index_store.clone(),
765 rest_index,
766 checkpoint_store.clone(),
767 &prometheus_registry,
768 &genesis_objects,
769 &db_checkpoint_config,
770 config.clone(),
771 archive_readers,
772 validator_tx_finalizer,
773 chain_identifier,
774 pruner_db,
775 )
776 .await;
777
778 if epoch_store.epoch() == 0 {
780 let genesis_tx = &genesis.transaction();
781 let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
782 Self::execute_transaction_immediately_at_zero_epoch(
784 &state,
785 &epoch_store,
786 genesis_tx,
787 span,
788 )
789 .await;
790
791 if let Some(migration_tx_data) = migration_tx_data {
793 for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
794 let span = error_span!("migration_txn", tx_digest = ?tx_digest);
795 Self::execute_transaction_immediately_at_zero_epoch(
796 &state,
797 &epoch_store,
798 tx,
799 span,
800 )
801 .await;
802 }
803 }
804 }
805
806 RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);
809
810 if config
811 .expensive_safety_check_config
812 .enable_secondary_index_checks()
813 {
814 if let Some(indexes) = state.indexes.clone() {
815 iota_core::verify_indexes::verify_indexes(
816 state.get_accumulator_store().as_ref(),
817 indexes,
818 )
819 .expect("secondary indexes are inconsistent");
820 }
821 }
822
823 let (end_of_epoch_channel, end_of_epoch_receiver) =
824 broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);
825
826 let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
827 Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
828 auth_agg.load_full(),
829 state.clone(),
830 end_of_epoch_receiver,
831 &config.db_path(),
832 &prometheus_registry,
833 )))
834 } else {
835 None
836 };
837
838 let http_server = build_http_server(
839 state.clone(),
840 state_sync_store.clone(),
841 &transaction_orchestrator.clone(),
842 &config,
843 &prometheus_registry,
844 custom_rpc_runtime,
845 software_version,
846 )
847 .await?;
848
849 let accumulator = Arc::new(StateAccumulator::new(
850 cache_traits.accumulator_store.clone(),
851 StateAccumulatorMetrics::new(&prometheus_registry),
852 ));
853
854 let authority_names_to_peer_ids = epoch_store
855 .epoch_start_state()
856 .get_authority_names_to_peer_ids();
857
858 let network_connection_metrics =
859 NetworkConnectionMetrics::new("iota", ®istry_service.default_registry());
860
861 let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);
862
863 let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
864 p2p_network.downgrade(),
865 network_connection_metrics,
866 HashMap::new(),
867 None,
868 );
869
870 let connection_monitor_status = ConnectionMonitorStatus {
871 connection_statuses,
872 authority_names_to_peer_ids,
873 };
874
875 let connection_monitor_status = Arc::new(connection_monitor_status);
876 let iota_node_metrics =
877 Arc::new(IotaNodeMetrics::new(®istry_service.default_registry()));
878
879 let executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>> =
883 transaction_orchestrator
884 .clone()
885 .map(|o| o as Arc<dyn iota_types::transaction_executor::TransactionExecutor>);
886
887 let grpc_server_handle = build_grpc_server(
888 &config,
889 state.clone(),
890 state_sync_store.clone(),
891 executor,
892 ®istry_service.default_registry(),
893 )
894 .await?;
895
896 let validator_components = if state.is_committee_validator(&epoch_store) {
897 let (components, _) = futures::join!(
898 Self::construct_validator_components(
899 config.clone(),
900 state.clone(),
901 committee,
902 epoch_store.clone(),
903 checkpoint_store.clone(),
904 state_sync_handle.clone(),
905 randomness_handle.clone(),
906 Arc::downgrade(&accumulator),
907 backpressure_manager.clone(),
908 connection_monitor_status.clone(),
909 ®istry_service,
910 iota_node_metrics.clone(),
911 ),
912 Self::reexecute_pending_consensus_certs(&epoch_store, &state,)
913 );
914 let mut components = components?;
915
916 components.consensus_adapter.submit_recovered(&epoch_store);
917
918 components.validator_server_handle = components.validator_server_handle.start().await;
920
921 Some(components)
922 } else {
923 None
924 };
925
926 let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);
928
929 let node = Self {
930 config,
931 validator_components: Mutex::new(validator_components),
932 _http_server: http_server,
933 state,
934 transaction_orchestrator,
935 registry_service,
936 metrics: iota_node_metrics,
937
938 _discovery: discovery_handle,
939 state_sync_handle,
940 randomness_handle,
941 checkpoint_store,
942 accumulator: Mutex::new(Some(accumulator)),
943 end_of_epoch_channel,
944 connection_monitor_status,
945 trusted_peer_change_tx,
946 backpressure_manager,
947
948 _db_checkpoint_handle: db_checkpoint_handle,
949
950 #[cfg(msim)]
951 sim_state: Default::default(),
952
953 _state_archive_handle: state_archive_handle,
954 _state_snapshot_uploader_handle: state_snapshot_handle,
955 shutdown_channel_tx: shutdown_channel,
956
957 grpc_server_handle: Mutex::new(grpc_server_handle),
958
959 auth_agg,
960 };
961
962 info!("IotaNode started!");
963 let node = Arc::new(node);
964 let node_copy = node.clone();
965 spawn_monitored_task!(async move {
966 let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
967 if let Err(error) = result {
968 warn!("Reconfiguration finished with error {:?}", error);
969 }
970 });
971
972 Ok(node)
973 }
974
975 pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
976 self.end_of_epoch_channel.subscribe()
977 }
978
979 pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
980 self.shutdown_channel_tx.subscribe()
981 }
982
983 pub fn current_epoch_for_testing(&self) -> EpochId {
984 self.state.current_epoch_for_testing()
985 }
986
987 pub fn db_checkpoint_path(&self) -> PathBuf {
988 self.config.db_checkpoint_path()
989 }
990
991 pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
993 info!("close_epoch (current epoch = {})", epoch_store.epoch());
994 self.validator_components
995 .lock()
996 .await
997 .as_ref()
998 .ok_or_else(|| IotaError::from("Node is not a validator"))?
999 .consensus_adapter
1000 .close_epoch(epoch_store);
1001 Ok(())
1002 }
1003
1004 pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
1005 self.state
1006 .clear_override_protocol_upgrade_buffer_stake(epoch)
1007 }
1008
1009 pub fn set_override_protocol_upgrade_buffer_stake(
1010 &self,
1011 epoch: EpochId,
1012 buffer_stake_bps: u64,
1013 ) -> IotaResult {
1014 self.state
1015 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
1016 }
1017
1018 pub async fn close_epoch_for_testing(&self) -> IotaResult {
1021 let epoch_store = self.state.epoch_store_for_testing();
1022 self.close_epoch(&epoch_store).await
1023 }
1024
1025 async fn start_state_archival(
1026 config: &NodeConfig,
1027 prometheus_registry: &Registry,
1028 state_sync_store: RocksDbStore,
1029 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1030 if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
1031 let local_store_config = ObjectStoreConfig {
1032 object_store: Some(ObjectStoreType::File),
1033 directory: Some(config.archive_path()),
1034 ..Default::default()
1035 };
1036 let archive_writer = ArchiveWriter::new(
1037 local_store_config,
1038 remote_store_config.clone(),
1039 FileCompression::Zstd,
1040 StorageFormat::Blob,
1041 Duration::from_secs(600),
1042 256 * 1024 * 1024,
1043 prometheus_registry,
1044 )
1045 .await?;
1046 Ok(Some(archive_writer.start(state_sync_store).await?))
1047 } else {
1048 Ok(None)
1049 }
1050 }
1051
1052 fn start_state_snapshot(
1055 config: &NodeConfig,
1056 prometheus_registry: &Registry,
1057 checkpoint_store: Arc<CheckpointStore>,
1058 ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
1059 if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
1060 let snapshot_uploader = StateSnapshotUploader::new(
1061 &config.db_checkpoint_path(),
1062 &config.snapshot_path(),
1063 remote_store_config.clone(),
1064 60,
1065 prometheus_registry,
1066 checkpoint_store,
1067 )?;
1068 Ok(Some(snapshot_uploader.start()))
1069 } else {
1070 Ok(None)
1071 }
1072 }
1073
1074 fn start_db_checkpoint(
1075 config: &NodeConfig,
1076 prometheus_registry: &Registry,
1077 state_snapshot_enabled: bool,
1078 ) -> Result<(
1079 DBCheckpointConfig,
1080 Option<tokio::sync::broadcast::Sender<()>>,
1081 )> {
1082 let checkpoint_path = Some(
1083 config
1084 .db_checkpoint_config
1085 .checkpoint_path
1086 .clone()
1087 .unwrap_or_else(|| config.db_checkpoint_path()),
1088 );
1089 let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
1090 DBCheckpointConfig {
1091 checkpoint_path,
1092 perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
1093 true
1094 } else {
1095 config
1096 .db_checkpoint_config
1097 .perform_db_checkpoints_at_epoch_end
1098 },
1099 ..config.db_checkpoint_config.clone()
1100 }
1101 } else {
1102 config.db_checkpoint_config.clone()
1103 };
1104
1105 match (
1106 db_checkpoint_config.object_store_config.as_ref(),
1107 state_snapshot_enabled,
1108 ) {
1109 (None, false) => Ok((db_checkpoint_config, None)),
1114 (_, _) => {
1115 let handler = DBCheckpointHandler::new(
1116 &db_checkpoint_config.checkpoint_path.clone().unwrap(),
1117 db_checkpoint_config.object_store_config.as_ref(),
1118 60,
1119 db_checkpoint_config
1120 .prune_and_compact_before_upload
1121 .unwrap_or(true),
1122 config.authority_store_pruning_config.clone(),
1123 prometheus_registry,
1124 state_snapshot_enabled,
1125 )?;
1126 Ok((
1127 db_checkpoint_config,
1128 Some(DBCheckpointHandler::start(handler)),
1129 ))
1130 }
1131 }
1132 }
1133
    /// Builds the anemo-based P2P network and starts the three P2P services:
    /// peer discovery, state sync, and randomness (DKG message exchange).
    ///
    /// Returns the bound [`Network`] plus the handle for each service. The
    /// services are constructed first (yielding their RPC servers), the anemo
    /// network is bound over the merged routes, and finally each service is
    /// started against the live network.
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            // All three services share one anemo router/network.
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            // Inbound stack: trace server errors + record per-message metrics.
            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            // Outbound stack mirrors the inbound one, but traces client errors too.
            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Allow frames up to 1 GiB (state-sync payloads can be large).
            anemo_config.max_frame_size = Some(1 << 30);

            // Fill in QUIC defaults only where the operator left them unset,
            // so explicit operator configuration always wins.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20); // 20 MiB
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20); // 20 MiB
            }
            // Setting OS socket buffers can fail (e.g. sysctl limits); treat
            // that as non-fatal.
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20); // 100 MiB
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20); // 200 MiB
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20); // 200 MiB
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20); // 1 MiB
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            // The TLS server name is chain-specific so nodes from different
            // chains refuse to connect to each other.
            let server_name = format!("iota-{chain_identifier}");
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }
1272
    /// Constructs all validator-only components (consensus adapter/manager,
    /// store pruner, gRPC validator service, overload monitor) and then
    /// delegates to [`Self::start_epoch_specific_validator_components`] to
    /// start the per-epoch machinery.
    ///
    /// A dedicated Prometheus registry is created for validator metrics and
    /// registered with the `registry_service`; its id is threaded through so
    /// it can be removed if the node later stops being a validator.
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        iota_node_metrics: Arc<IotaNodeMetrics>,
    ) -> Result<ValidatorComponents> {
        // A validator without a consensus config cannot operate.
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        // Validator metrics live in their own registry so they can be removed
        // wholesale when the node leaves the committee.
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        // The consensus client is updatable: the adapter is built before the
        // consensus manager and wired to the real client via this indirection.
        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        // Binds the validator gRPC endpoint; the returned SpawnOnce is started
        // later by the caller (see monitor_reconfiguration).
        let validator_server_handle = Self::start_grpc_validator_service(
            &config,
            state.clone(),
            consensus_adapter.clone(),
            &validator_registry,
        )
        .await?;

        // Only spawn the overload monitor when load shedding is enabled.
        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            accumulator,
            backpressure_manager,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_node_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }
1374
    /// Starts the components that must be rebuilt every epoch: the checkpoint
    /// service, randomness manager, consensus handler, and consensus itself.
    ///
    /// Called both on initial validator startup (via
    /// `construct_validator_components`) and on every reconfiguration. The
    /// startup order matters: the checkpoint service is built first so the
    /// consensus handler can feed it, consensus is started before the
    /// checkpoint service tasks are spawned.
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: ConsensusManager,
        consensus_store_pruner: ConsensusStorePruner,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_handle: SpawnOnce,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_node_metrics: Arc<IotaNodeMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            accumulator,
            checkpoint_metrics.clone(),
        );

        // Shared, hot-swappable set of low-scoring authorities; starts empty
        // and is updated elsewhere, the adapter reads it when submitting.
        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        // try_new returns None when randomness is not applicable for this
        // epoch/config — in that case no manager is installed.
        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
            backpressure_manager,
        );

        info!("Starting consensus manager");

        consensus_manager
            .start(
                config,
                epoch_store.clone(),
                consensus_handler_initializer,
                IotaTxValidator::new(
                    epoch_store.clone(),
                    checkpoint_service.clone(),
                    state.transaction_manager().clone(),
                    iota_tx_validator_metrics.clone(),
                ),
            )
            .await;

        info!("Spawning checkpoint service");
        let checkpoint_service_tasks = checkpoint_service.spawn().await;

        // The JWK updater feeds zklogin authenticator state; only needed when
        // the protocol has authenticator state enabled.
        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                iota_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        Ok(ValidatorComponents {
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }
1477
1478 fn build_checkpoint_service(
1485 config: &NodeConfig,
1486 consensus_adapter: Arc<ConsensusAdapter>,
1487 checkpoint_store: Arc<CheckpointStore>,
1488 epoch_store: Arc<AuthorityPerEpochStore>,
1489 state: Arc<AuthorityState>,
1490 state_sync_handle: state_sync::Handle,
1491 accumulator: Weak<StateAccumulator>,
1492 checkpoint_metrics: Arc<CheckpointMetrics>,
1493 ) -> Arc<CheckpointService> {
1494 let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
1495 let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
1496
1497 debug!(
1498 "Starting checkpoint service with epoch start timestamp {}
1499 and epoch duration {}",
1500 epoch_start_timestamp_ms, epoch_duration_ms
1501 );
1502
1503 let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
1504 sender: consensus_adapter,
1505 signer: state.secret.clone(),
1506 authority: config.authority_public_key(),
1507 next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
1508 .checked_add(epoch_duration_ms)
1509 .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
1510 metrics: checkpoint_metrics.clone(),
1511 });
1512
1513 let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
1514 let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
1515 let max_checkpoint_size_bytes =
1516 epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;
1517
1518 CheckpointService::build(
1519 state.clone(),
1520 checkpoint_store,
1521 epoch_store,
1522 state.get_transaction_cache_reader().clone(),
1523 accumulator,
1524 checkpoint_output,
1525 Box::new(certified_checkpoint_output),
1526 checkpoint_metrics,
1527 max_tx_per_checkpoint,
1528 max_checkpoint_size_bytes,
1529 )
1530 }
1531
1532 fn construct_consensus_adapter(
1533 committee: &Committee,
1534 consensus_config: &ConsensusConfig,
1535 authority: AuthorityName,
1536 connection_monitor_status: Arc<ConnectionMonitorStatus>,
1537 prometheus_registry: &Registry,
1538 consensus_client: Arc<dyn ConsensusClient>,
1539 checkpoint_store: Arc<CheckpointStore>,
1540 ) -> ConsensusAdapter {
1541 let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
1542 ConsensusAdapter::new(
1546 consensus_client,
1547 checkpoint_store,
1548 authority,
1549 connection_monitor_status,
1550 consensus_config.max_pending_transactions(),
1551 consensus_config.max_pending_transactions() * 2 / committee.num_members(),
1552 consensus_config.max_submit_position,
1553 consensus_config.submit_delay_step_override(),
1554 ca_metrics,
1555 )
1556 }
1557
1558 async fn start_grpc_validator_service(
1559 config: &NodeConfig,
1560 state: Arc<AuthorityState>,
1561 consensus_adapter: Arc<ConsensusAdapter>,
1562 prometheus_registry: &Registry,
1563 ) -> Result<SpawnOnce> {
1564 let validator_service = ValidatorService::new(
1565 state.clone(),
1566 consensus_adapter,
1567 Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
1568 TrafficControllerMetrics::new(prometheus_registry),
1569 config.policy_config.clone(),
1570 config.firewall_config.clone(),
1571 );
1572
1573 let mut server_conf = iota_network_stack::config::Config::new();
1574 server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
1575 server_conf.load_shed = config.grpc_load_shed;
1576 let server_builder =
1577 ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry))
1578 .add_service(ValidatorServer::new(validator_service));
1579
1580 let tls_config = iota_tls::create_rustls_server_config(
1581 config.network_key_pair().copy().private(),
1582 IOTA_TLS_SERVER_NAME.to_string(),
1583 );
1584
1585 let network_address = config.network_address().clone();
1586
1587 let bind_future = async move {
1588 let server = server_builder
1589 .bind(&network_address, Some(tls_config))
1590 .await
1591 .map_err(|err| anyhow!("Failed to bind to {network_address}: {err}"))?;
1592
1593 let local_addr = server.local_addr();
1594 info!("Listening to traffic on {local_addr}");
1595
1596 Ok(server)
1597 };
1598
1599 Ok(SpawnOnce::new(bind_future))
1600 }
1601
    /// Re-enqueues certified transactions that were pending in consensus when
    /// the node stopped, so their execution is not lost across restart.
    ///
    /// Only owned-object certificates are handled here (shared-object
    /// certificates must go back through consensus ordering). Transactions
    /// that already have signed effects are replayed against those exact
    /// effects; the rest are enqueued for normal execution. Waits bounded
    /// time for everything to execute and logs a debug-fatal on timeout.
    async fn reexecute_pending_consensus_certs(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        state: &Arc<AuthorityState>,
    ) {
        let mut pending_consensus_certificates = Vec::new();
        let mut additional_certs = Vec::new();

        for tx in epoch_store.get_all_pending_consensus_transactions() {
            match tx.kind {
                // Shared-object txns are deliberately skipped: they need
                // consensus ordering and cannot be re-executed directly.
                ConsensusTransactionKind::CertifiedTransaction(tx)
                    if !tx.contains_shared_object() =>
                {
                    let tx = *tx;
                    let tx = VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(tx),
                    );
                    // If effects were already signed pre-restart, pin the
                    // re-execution to that digest to guarantee determinism.
                    if let Some(fx_digest) = epoch_store
                        .get_signed_effects_digest(tx.digest())
                        .expect("db error")
                    {
                        pending_consensus_certificates.push((tx, fx_digest));
                    } else {
                        additional_certs.push(tx);
                    }
                }
                _ => (),
            }
        }

        let digests = pending_consensus_certificates
            .iter()
            .map(|(tx, _)| *tx.digest())
            .collect::<Vec<_>>();

        info!(
            "reexecuting {} pending consensus certificates: {:?}",
            digests.len(),
            digests
        );

        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
        state.enqueue_transactions_for_execution(additional_certs, epoch_store);

        // Simulator runs are slower, so allow a longer deadline there.
        let timeout = if cfg!(msim) { 120 } else { 60 };
        if tokio::time::timeout(
            std::time::Duration::from_secs(timeout),
            state
                .get_transaction_cache_reader()
                .try_notify_read_executed_effects_digests(&digests),
        )
        .await
        .is_err()
        {
            // On timeout, report exactly which digests are still unexecuted
            // to aid debugging; this is a bug signal, not a normal path.
            if let Ok(executed_effects_digests) = state
                .get_transaction_cache_reader()
                .try_multi_get_executed_effects_digests(&digests)
            {
                let pending_digests = digests
                    .iter()
                    .zip(executed_effects_digests.iter())
                    .filter_map(|(digest, executed_effects_digest)| {
                        if executed_effects_digest.is_none() {
                            Some(digest)
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>();
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed: {:?}",
                    pending_digests
                );
            } else {
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed, digests not found"
                );
            }
        }
    }
1710
1711 pub fn state(&self) -> Arc<AuthorityState> {
1712 self.state.clone()
1713 }
1714
    /// Test-only helper: returns the current reference gas price from the
    /// authority state.
    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }
1719
    /// Returns a shared handle to the committee store.
    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }
1723
    /// Returns the authority aggregator held by the transaction orchestrator,
    /// or `None` when no orchestrator is running on this node (e.g. a
    /// validator without fullnode services).
    pub fn clone_authority_aggregator(
        &self,
    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.clone_authority_aggregator())
    }
1740
    /// Returns a shared handle to the transaction orchestrator, if enabled.
    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }
1746
1747 pub fn subscribe_to_transaction_orchestrator_effects(
1748 &self,
1749 ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
1750 self.transaction_orchestrator
1751 .as_ref()
1752 .map(|to| to.subscribe_to_effects_queue())
1753 .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
1754 }
1755
    /// Drives the node across epoch boundaries, forever (unless a
    /// `RunWithRange` stop condition is configured).
    ///
    /// Each loop iteration: runs the checkpoint executor for the current
    /// epoch, then reads the end-of-epoch system state, tears down the
    /// epoch-specific validator components (if any), reconfigures the
    /// authority state to the next epoch, and restarts (or newly starts, or
    /// drops) validator components depending on committee membership.
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            // The accumulator is taken out of the shared slot for the duration
            // of the epoch and a fresh one is installed after reconfiguration.
            let mut accumulator_guard = self.accumulator.lock().await;
            let accumulator = accumulator_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );

            // Optional callback broadcasting executed checkpoint data to gRPC
            // subscribers; try_lock avoids blocking if the handle is busy.
            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
                guard.as_ref().map(|handle| {
                    let tx = handle.checkpoint_data_broadcaster().clone();
                    Box::new(move |data: &CheckpointData| {
                        tx.send_traced(data);
                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
                })
            } else {
                None
            };

            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                accumulator.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                data_sender,
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            // Committee validators announce their capabilities (supported
            // protocol versions, available system packages) via consensus.
            if let Some(components) = &*self.validator_components.lock().await {
                tokio::time::sleep(Duration::from_millis(1)).await;

                let config = cur_epoch_store.protocol_config();
                let binary_config = to_binary_config(config);
                let transaction = ConsensusTransaction::new_capability_notification_v1(
                    AuthorityCapabilitiesV1::new(
                        self.state.name,
                        cur_epoch_store.get_chain_identifier().chain(),
                        self.config
                            .supported_protocol_versions
                            .expect("Supported versions should be populated")
                            .truncate_below(config.version),
                        self.state
                            .get_available_system_packages(&binary_config)
                            .await,
                    ),
                );
                info!(?transaction, "submitting capabilities to consensus");
                components
                    .consensus_adapter
                    .submit(transaction, None, &cur_epoch_store)?;
            } else if self.state.is_active_validator(&cur_epoch_store)
                && cur_epoch_store
                    .protocol_config()
                    .track_non_committee_eligible_validators()
            {
                // Non-committee (but active/eligible) validators cannot submit
                // through consensus; they push signed capabilities to the
                // committee directly, with retry, for as long as the epoch
                // stays alive.
                let epoch_store = cur_epoch_store.clone();
                let node_clone = self.clone();
                spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
                    node_clone
                        .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
                        .instrument(trace_span!(
                            "send_signed_capability_notification_to_committee_with_retry"
                        ))
                        .await;
                }));
            }

            // Blocks until the epoch's checkpoints are fully executed (or the
            // configured run-with-range condition is hit).
            let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;

            if stop_condition == StopReason::RunWithRangeCondition {
                IotaNode::shutdown(&self).await;
                self.shutdown_channel_tx
                    .send(run_with_range)
                    .expect("RunWithRangeCondition met but failed to send shutdown message");
                return Ok(());
            }

            let latest_system_state = self
                .state
                .get_object_cache_reader()
                .try_get_iota_system_state_object_unsafe()
                .expect("Read IOTA System State object cannot fail");

            // In the simulator, safe mode may be intentionally induced; only
            // assert it is off when the test did not expect it.
            #[cfg(msim)]
            if !self
                .sim_state
                .sim_safe_mode_expected
                .load(Ordering::Relaxed)
            {
                debug_assert!(!latest_system_state.safe_mode());
            }

            #[cfg(not(msim))]
            debug_assert!(!latest_system_state.safe_mode());

            // A send error just means there are no subscribers; only worth a
            // warning on fullnodes where subscribers are expected.
            if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
                if self.state.is_fullnode(&cur_epoch_store) {
                    warn!(
                        "Failed to send end of epoch notification to subscriber: {:?}",
                        err
                    );
                }
            }

            cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
            let new_epoch_start_state = latest_system_state.into_epoch_start_state();

            // Rebuild the authority aggregator against the next committee.
            self.auth_agg.store(Arc::new(
                self.auth_agg
                    .load()
                    .recreate_with_new_epoch_start_state(&new_epoch_start_state),
            ));

            let next_epoch_committee = new_epoch_start_state.get_iota_committee();
            let next_epoch = next_epoch_committee.epoch();
            assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);

            info!(
                next_epoch,
                "Finished executing all checkpoints in epoch. About to reconfigure the system."
            );

            fail_point_async!("reconfig_delay");

            let authority_names_to_peer_ids =
                new_epoch_start_state.get_authority_names_to_peer_ids();
            self.connection_monitor_status
                .update_mapping_for_epoch(authority_names_to_peer_ids);

            cur_epoch_store.record_epoch_reconfig_start_time_metric();

            send_trusted_peer_change(
                &self.config,
                &self.trusted_peer_change_tx,
                &new_epoch_start_state,
            );

            // Held across the whole reconfiguration so no one observes
            // half-torn-down validator components.
            let mut validator_components_lock_guard = self.validator_components.lock().await;

            let new_epoch_store = self
                .reconfigure_state(
                    &self.state,
                    &cur_epoch_store,
                    next_epoch_committee.clone(),
                    new_epoch_start_state,
                    accumulator.clone(),
                )
                .await?;

            // Four transitions: validator->validator, validator->fullnode,
            // fullnode->validator, fullnode->fullnode.
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");
                // Stop the old epoch's checkpoint tasks; propagate panics,
                // tolerate cancellations.
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // The old accumulator must be uniquely owned now; recycle its
                // metrics into a fresh accumulator for the new epoch.
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_committee_validator(&new_epoch_store) {
                    // Still in committee: restart epoch-specific components,
                    // reusing the long-lived ones (server handle, adapter, …).
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_accumulator,
                            self.backpressure_manager.clone(),
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    // Demoted: drop validator metrics registry and stop the
                    // validator gRPC server.
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    validator_server_handle.shutdown();
                    debug!("Validator grpc server shutdown triggered");

                    None
                }
            } else {
                // Was a fullnode: still rotate the accumulator for the new
                // epoch, then possibly promote to validator.
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                if self.state.is_committee_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_accumulator,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                    )
                    .await?;

                    // On promotion the gRPC server must actually be started
                    // (construct_* only prepares the bind).
                    components.validator_server_handle =
                        components.validator_server_handle.start().await;

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            cur_epoch_store.release_db_handles();

            // Simulator-only: exercise checkpoint pruning when retention is a
            // real bound (not "keep everything" / "keep nothing" sentinels).
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }
2087
2088 async fn shutdown(&self) {
2089 if let Some(validator_components) = &*self.validator_components.lock().await {
2090 validator_components.consensus_manager.shutdown().await;
2091 }
2092
2093 if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
2095 info!("Shutting down gRPC server");
2096 if let Err(e) = grpc_handle.shutdown().await {
2097 warn!("Failed to gracefully shutdown gRPC server: {e}");
2098 }
2099 }
2100 }
2101
    /// Transitions the authority state from the current epoch to the next
    /// one, returning the new epoch store.
    ///
    /// Reads the current epoch's last checkpoint (which must carry
    /// end-of-epoch data, including the supply change), asserts it has been
    /// fully executed, builds the next epoch's start configuration, and calls
    /// `AuthorityState::reconfigure`. Panics on any failure: reconfiguration
    /// cannot be partially applied.
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        accumulator: Arc<StateAccumulator>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        // The supply change is recorded in the epoch's final checkpoint and
        // carried into the next epoch's accounting.
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        // Sanity check: the epoch's last checkpoint must be the highest
        // executed one before we may reconfigure.
        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                accumulator,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        // Update metrics to reflect the flag set of the new epoch.
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }
2167
    /// Returns this node's configuration.
    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }
2171
2172 async fn execute_transaction_immediately_at_zero_epoch(
2173 state: &Arc<AuthorityState>,
2174 epoch_store: &Arc<AuthorityPerEpochStore>,
2175 tx: &Transaction,
2176 span: tracing::Span,
2177 ) {
2178 let _guard = span.enter();
2179 let transaction =
2180 iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
2181 iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
2182 tx.data().clone(),
2183 iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
2184 ),
2185 );
2186 state
2187 .try_execute_immediately(&transaction, None, epoch_store)
2188 .unwrap();
2189 }
2190
    /// Returns a clone of the randomness P2P service handle.
    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }
2194
    /// Sends this (non-committee) validator's signed capability notification
    /// to a quorum of the committee, retrying with linear backoff.
    ///
    /// Retries forever on retryable errors (backoff grows by 5s per attempt,
    /// capped at 5 minutes); gives up immediately on non-retryable errors.
    /// The auth aggregator is re-loaded on every attempt so a committee
    /// change during reconfiguration is picked up.
    async fn send_signed_capability_notification_to_committee_with_retry(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
        const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
        // NOTE(review): two statements share this line in the original source
        // — likely a formatting slip; kept byte-identical here.
        const MAX_RETRY_INTERVAL_SECS: u64 = 300; let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);

        // Capabilities advertise the protocol versions this node supports and
        // the system packages it has available.
        let capabilities = AuthorityCapabilitiesV1::new(
            self.state.name,
            epoch_store.get_chain_identifier().chain(),
            self.config
                .supported_protocol_versions
                .expect("Supported versions should be populated")
                .truncate_below(config.version),
            self.state
                .get_available_system_packages(&binary_config)
                .await,
        );

        // Sign over an intent message scoped to the current epoch so the
        // notification cannot be replayed in a later epoch.
        let signature = AuthoritySignature::new_secure(
            &IntentMessage::new(
                Intent::iota_app(IntentScope::AuthorityCapabilities),
                &capabilities,
            ),
            &epoch_store.epoch(),
            self.config.authority_key_pair(),
        );

        let request = HandleCapabilityNotificationRequestV1 {
            message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
        };

        let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);

        loop {
            let auth_agg = self.auth_agg.load();
            match auth_agg
                .send_capability_notification_to_quorum(request.clone())
                .await
            {
                Ok(_) => {
                    info!("Successfully sent capability notification to committee");
                    break;
                }
                Err(err) => {
                    match &err {
                        AggregatorSendCapabilityNotificationError::RetryableNotification {
                            errors,
                        } => {
                            warn!(
                                "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
                                retry_interval, errors
                            );
                        }
                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
                            errors,
                        } => {
                            error!(
                                "Failed to send capability notification to committee (non-retryable error): {:?}",
                                errors
                            );
                            break;
                        }
                    };

                    tokio::time::sleep(retry_interval).await;

                    // Linear backoff, capped.
                    retry_interval = std::cmp::min(
                        retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
                        Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
                    );
                }
            }
        }
    }
2284}
2285
2286#[cfg(not(msim))]
2287impl IotaNode {
2288 async fn fetch_jwks(
2289 _authority: AuthorityName,
2290 provider: &OIDCProvider,
2291 ) -> IotaResult<Vec<(JwkId, JWK)>> {
2292 use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
2293 let client = reqwest::Client::new();
2294 fetch_jwks(provider, &client)
2295 .await
2296 .map_err(|_| IotaError::JWKRetrieval)
2297 }
2298}
2299
#[cfg(msim)]
impl IotaNode {
    /// Returns the simulator node id this node runs under.
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    /// Records whether the simulation expects the network to be in safe mode,
    /// so test assertions can distinguish expected from unexpected safe-mode
    /// entries.
    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    /// Fetches JWKs through the test-injected provider instead of performing
    /// real network I/O (simulator counterpart of the non-msim `fetch_jwks`).
    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}
2320
/// A gRPC server that is spawned at most once: it begins as a pending start-up
/// future and, after [`SpawnOnce::start`], holds the running server's handle.
enum SpawnOnce {
    // Future that, when awaited, binds and returns the server.
    Unstarted(Mutex<BoxFuture<'static, Result<iota_network_stack::server::Server>>>),
    // Handle to the already-running server; kept so shutdown can be triggered.
    #[allow(unused)]
    Started(iota_http::ServerHandle),
}
2327
2328impl SpawnOnce {
2329 pub fn new(
2330 future: impl Future<Output = Result<iota_network_stack::server::Server>> + Send + 'static,
2331 ) -> Self {
2332 Self::Unstarted(Mutex::new(Box::pin(future)))
2333 }
2334
2335 pub async fn start(self) -> Self {
2336 match self {
2337 Self::Unstarted(future) => {
2338 let server = future
2339 .into_inner()
2340 .await
2341 .unwrap_or_else(|err| panic!("Failed to start validator gRPC server: {err}"));
2342 let handle = server.handle().clone();
2343 tokio::spawn(async move {
2344 if let Err(err) = server.serve().await {
2345 info!("Server stopped: {err}");
2346 }
2347 info!("Server stopped");
2348 });
2349 Self::Started(handle)
2350 }
2351 Self::Started(_) => self,
2352 }
2353 }
2354
2355 pub fn shutdown(self) {
2356 if let SpawnOnce::Started(handle) = self {
2357 handle.trigger_shutdown();
2358 }
2359 }
2360}
2361
2362fn send_trusted_peer_change(
2365 config: &NodeConfig,
2366 sender: &watch::Sender<TrustedPeerChangeEvent>,
2367 new_epoch_start_state: &EpochStartSystemState,
2368) {
2369 let new_committee =
2370 new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());
2371
2372 sender.send_modify(|event| {
2373 core::mem::swap(&mut event.new_committee, &mut event.old_committee);
2374 event.new_committee = new_committee;
2375 })
2376}
2377
2378fn build_kv_store(
2379 state: &Arc<AuthorityState>,
2380 config: &NodeConfig,
2381 registry: &Registry,
2382) -> Result<Arc<TransactionKeyValueStore>> {
2383 let metrics = KeyValueStoreMetrics::new(registry);
2384 let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());
2385
2386 let base_url = &config.transaction_kv_store_read_config.base_url;
2387
2388 if base_url.is_empty() {
2389 info!("no http kv store url provided, using local db only");
2390 return Ok(Arc::new(db_store));
2391 }
2392
2393 base_url.parse::<url::Url>().tap_err(|e| {
2394 error!(
2395 "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
2396 base_url, e
2397 )
2398 })?;
2399
2400 let http_store = HttpKVStore::new_kv(
2401 base_url,
2402 config.transaction_kv_store_read_config.cache_size,
2403 metrics.clone(),
2404 )?;
2405 info!("using local key-value store with fallback to http key-value store");
2406 Ok(Arc::new(FallbackTransactionKVStore::new_kv(
2407 db_store,
2408 http_store,
2409 metrics,
2410 "json_rpc_fallback",
2411 )))
2412}
2413
2414async fn build_grpc_server(
2429 config: &NodeConfig,
2430 state: Arc<AuthorityState>,
2431 state_sync_store: RocksDbStore,
2432 executor: Option<Arc<dyn iota_types::transaction_executor::TransactionExecutor>>,
2433 prometheus_registry: &Registry,
2434) -> Result<Option<GrpcServerHandle>> {
2435 if config.consensus_config().is_some() || !config.enable_grpc_api {
2437 return Ok(None);
2438 }
2439
2440 let Some(grpc_config) = &config.grpc_api_config else {
2441 return Err(anyhow!("gRPC API is enabled but no configuration provided"));
2442 };
2443
2444 let chain_id = state.get_chain_identifier();
2446
2447 let rest_read_store = Arc::new(RestReadStore::new(state.clone(), state_sync_store));
2448
2449 let shutdown_token = CancellationToken::new();
2451
2452 let grpc_reader = Arc::new(GrpcReader::from_rest_state_reader(
2454 rest_read_store,
2455 Some(env!("CARGO_PKG_VERSION").to_string()),
2456 ));
2457
2458 let grpc_server_metrics = iota_grpc_server::GrpcServerMetrics::new(prometheus_registry);
2460
2461 let handle = start_grpc_server(
2464 grpc_reader,
2465 executor,
2466 grpc_config.clone(),
2467 shutdown_token,
2468 chain_id,
2469 Some(grpc_server_metrics),
2470 )
2471 .await?;
2472
2473 Ok(Some(handle))
2474}
2475
/// Builds and starts the public HTTP server (JSON-RPC plus, optionally, the
/// REST API and a `/health` endpoint) for a non-validator node, listening on
/// `config.json_rpc_address`.
///
/// Returns `Ok(None)` for validators (nodes with a consensus config), which
/// do not expose these APIs; otherwise returns the running server's handle.
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    _custom_runtime: Option<Handle>,
    software_version: &'static str,
) -> Result<Option<iota_http::ServerHandle>> {
    // Validators do not serve the HTTP APIs.
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    // Assemble the JSON-RPC service with all of its API modules.
    let json_rpc_router = {
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        // The transaction builder is omitted when the node runs with a
        // bounded range (it would build transactions that cannot execute).
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        // Execution endpoints are only available when an orchestrator exists
        // to drive transactions through the committee.
        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        // Fall back to the chain-specific default IOTA-names config when none
        // is provided explicitly.
        let iota_names_config = config
            .iota_names_config
            .clone()
            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    // Optionally mount the REST API alongside JSON-RPC.
    if config.enable_rest_api {
        let mut rest_service = iota_rest_api::RestService::new(
            Arc::new(RestReadStore::new(state.clone(), store)),
            software_version,
        );

        if let Some(config) = config.rest.clone() {
            rest_service.with_config(config);
        }

        rest_service.with_metrics(RestMetrics::new(prometheus_registry));

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rest_service.with_executor(transaction_orchestrator.clone())
        }

        router = router.merge(rest_service.into_router());
    }

    // Health endpoint gets the authority state via an axum extension.
    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    let layers = ServiceBuilder::new()
        // Bridge iota_http's connection info into axum's ConnectInfo so
        // extractors downstream can read the remote address.
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware));

    router = router.layer(layers);

    let handle = iota_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    info!(local_addr =? handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());

    Ok(Some(handle))
}
2611
/// Query parameters accepted by the `/health` endpoint.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    // Maximum acceptable age, in seconds, of the highest executed checkpoint.
    // When absent, the health check only reports liveness.
    pub threshold_seconds: Option<u32>,
}
2616
2617async fn health_check_handler(
2618 axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
2619 axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
2620) -> impl axum::response::IntoResponse {
2621 if let Some(threshold_seconds) = threshold_seconds {
2622 let summary = match state
2624 .get_checkpoint_store()
2625 .get_highest_executed_checkpoint()
2626 {
2627 Ok(Some(summary)) => summary,
2628 Ok(None) => {
2629 warn!("Highest executed checkpoint not found");
2630 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2631 }
2632 Err(err) => {
2633 warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
2634 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2635 }
2636 };
2637
2638 let latest_chain_time = summary.timestamp();
2640 let threshold =
2641 std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);
2642
2643 if latest_chain_time < threshold {
2645 warn!(
2646 ?latest_chain_time,
2647 ?threshold,
2648 "failing health check due to checkpoint lag"
2649 );
2650 return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
2651 }
2652 }
2653 (axum::http::StatusCode::OK, "up")
2655}
2656
/// Maximum number of transactions allowed in a checkpoint, read from the
/// protocol config (production builds).
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    // NOTE(review): `as usize` would truncate on 32-bit targets if the
    // configured value exceeded u32::MAX — presumably never the case; confirm
    // if 32-bit targets are supported.
    protocol_config.max_transactions_per_checkpoint() as usize
}
2661
/// Test builds cap checkpoints at two transactions so tests exercise
/// checkpoint splitting without needing large workloads.
#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}