#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::{
    collections::{BTreeSet, HashMap, HashSet},
    fmt,
    future::Future,
    path::PathBuf,
    str::FromStr,
    sync::{Arc, Weak},
    time::Duration,
};

use anemo::Network;
use anemo_tower::{
    callback::CallbackLayer,
    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
};
use anyhow::{Result, anyhow};
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
use futures::{TryFutureExt, future::BoxFuture};
pub use handle::IotaNodeHandle;
use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
use iota_common::debug_fatal;
use iota_config::{
    ConsensusConfig, NodeConfig,
    node::{DBCheckpointConfig, RunWithRange},
    node_config_metrics::NodeConfigMetrics,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_core::{
    authority::{
        AuthorityState, AuthorityStore, RandomnessRoundReceiver,
        authority_per_epoch_store::AuthorityPerEpochStore,
        authority_store_pruner::ObjectsCompactionFilter,
        authority_store_tables::{
            AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
        },
        backpressure::BackpressureManager,
        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
    },
    authority_aggregator::{
        AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
    },
    authority_client::NetworkAuthorityClient,
    authority_server::{ValidatorService, ValidatorServiceMetrics},
    checkpoints::{
        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
        SubmitCheckpointToConsensus,
        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
    },
    connection_monitor::ConnectionMonitor,
    consensus_adapter::{
        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
        ConsensusClient,
    },
    consensus_handler::ConsensusHandlerInitializer,
    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
    db_checkpoint_handler::DBCheckpointHandler,
    epoch::{
        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
        reconfiguration::ReconfigurationInitiator,
    },
    execution_cache::build_execution_cache,
    jsonrpc_index::IndexStore,
    module_cache_metrics::ResolverMetrics,
    overload_monitor::overload_monitor,
    rest_index::RestIndexStore,
    safe_client::SafeClientMetricsBase,
    signature_verifier::SignatureVerifierMetrics,
    state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
    storage::{RestReadStore, RocksDbStore},
    traffic_controller::metrics::TrafficControllerMetrics,
    transaction_orchestrator::TransactionOrchestrator,
    validator_tx_finalizer::ValidatorTxFinalizer,
};
use iota_grpc_api::{GrpcReader, GrpcServerHandle, start_grpc_server};
use iota_json_rpc::{
    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
    transaction_builder_api::TransactionBuilderApi,
    transaction_execution_api::TransactionExecutionApi,
};
use iota_json_rpc_api::JsonRpcMetrics;
use iota_macros::{fail_point, fail_point_async, replay_log};
use iota_metrics::{
    RegistryID, RegistryService,
    hardware_metrics::register_hardware_metrics,
    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
    server_timing_middleware, spawn_monitored_task,
};
use iota_network::{
    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
};
use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
use iota_protocol_config::ProtocolConfig;
use iota_rest_api::RestMetrics;
use iota_snapshot::uploader::StateSnapshotUploader;
use iota_storage::{
    FileCompression, StorageFormat,
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use iota_types::{
    base_types::{AuthorityName, ConciseableName, EpochId},
    committee::Committee,
    crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
    digests::ChainIdentifier,
    error::{IotaError, IotaResult},
    executable_transaction::VerifiedExecutableTransaction,
    execution_config_utils::to_binary_config,
    full_checkpoint_content::CheckpointData,
    iota_system_state::{
        IotaSystemState, IotaSystemStateTrait,
        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
    },
    messages_checkpoint::CertifiedCheckpointSummary,
    messages_consensus::{
        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
        SignedAuthorityCapabilitiesV1, check_total_jwk_size,
    },
    messages_grpc::HandleCapabilityNotificationRequestV1,
    quorum_driver_types::QuorumDriverEffectsQueueResult,
    supported_protocol_versions::SupportedProtocolVersions,
    transaction::{Transaction, VerifiedCertificate},
};
use prometheus::Registry;
use shared_crypto::intent::{Intent, IntentMessage, IntentScope};
#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use tap::tap::TapFallible;
use tokio::{
    runtime::Handle,
    sync::{Mutex, broadcast, mpsc, watch},
    task::{JoinHandle, JoinSet},
};
use tokio_util::sync::CancellationToken;
use tower::ServiceBuilder;
use tracing::{Instrument, debug, error, error_span, info, warn};
use typed_store::{
    DBMetrics,
    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
};

use crate::metrics::{GrpcMetrics, IotaNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

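/// Validator-only services and metrics. These exist only while the node is
/// in the validator committee; on reconfiguration they are either rebuilt
/// for the new epoch or dropped (and their metrics registry removed) if the
/// node leaves the committee.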
pub struct ValidatorComponents {
    validator_server_spawn_handle: SpawnOnce,
    validator_server_handle: iota_http::ServerHandle,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    validator_registry_id: RegistryID,
}

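/// Simulator-only (`msim`) support: per-node simulator state plus a test
/// hook for overriding how JWKs are fetched. The default injector parses a
/// static JWK set bundled with `iota_types` for the Twitch provider.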
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    pub(super) struct SimState {
        pub sim_node: iota_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        parse_jwks(
            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
        )
        .map_err(|_| IotaError::JWKRetrieval)
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> =
            std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

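/// A running IOTA node, either a fullnode or a validator.
///
/// Built by [`IotaNode::start_async`]. Validator-specific services live in
/// `validator_components` so they can be torn down and rebuilt when
/// committee membership changes at epoch boundaries.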
pub struct IotaNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,
    _http_server: Option<iota_http::ServerHandle>,
    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_archive_handle: Option<broadcast::Sender<()>>,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}

impl fmt::Debug for IotaNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("IotaNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

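/// Maximum number of JWKs accepted from a single provider fetch; any
/// additional keys are dropped (see `start_jwk_updater`).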
static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl IotaNode {
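    /// Starts the node with the default software version string
    /// ("unknown"); see [`Self::start_async`] for the full entry point.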
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
        custom_rpc_runtime: Option<Handle>,
    ) -> Result<Arc<IotaNode>> {
        Self::start_async(config, registry_service, custom_rpc_runtime, "unknown").await
    }

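    /// Spawns one background task per zklogin OAuth provider configured for
    /// this chain. Each task periodically fetches the provider's JWKs,
    /// validates and deduplicates them, and submits any new keys to
    /// consensus. The tasks are tied to the lifetime of the current epoch.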
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<IotaNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        fn validate_jwk(
            metrics: &Arc<IotaNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    // Track JWKs we have already submitted so they are not resubmitted
                    // on later fetches within the same epoch.
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                // Retry after a short delay on fetch errors.
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!(
                                        "Provider {:?} sent too many JWKs, only the first {} will be used",
                                        p, MAX_JWK_KEYS_PER_FETCH
                                    );
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }

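    /// The main entry point: opens the stores (replaying genesis and any
    /// migration data on first start), wires up p2p networking, state sync,
    /// archival/snapshot/db-checkpoint background tasks and the HTTP/gRPC
    /// servers, starts the validator components if this node is in the
    /// committee, and finally spawns `monitor_reconfiguration` to drive the
    /// node across epoch boundaries.
    ///
    /// A minimal startup sketch; `load_node_config` is a hypothetical
    /// helper, so the block is not compiled:
    ///
    /// ```ignore
    /// let config: NodeConfig = load_node_config()?; // hypothetical helper
    /// let registry_service = RegistryService::new(Registry::new());
    /// let node = IotaNode::start_async(config, registry_service, None, "v1.0.0").await?;
    /// // Block until the node reports shutdown (e.g. a RunWithRange condition).
    /// let mut shutdown_rx = node.subscribe_to_shutdown_channel();
    /// let _ = shutdown_rx.recv().await;
    /// ```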
    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        custom_rpc_runtime: Option<Handle>,
        software_version: &'static str,
    ) -> Result<Arc<IotaNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(node = ?config.authority_public_key(),
            "Initializing iota-node listening on {}", config.network_address
        );

        let genesis = config.genesis()?.clone();

        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
        info!("IOTA chain identifier: {chain_identifier}");

        let db_corrupted_path = &config.db_path().join("status");
        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
            panic!("Failed to check database corruption: {err}");
        }

        DBMetrics::init(&prometheus_registry);

        iota_metrics::init_metrics(&prometheus_registry);
        #[cfg(not(msim))]
        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();

        register_hardware_metrics(&registry_service, &config.db_path)
            .expect("Failed registering hardware metrics");
        prometheus_registry
            .register(iota_metrics::uptime_metric(
                if is_validator {
                    "validator"
                } else {
                    "fullnode"
                },
                software_version,
                &chain_identifier.to_string(),
            ))
            .expect("Failed registering uptime metric");

        let migration_tx_data = if genesis.contains_migrations() {
            Some(config.load_migration_tx_data()?)
        } else {
            None
        };

        let secret = Arc::pin(config.authority_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");
        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store = AuthorityStore::open(
            perpetual_tables,
            &genesis,
            &config,
            &prometheus_registry,
            migration_tx_data.as_ref(),
        )
        .await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache_config,
            &epoch_start_configuration,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                    auth_agg_metrics,
                ),
            )))
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.authority_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            ChainIdentifier::from(*genesis.checkpoint().digest()),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
        );

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

        if is_genesis {
            info!("checking IOTA conservation at genesis");
            cache_traits
                .reconfig_api
                .try_expensive_check_iota_conservation(&epoch_store, None)
                .expect("IOTA conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        unmark_db_corruption(db_corrupted_path)?;

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
                config.remove_deprecated_tables,
            )))
        } else {
            None
        };

        let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
        {
            Some(Arc::new(RestIndexStore::new(
                config.db_path().join("rest_index"),
                &store,
                &checkpoint_store,
                &epoch_store,
                &cache_traits.backing_package_store,
            )))
        } else {
            None
        };

        info!("creating archive reader");
        let archive_readers =
            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
            Self::create_p2p_network(
                &config,
                state_sync_store.clone(),
                chain_identifier,
                trusted_peer_change_rx,
                archive_readers.clone(),
                randomness_tx,
                &prometheus_registry,
            )?;

        send_trusted_peer_change(
            &config,
            &trusted_peer_change_tx,
            epoch_store.epoch_start_state(),
        );

        info!("start state archival");
        let state_archive_handle =
            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
                .await?;

        info!("start snapshot upload");
        let state_snapshot_handle =
            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;

        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        let mut genesis_objects = genesis.objects().to_vec();
        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
            genesis_objects.extend(migration_tx_data.get_objects());
        }

        let authority_name = config.authority_public_key();
        let validator_tx_finalizer =
            config
                .enable_validator_tx_finalizer
                .then_some(Arc::new(ValidatorTxFinalizer::new(
                    auth_agg.clone(),
                    authority_name,
                    &prometheus_registry,
                )));

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            rest_index,
            checkpoint_store.clone(),
            &prometheus_registry,
            &genesis_objects,
            &db_checkpoint_config,
            config.clone(),
            archive_readers,
            validator_tx_finalizer,
            chain_identifier,
            pruner_db,
        )
        .await;

        if epoch_store.epoch() == 0 {
            let genesis_tx = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
            Self::execute_transaction_immediately_at_zero_epoch(
                &state,
                &epoch_store,
                genesis_tx,
                span,
            )
            .await;

            if let Some(migration_tx_data) = migration_tx_data {
                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
                    Self::execute_transaction_immediately_at_zero_epoch(
                        &state,
                        &epoch_store,
                        tx,
                        span,
                    )
                    .await;
                }
            }
        }

        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
        {
            if let Some(indexes) = state.indexes.clone() {
                iota_core::verify_indexes::verify_indexes(
                    state.get_accumulator_store().as_ref(),
                    indexes,
                )
                .expect("secondary indexes are inconsistent");
            }
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
            )))
        } else {
            None
        };

        let http_server = build_http_server(
            state.clone(),
            state_sync_store.clone(),
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
            custom_rpc_runtime,
            software_version,
        )
        .await?;

        let accumulator = Arc::new(StateAccumulator::new(
            cache_traits.accumulator_store.clone(),
            StateAccumulatorMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics =
            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
            p2p_network.downgrade(),
            network_connection_metrics,
            HashMap::new(),
            None,
        );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses,
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let iota_node_metrics =
            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));

        let grpc_server_handle =
            build_grpc_server(&config, state.clone(), state_sync_store.clone()).await?;

        let validator_components = if state.is_committee_validator(&epoch_store) {
            let (components, _) = futures::join!(
                Self::construct_validator_components(
                    config.clone(),
                    state.clone(),
                    committee,
                    epoch_store.clone(),
                    checkpoint_store.clone(),
                    state_sync_handle.clone(),
                    randomness_handle.clone(),
                    Arc::downgrade(&accumulator),
                    backpressure_manager.clone(),
                    connection_monitor_status.clone(),
                    &registry_service,
                    iota_node_metrics.clone(),
                ),
                Self::reexecute_pending_consensus_certs(&epoch_store, &state)
            );
            let mut components = components?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            components.validator_server_spawn_handle =
                components.validator_server_spawn_handle.start();

            Some(components)
        } else {
            None
        };

        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            _http_server: http_server,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: iota_node_metrics,

            _discovery: discovery_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            accumulator: Mutex::new(Some(accumulator)),
            end_of_epoch_channel,
            connection_monitor_status,
            trusted_peer_change_tx,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_archive_handle: state_archive_handle,
            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            grpc_server_handle: Mutex::new(grpc_server_handle),

            auth_agg,
        };

        info!("IotaNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

    /// Subscribes to the system state that is broadcast at the end of each
    /// epoch.
    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

    /// Subscribes to the shutdown notification, which carries the
    /// `RunWithRange` condition that triggered it, if any.
    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    /// Initiates closing of the current epoch via the consensus adapter;
    /// fails if this node is not a validator.
    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| IotaError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> IotaResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    pub async fn close_epoch_for_testing(&self) -> IotaResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

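    /// If an archive object store is configured, starts an `ArchiveWriter`
    /// that uploads checkpoint data (zstd-compressed blobs) from the local
    /// archive directory to the remote store. Returns the writer's kill
    /// switch, or `None` when archival is disabled.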
    async fn start_state_archival(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_sync_store: RocksDbStore,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
            let local_store_config = ObjectStoreConfig {
                object_store: Some(ObjectStoreType::File),
                directory: Some(config.archive_path()),
                ..Default::default()
            };
            let archive_writer = ArchiveWriter::new(
                local_store_config,
                remote_store_config.clone(),
                FileCompression::Zstd,
                StorageFormat::Blob,
                Duration::from_secs(600),
                256 * 1024 * 1024,
                prometheus_registry,
            )
            .await?;
            Ok(Some(archive_writer.start(state_sync_store).await?))
        } else {
            Ok(None)
        }
    }

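    /// If a snapshot object store is configured, starts the
    /// `StateSnapshotUploader`, which uploads end-of-epoch state snapshots
    /// derived from db checkpoints. Returns its kill switch, or `None` when
    /// snapshots are disabled.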
    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

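    /// Resolves the effective `DBCheckpointConfig` (enabling end-of-epoch
    /// db checkpoints whenever state snapshots are enabled) and starts the
    /// `DBCheckpointHandler` unless there is neither an object store to
    /// upload to nor a snapshot pipeline to feed.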
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            // No remote store to upload to and no state snapshots: nothing to do.
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

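    /// Builds the anemo p2p network together with the discovery, state-sync
    /// and randomness services, layering tracing and metrics onto inbound
    /// and outbound traffic.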
    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
            // Allow large frames (1 GiB) on the p2p network.
            anemo_config.max_frame_size = Some(1 << 30);

            // Apply generous QUIC defaults for any knob the config leaves unset.
            let mut quic_config = anemo_config.quic.unwrap_or_default();
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("iota-{chain_identifier}");
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }

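    /// Creates the epoch-independent validator pieces (consensus adapter
    /// and manager, consensus store pruner, validator gRPC server and the
    /// optional overload monitor) in a dedicated metrics registry, then
    /// delegates to `start_epoch_specific_validator_components`.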
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        iota_node_metrics: Arc<IotaNodeMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        let (validator_server_spawn_handle, validator_server_handle) =
            Self::start_grpc_validator_service(
                &config,
                state.clone(),
                consensus_adapter.clone(),
                &validator_registry,
            )
            .await?;

        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            accumulator,
            backpressure_manager,
            validator_server_spawn_handle,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_node_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }

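    /// Starts everything that must be rebuilt for each epoch: the
    /// checkpoint service, the randomness manager, consensus itself and,
    /// when the authenticator state is enabled, the JWK updater. Called at
    /// startup and again after every reconfiguration.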
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: ConsensusManager,
        consensus_store_pruner: ConsensusStorePruner,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_spawn_handle: SpawnOnce,
        validator_server_handle: iota_http::ServerHandle,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_node_metrics: Arc<IotaNodeMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            accumulator,
            checkpoint_metrics.clone(),
        );

        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
            backpressure_manager,
        );

        info!("Starting consensus manager");

        consensus_manager
            .start(
                config,
                epoch_store.clone(),
                consensus_handler_initializer,
                IotaTxValidator::new(
                    epoch_store.clone(),
                    checkpoint_service.clone(),
                    state.transaction_manager().clone(),
                    iota_tx_validator_metrics.clone(),
                ),
            )
            .await;

        if !epoch_store
            .epoch_start_config()
            .is_data_quarantine_active_from_beginning_of_epoch()
        {
            checkpoint_store
                .reexecute_local_checkpoints(&state, &epoch_store)
                .await;
        }

        info!("Spawning checkpoint service");
        let checkpoint_service_tasks = checkpoint_service.spawn().await;

        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                iota_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        Ok(ValidatorComponents {
            validator_server_spawn_handle,
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }

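    /// Assembles the `CheckpointService`: locally built checkpoints are
    /// submitted to consensus for signing, certified checkpoints are
    /// forwarded to state sync, and checkpoint size and transaction-count
    /// limits are taken from the current protocol config.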
    fn build_checkpoint_service(
        config: &NodeConfig,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state: Arc<AuthorityState>,
        state_sync_handle: state_sync::Handle,
        accumulator: Weak<StateAccumulator>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Arc<CheckpointService> {
        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

        debug!(
            "Starting checkpoint service with epoch start timestamp {} and epoch duration {}",
            epoch_start_timestamp_ms, epoch_duration_ms
        );

        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
            sender: consensus_adapter,
            signer: state.secret.clone(),
            authority: config.authority_public_key(),
            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
                .checked_add(epoch_duration_ms)
                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
            metrics: checkpoint_metrics.clone(),
        });

        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
        let max_checkpoint_size_bytes =
            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;

        CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store,
            state.get_transaction_cache_reader().clone(),
            accumulator,
            checkpoint_output,
            Box::new(certified_checkpoint_output),
            checkpoint_metrics,
            max_tx_per_checkpoint,
            max_checkpoint_size_bytes,
        )
    }

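    /// Builds the `ConsensusAdapter` used to submit transactions to
    /// consensus. Note that the second limit passed to
    /// `ConsensusAdapter::new` spreads `max_pending_transactions() * 2`
    /// across the committee members.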
    fn construct_consensus_adapter(
        committee: &Committee,
        consensus_config: &ConsensusConfig,
        authority: AuthorityName,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        prometheus_registry: &Registry,
        consensus_client: Arc<dyn ConsensusClient>,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> ConsensusAdapter {
        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
        ConsensusAdapter::new(
            consensus_client,
            checkpoint_store,
            authority,
            connection_monitor_status,
            consensus_config.max_pending_transactions(),
            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
            consensus_config.max_submit_position,
            consensus_config.submit_delay_step_override(),
            ca_metrics,
        )
    }

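    /// Binds the validator gRPC service (with TLS) on the configured
    /// network address. Returns a `SpawnOnce` so the caller can defer
    /// serving traffic until the node is ready, plus the server handle used
    /// to trigger shutdown later.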
    async fn start_grpc_validator_service(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        prometheus_registry: &Registry,
    ) -> Result<(SpawnOnce, iota_http::ServerHandle)> {
        let validator_service = ValidatorService::new(
            state.clone(),
            consensus_adapter,
            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
            TrafficControllerMetrics::new(prometheus_registry),
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let mut server_conf = iota_network_stack::config::Config::new();
        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
        server_conf.load_shed = config.grpc_load_shed;
        let mut server_builder =
            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));

        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));

        let tls_config = iota_tls::create_rustls_server_config(
            config.network_key_pair().copy().private(),
            IOTA_TLS_SERVER_NAME.to_string(),
        );

        let server = server_builder
            .bind(config.network_address(), Some(tls_config))
            .await
            .map_err(|err| anyhow!(err.to_string()))?;

        let local_addr = server.local_addr();
        info!("Listening to traffic on {local_addr}");

        let server_handle = server.handle().clone();

        Ok((
            SpawnOnce::new(server.serve().map_err(Into::into)),
            server_handle,
        ))
    }

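    /// Re-enqueues certificates that consensus had already sequenced before
    /// a restart so that they get executed again, then waits (with a
    /// timeout) for their effects to land.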
    async fn reexecute_pending_consensus_certs(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        state: &Arc<AuthorityState>,
    ) {
        let mut pending_consensus_certificates = Vec::new();
        let mut additional_certs = Vec::new();

        for tx in epoch_store.get_all_pending_consensus_transactions() {
            match tx.kind {
                // Shared-object transactions must be sequenced by consensus again, so
                // only owned-object certificates are re-enqueued here.
                ConsensusTransactionKind::CertifiedTransaction(tx)
                    if !tx.contains_shared_object() =>
                {
                    let tx = *tx;
                    let tx = VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(tx),
                    );
                    // If we already signed effects for the transaction, re-execute it
                    // against the expected effects digest; otherwise just enqueue it.
                    if let Some(fx_digest) = epoch_store
                        .get_signed_effects_digest(tx.digest())
                        .expect("db error")
                    {
                        pending_consensus_certificates.push((tx, fx_digest));
                    } else {
                        additional_certs.push(tx);
                    }
                }
                _ => (),
            }
        }

        let digests = pending_consensus_certificates
            .iter()
            .map(|(tx, _)| *tx.digest())
            .collect::<Vec<_>>();

        info!(
            "reexecuting {} pending consensus certificates: {:?}",
            digests.len(),
            digests
        );

        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
        state.enqueue_transactions_for_execution(additional_certs, epoch_store);

        // Allow extra time under the simulator, which runs much slower than real time.
        let timeout = if cfg!(msim) { 120 } else { 60 };
        if tokio::time::timeout(
            std::time::Duration::from_secs(timeout),
            state
                .get_transaction_cache_reader()
                .try_notify_read_executed_effects_digests(&digests),
        )
        .await
        .is_err()
        {
            if let Ok(executed_effects_digests) = state
                .get_transaction_cache_reader()
                .try_multi_get_executed_effects_digests(&digests)
            {
                let pending_digests = digests
                    .iter()
                    .zip(executed_effects_digests.iter())
                    .filter_map(|(digest, executed_effects_digest)| {
                        if executed_effects_digest.is_none() {
                            Some(digest)
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>();
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed: {:?}",
                    pending_digests
                );
            } else {
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed, digests not found"
                );
            }
        }
    }

    pub fn state(&self) -> Arc<AuthorityState> {
        self.state.clone()
    }

    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }

    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }

    pub fn clone_authority_aggregator(
        &self,
    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.clone_authority_aggregator())
    }

    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }

    pub fn subscribe_to_transaction_orchestrator_effects(
        &self,
    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.subscribe_to_effects_queue())
            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
    }

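    /// The long-running reconfiguration loop, spawned from `start_async`.
    /// Each iteration runs a `CheckpointExecutor` for the current epoch,
    /// submits this validator's capabilities to consensus (or, as an active
    /// non-committee validator, sends signed capability notifications to
    /// the committee), and once the epoch ends performs reconfiguration:
    /// refreshing the authority aggregator, rebuilding the state
    /// accumulator, and starting or stopping validator components as
    /// committee membership changes.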
1751 pub async fn monitor_reconfiguration(
1757 self: Arc<Self>,
1758 mut epoch_store: Arc<AuthorityPerEpochStore>,
1759 ) -> Result<()> {
1760 let checkpoint_executor_metrics =
1761 CheckpointExecutorMetrics::new(&self.registry_service.default_registry());
1762
1763 loop {
1764 let mut accumulator_guard = self.accumulator.lock().await;
1765 let accumulator = accumulator_guard.take().unwrap();
1766 info!(
1767 "Creating checkpoint executor for epoch {}",
1768 epoch_store.epoch()
1769 );
1770
1771 let summary_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
1773 guard.as_ref().map(|handle| {
1774 let tx = handle.checkpoint_summary_broadcaster().clone();
1775 Box::new(move |summary: &CertifiedCheckpointSummary| {
1776 tx.send_traced(summary);
1777 }) as Box<dyn Fn(&CertifiedCheckpointSummary) + Send + Sync>
1778 })
1779 } else {
1780 None
1781 };
1782 let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
1783 guard.as_ref().map(|handle| {
1784 let tx = handle.checkpoint_data_broadcaster().clone();
1785 Box::new(move |data: &CheckpointData| {
1786 tx.send_traced(data);
1787 }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
1788 })
1789 } else {
1790 None
1791 };
1792
1793 let checkpoint_executor = CheckpointExecutor::new(
1794 epoch_store.clone(),
1795 self.checkpoint_store.clone(),
1796 self.state.clone(),
1797 accumulator.clone(),
1798 self.backpressure_manager.clone(),
1799 self.config.checkpoint_executor_config.clone(),
1800 checkpoint_executor_metrics.clone(),
1801 summary_sender,
1802 data_sender,
1803 );
1804
1805 let run_with_range = self.config.run_with_range;
1806
1807 let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();
1808
1809 if let Some(components) = &*self.validator_components.lock().await {
1811 tokio::time::sleep(Duration::from_millis(1)).await;
1813
1814 let config = cur_epoch_store.protocol_config();
1815 let binary_config = to_binary_config(config);
1816 let transaction = ConsensusTransaction::new_capability_notification_v1(
1817 AuthorityCapabilitiesV1::new(
1818 self.state.name,
1819 cur_epoch_store.get_chain_identifier().chain(),
1820 self.config
1821 .supported_protocol_versions
1822 .expect("Supported versions should be populated")
1823 .truncate_below(config.version),
1825 self.state
1826 .get_available_system_packages(&binary_config)
1827 .await,
1828 ),
1829 );
1830 info!(?transaction, "submitting capabilities to consensus");
1831 components
1832 .consensus_adapter
1833 .submit(transaction, None, &cur_epoch_store)?;
1834 } else if self.state.is_active_validator(&cur_epoch_store)
1835 && cur_epoch_store
1836 .protocol_config()
1837 .track_non_committee_eligible_validators()
1838 {
1839 let epoch_store = cur_epoch_store.clone();
1843 let node_clone = self.clone();
1844 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
1845 node_clone
1846 .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
1847 .await;
1848 }));
1849 }
1850
1851 let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1852
1853 if stop_condition == StopReason::RunWithRangeCondition {
1854 IotaNode::shutdown(&self).await;
1855 self.shutdown_channel_tx
1856 .send(run_with_range)
1857 .expect("RunWithRangeCondition met but failed to send shutdown message");
1858 return Ok(());
1859 }
1860
1861 let latest_system_state = self
1863 .state
1864 .get_object_cache_reader()
1865 .try_get_iota_system_state_object_unsafe()
1866 .expect("Read IOTA System State object cannot fail");
1867
1868 #[cfg(msim)]
1869 if !self
1870 .sim_state
1871 .sim_safe_mode_expected
1872 .load(Ordering::Relaxed)
1873 {
1874 debug_assert!(!latest_system_state.safe_mode());
1875 }
1876
1877 #[cfg(not(msim))]
1878 debug_assert!(!latest_system_state.safe_mode());
1879
1880 if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
1881 if self.state.is_fullnode(&cur_epoch_store) {
1882 warn!(
1883 "Failed to send end of epoch notification to subscriber: {:?}",
1884 err
1885 );
1886 }
1887 }
1888
1889 cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1890 let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1891
1892 self.auth_agg.store(Arc::new(
1893 self.auth_agg
1894 .load()
1895 .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1896 ));
1897
1898 let next_epoch_committee = new_epoch_start_state.get_iota_committee();
1899 let next_epoch = next_epoch_committee.epoch();
1900 assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1901
1902 info!(
1903 next_epoch,
1904 "Finished executing all checkpoints in epoch. About to reconfigure the system."
1905 );
1906
1907 fail_point_async!("reconfig_delay");
1908
1909 let authority_names_to_peer_ids =
1914 new_epoch_start_state.get_authority_names_to_peer_ids();
1915 self.connection_monitor_status
1916 .update_mapping_for_epoch(authority_names_to_peer_ids);
1917
1918 cur_epoch_store.record_epoch_reconfig_start_time_metric();
1919
1920 send_trusted_peer_change(
1921 &self.config,
1922 &self.trusted_peer_change_tx,
1923 &new_epoch_start_state,
1924 );
1925
1926 let mut validator_components_lock_guard = self.validator_components.lock().await;
1927
1928 let new_epoch_store = self
1932 .reconfigure_state(
1933 &self.state,
1934 &cur_epoch_store,
1935 next_epoch_committee.clone(),
1936 new_epoch_start_state,
1937 accumulator.clone(),
1938 )
1939 .await?;
1940
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_spawn_handle,
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_committee_validator(&new_epoch_store) {
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_accumulator,
                            self.backpressure_manager.clone(),
                            validator_server_spawn_handle,
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    validator_server_handle.trigger_shutdown();
                    debug!("Validator grpc server shutdown triggered");

                    None
                }
            } else {
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                if self.state.is_committee_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_accumulator,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                    )
                    .await?;

                    components.validator_server_spawn_handle =
                        components.validator_server_spawn_handle.start();

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

            cur_epoch_store.release_db_handles();

            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }

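    /// Shuts down epoch-specific components: consensus is stopped first, then
    /// the gRPC server (if one is running) is asked to shut down gracefully.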
    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }

        if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
            info!("Shutting down gRPC server");
            if let Err(e) = grpc_handle.shutdown().await {
                warn!("Failed to gracefully shut down gRPC server: {e}");
            }
        }
    }

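    /// Advances the authority state to `next_epoch_committee`'s epoch: loads
    /// the last checkpoint of the current epoch (including its supply change),
    /// builds the `EpochStartConfiguration`, and returns the new per-epoch
    /// store.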
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        accumulator: Arc<StateAccumulator>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                accumulator,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }

    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }

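    /// Executes a transaction directly against the epoch-0 store, wrapping it
    /// in a `VerifiedExecutableTransaction` backed by a synthetic
    /// `CertificateProof::Checkpoint(0, 0)`, since no consensus or checkpoint
    /// exists yet at that point.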
    async fn execute_transaction_immediately_at_zero_epoch(
        state: &Arc<AuthorityState>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx: &Transaction,
        span: tracing::Span,
    ) {
        let _guard = span.enter();
        let transaction =
            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                    tx.data().clone(),
                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                ),
            );
        state
            .try_execute_immediately(&transaction, None, epoch_store)
            .unwrap();
    }

    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }

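    /// Signs this validator's capabilities (supported protocol versions and
    /// available system packages) and sends the notification to a quorum of
    /// the committee, retrying with a linearly growing backoff (5s steps,
    /// capped at 300s) until it succeeds or hits a non-retryable error.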
    async fn send_signed_capability_notification_to_committee_with_retry(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
        const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
        const MAX_RETRY_INTERVAL_SECS: u64 = 300;

        let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);

        let capabilities = AuthorityCapabilitiesV1::new(
            self.state.name,
            epoch_store.get_chain_identifier().chain(),
            self.config
                .supported_protocol_versions
                .expect("Supported versions should be populated")
                .truncate_below(config.version),
            self.state
                .get_available_system_packages(&binary_config)
                .await,
        );

        let signature = AuthoritySignature::new_secure(
            &IntentMessage::new(
                Intent::iota_app(IntentScope::AuthorityCapabilities),
                &capabilities,
            ),
            &epoch_store.epoch(),
            self.config.authority_key_pair(),
        );

        let request = HandleCapabilityNotificationRequestV1 {
            message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
        };

        let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);

        loop {
            let auth_agg = self.auth_agg.load();
            match auth_agg
                .send_capability_notification_to_quorum(request.clone())
                .await
            {
                Ok(_) => {
                    info!("Successfully sent capability notification to committee");
                    break;
                }
                Err(err) => {
                    match &err {
                        AggregatorSendCapabilityNotificationError::RetryableNotification {
                            errors,
                        } => {
                            warn!(
                                "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
                                retry_interval, errors
                            );
                        }
                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
                            errors,
                        } => {
                            error!(
                                "Failed to send capability notification to committee (non-retryable error): {:?}",
                                errors
                            );
                            break;
                        }
                    };

                    tokio::time::sleep(retry_interval).await;

                    retry_interval = std::cmp::min(
                        retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
                        Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
                    );
                }
            }
        }
    }
}

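// Outside the simulator, JWKs are fetched from the real OIDC provider over
// HTTP.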
#[cfg(not(msim))]
impl IotaNode {
    async fn fetch_jwks(
        _authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
        let client = reqwest::Client::new();
        fetch_jwks(provider, &client)
            .await
            .map_err(|_| IotaError::JWKRetrieval)
    }
}

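// Under the simulator, JWKs come from a test-controlled injector instead of
// the network, and safe-mode expectations can be toggled per node.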
#[cfg(msim)]
impl IotaNode {
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}

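/// A future that is spawned onto the runtime at most once.
///
/// Components can be constructed eagerly while their server task is started
/// lazily: the wrapped future stays in `Unstarted` until `start` is called,
/// which moves it onto the Tokio runtime. Calling `start` on an already
/// started instance is a no-op.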
enum SpawnOnce {
    Unstarted(Mutex<BoxFuture<'static, Result<()>>>),
    #[allow(unused)]
    Started(JoinHandle<Result<()>>),
}

impl SpawnOnce {
    pub fn new(future: impl Future<Output = Result<()>> + Send + 'static) -> Self {
        Self::Unstarted(Mutex::new(Box::pin(future)))
    }

    pub fn start(self) -> Self {
        match self {
            Self::Unstarted(future) => {
                let future = future.into_inner();
                let handle = tokio::spawn(future);
                Self::Started(handle)
            }
            Self::Started(_) => self,
        }
    }
}

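/// Notifies the p2p layer of the new epoch's validator peers. The previous
/// `new_committee` is swapped into `old_committee` before the fresh peer set
/// is written, keeping the outgoing committee available on the event.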
fn send_trusted_peer_change(
    config: &NodeConfig,
    sender: &watch::Sender<TrustedPeerChangeEvent>,
    new_epoch_start_state: &EpochStartSystemState,
) {
    let new_committee =
        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());

    sender.send_modify(|event| {
        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
        event.new_committee = new_committee;
    })
}

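/// Builds the transaction key-value store used by the JSON-RPC API: always a
/// local RocksDB-backed store, with an HTTP store as fallback when a base URL
/// is configured. The URL is validated eagerly so misconfiguration fails fast.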
fn build_kv_store(
    state: &Arc<AuthorityState>,
    config: &NodeConfig,
    registry: &Registry,
) -> Result<Arc<TransactionKeyValueStore>> {
    let metrics = KeyValueStoreMetrics::new(registry);
    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());

    let base_url = &config.transaction_kv_store_read_config.base_url;

    if base_url.is_empty() {
        info!("no http kv store url provided, using local db only");
        return Ok(Arc::new(db_store));
    }

    base_url.parse::<url::Url>().tap_err(|e| {
        error!(
            "failed to parse config.transaction_kv_store_config.base_url ({:?}) as url: {}",
            base_url, e
        )
    })?;

    let http_store = HttpKVStore::new_kv(
        base_url,
        config.transaction_kv_store_read_config.cache_size,
        metrics.clone(),
    )?;
    info!("using local key-value store with fallback to http key-value store");
    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
        db_store,
        http_store,
        metrics,
        "json_rpc_fallback",
    )))
}

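/// Starts the gRPC read API on fullnodes. Returns `Ok(None)` on validators
/// (nodes with a consensus config) or when the API is disabled; errors if the
/// API is enabled but `grpc_api_config` is missing.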
async fn build_grpc_server(
    config: &NodeConfig,
    state: Arc<AuthorityState>,
    state_sync_store: RocksDbStore,
) -> Result<Option<GrpcServerHandle>> {
    if config.consensus_config().is_some() || !config.enable_grpc_api {
        return Ok(None);
    }

    let Some(grpc_config) = &config.grpc_api_config else {
        return Err(anyhow!("gRPC API is enabled but no configuration provided"));
    };

    let rest_read_store = Arc::new(RestReadStore::new(state.clone(), state_sync_store));

    let shutdown_token = CancellationToken::new();

    let grpc_reader = Arc::new(GrpcReader::from_rest_state_reader(rest_read_store));

    let event_subscriber =
        state.subscription_handler.clone() as Arc<dyn iota_grpc_api::EventSubscriber>;

    let handle = start_grpc_server(
        grpc_reader,
        event_subscriber,
        grpc_config.clone(),
        shutdown_token,
    )
    .await?;

    Ok(Some(handle))
}

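/// Builds the fullnode HTTP server hosting the JSON-RPC API, the optional
/// REST API, and the `/health` endpoint. Returns `Ok(None)` on validators
/// (nodes with a consensus config).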
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    _custom_runtime: Option<Handle>,
    software_version: &'static str,
) -> Result<Option<iota_http::ServerHandle>> {
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        let iota_names_config = config.iota_names_config.clone().unwrap_or_default();

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    if config.enable_rest_api {
        let mut rest_service = iota_rest_api::RestService::new(
            Arc::new(RestReadStore::new(state.clone(), store)),
            software_version,
        );

        if let Some(config) = config.rest.clone() {
            rest_service.with_config(config);
        }

        rest_service.with_metrics(RestMetrics::new(prometheus_registry));

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rest_service.with_executor(transaction_orchestrator.clone())
        }

        router = router.merge(rest_service.into_router());
    }

    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware));

    router = router.layer(layers);

    let handle = iota_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    info!(local_addr = ?handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());

    Ok(Some(handle))
}

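/// Query parameters for `/health`. When `threshold_seconds` is set, the node
/// is reported healthy only if its highest executed checkpoint is no older
/// than that many seconds.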
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    pub threshold_seconds: Option<u32>,
}

async fn health_check_handler(
    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
) -> impl axum::response::IntoResponse {
    if let Some(threshold_seconds) = threshold_seconds {
        let summary = match state
            .get_checkpoint_store()
            .get_highest_executed_checkpoint()
        {
            Ok(Some(summary)) => summary,
            Ok(None) => {
                warn!("Highest executed checkpoint not found");
                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
            }
            Err(err) => {
                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
            }
        };

        let latest_chain_time = summary.timestamp();
        let threshold =
            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);

        if latest_chain_time < threshold {
            warn!(
                ?latest_chain_time,
                ?threshold,
                "failing health check due to checkpoint lag"
            );
            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
        }
    }
    (axum::http::StatusCode::OK, "up")
}

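// The per-checkpoint transaction cap normally comes from the protocol config;
// tests pin it to 2 so checkpoint-boundary logic is exercised frequently.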
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}

#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}