#[cfg(msim)]
use std::sync::atomic::Ordering;
use std::{
    collections::{BTreeSet, HashMap, HashSet},
    fmt,
    future::Future,
    path::PathBuf,
    str::FromStr,
    sync::{Arc, Weak},
    time::Duration,
};

use anemo::Network;
use anemo_tower::{
    callback::CallbackLayer,
    trace::{DefaultMakeSpan, DefaultOnFailure, TraceLayer},
};
use anyhow::{Result, anyhow};
use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::{JWK, JwkId, OIDCProvider};
use futures::{TryFutureExt, future::BoxFuture};
pub use handle::IotaNodeHandle;
use iota_archival::{reader::ArchiveReaderBalancer, writer::ArchiveWriter};
use iota_common::debug_fatal;
use iota_config::{
    ConsensusConfig, NodeConfig,
    node::{DBCheckpointConfig, RunWithRange},
    node_config_metrics::NodeConfigMetrics,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_core::{
    authority::{
        AuthorityState, AuthorityStore, RandomnessRoundReceiver,
        authority_per_epoch_store::AuthorityPerEpochStore,
        authority_store_pruner::ObjectsCompactionFilter,
        authority_store_tables::{
            AuthorityPerpetualTables, AuthorityPerpetualTablesOptions, AuthorityPrunerTables,
        },
        backpressure::BackpressureManager,
        epoch_start_configuration::{EpochFlag, EpochStartConfigTrait, EpochStartConfiguration},
    },
    authority_aggregator::{
        AggregatorSendCapabilityNotificationError, AuthAggMetrics, AuthorityAggregator,
    },
    authority_client::NetworkAuthorityClient,
    authority_server::{ValidatorService, ValidatorServiceMetrics},
    checkpoints::{
        CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
        SubmitCheckpointToConsensus,
        checkpoint_executor::{CheckpointExecutor, StopReason, metrics::CheckpointExecutorMetrics},
    },
    connection_monitor::ConnectionMonitor,
    consensus_adapter::{
        CheckConnection, ConnectionMonitorStatus, ConsensusAdapter, ConsensusAdapterMetrics,
        ConsensusClient,
    },
    consensus_handler::ConsensusHandlerInitializer,
    consensus_manager::{ConsensusManager, ConsensusManagerTrait, UpdatableConsensusClient},
    consensus_validator::{IotaTxValidator, IotaTxValidatorMetrics},
    db_checkpoint_handler::DBCheckpointHandler,
    epoch::{
        committee_store::CommitteeStore, consensus_store_pruner::ConsensusStorePruner,
        epoch_metrics::EpochMetrics, randomness::RandomnessManager,
        reconfiguration::ReconfigurationInitiator,
    },
    execution_cache::build_execution_cache,
    jsonrpc_index::IndexStore,
    module_cache_metrics::ResolverMetrics,
    overload_monitor::overload_monitor,
    rest_index::RestIndexStore,
    safe_client::SafeClientMetricsBase,
    signature_verifier::SignatureVerifierMetrics,
    state_accumulator::{StateAccumulator, StateAccumulatorMetrics},
    storage::{RestReadStore, RocksDbStore},
    traffic_controller::metrics::TrafficControllerMetrics,
    transaction_orchestrator::TransactionOrchestrator,
    validator_tx_finalizer::ValidatorTxFinalizer,
};
use iota_grpc_server::{GrpcReader, GrpcServerHandle, start_grpc_server};
use iota_json_rpc::{
    JsonRpcServerBuilder, coin_api::CoinReadApi, governance_api::GovernanceReadApi,
    indexer_api::IndexerApi, move_utils::MoveUtils, read_api::ReadApi,
    transaction_builder_api::TransactionBuilderApi,
    transaction_execution_api::TransactionExecutionApi,
};
use iota_json_rpc_api::JsonRpcMetrics;
use iota_macros::{fail_point, fail_point_async, replay_log};
use iota_metrics::{
    RegistryID, RegistryService,
    hardware_metrics::register_hardware_metrics,
    metrics_network::{MetricsMakeCallbackHandler, NetworkConnectionMetrics, NetworkMetrics},
    server_timing_middleware, spawn_monitored_task,
};
use iota_names::config::IotaNamesConfig;
use iota_network::{
    api::ValidatorServer, discovery, discovery::TrustedPeerChangeEvent, randomness, state_sync,
};
use iota_network_stack::server::{IOTA_TLS_SERVER_NAME, ServerBuilder};
use iota_protocol_config::ProtocolConfig;
use iota_rest_api::RestMetrics;
use iota_sdk_types::crypto::{Intent, IntentMessage, IntentScope};
use iota_snapshot::uploader::StateSnapshotUploader;
use iota_storage::{
    FileCompression, StorageFormat,
    http_key_value_store::HttpKVStore,
    key_value_store::{FallbackTransactionKVStore, TransactionKeyValueStore},
    key_value_store_metrics::KeyValueStoreMetrics,
};
use iota_types::{
    base_types::{AuthorityName, ConciseableName, EpochId},
    committee::Committee,
    crypto::{AuthoritySignature, IotaAuthoritySignature, KeypairTraits, RandomnessRound},
    digests::ChainIdentifier,
    error::{IotaError, IotaResult},
    executable_transaction::VerifiedExecutableTransaction,
    execution_config_utils::to_binary_config,
    full_checkpoint_content::CheckpointData,
    iota_system_state::{
        IotaSystemState, IotaSystemStateTrait,
        epoch_start_iota_system_state::{EpochStartSystemState, EpochStartSystemStateTrait},
    },
    messages_checkpoint::CertifiedCheckpointSummary,
    messages_consensus::{
        AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind,
        SignedAuthorityCapabilitiesV1, check_total_jwk_size,
    },
    messages_grpc::HandleCapabilityNotificationRequestV1,
    quorum_driver_types::QuorumDriverEffectsQueueResult,
    supported_protocol_versions::SupportedProtocolVersions,
    transaction::{Transaction, VerifiedCertificate},
};
use prometheus::Registry;
#[cfg(msim)]
pub use simulator::set_jwk_injector;
#[cfg(msim)]
use simulator::*;
use tap::tap::TapFallible;
use tokio::{
    runtime::Handle,
    sync::{Mutex, broadcast, mpsc, watch},
    task::{JoinHandle, JoinSet},
};
use tokio_util::sync::CancellationToken;
use tower::ServiceBuilder;
use tracing::{Instrument, debug, error, error_span, info, warn};
use typed_store::{
    DBMetrics,
    rocks::{check_and_mark_db_corruption, default_db_options, unmark_db_corruption},
};

use crate::metrics::{GrpcMetrics, IotaNodeMetrics};

pub mod admin;
mod handle;
pub mod metrics;

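/// Long-lived handles for the services a node runs only while it is in the
/// validator committee. These are torn down and rebuilt on epoch boundaries by
/// `monitor_reconfiguration`, and dropped entirely if the node leaves the
/// committee.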
pub struct ValidatorComponents {
    validator_server_spawn_handle: SpawnOnce,
    validator_server_handle: iota_http::ServerHandle,
    validator_overload_monitor_handle: Option<JoinHandle<()>>,
    consensus_manager: ConsensusManager,
    consensus_store_pruner: ConsensusStorePruner,
    consensus_adapter: Arc<ConsensusAdapter>,
    checkpoint_service_tasks: JoinSet<()>,
    checkpoint_metrics: Arc<CheckpointMetrics>,
    iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
    validator_registry_id: RegistryID,
}

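// Test-only plumbing for the `msim` simulator: tracks the simulated node
// handle and expected safe-mode status, and lets tests swap in a custom JWK
// fetcher via `set_jwk_injector`.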
#[cfg(msim)]
mod simulator {
    use std::sync::atomic::AtomicBool;

    use super::*;

    pub(super) struct SimState {
        pub sim_node: iota_simulator::runtime::NodeHandle,
        pub sim_safe_mode_expected: AtomicBool,
        _leak_detector: iota_simulator::NodeLeakDetector,
    }

    impl Default for SimState {
        fn default() -> Self {
            Self {
                sim_node: iota_simulator::runtime::NodeHandle::current(),
                sim_safe_mode_expected: AtomicBool::new(false),
                _leak_detector: iota_simulator::NodeLeakDetector::new(),
            }
        }
    }

    type JwkInjector = dyn Fn(AuthorityName, &OIDCProvider) -> IotaResult<Vec<(JwkId, JWK)>>
        + Send
        + Sync
        + 'static;

    fn default_fetch_jwks(
        _authority: AuthorityName,
        _provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::parse_jwks;
        parse_jwks(
            iota_types::zk_login_util::DEFAULT_JWK_BYTES,
            &OIDCProvider::Twitch,
        )
        .map_err(|_| IotaError::JWKRetrieval)
    }

    thread_local! {
        static JWK_INJECTOR: std::cell::RefCell<Arc<JwkInjector>> = std::cell::RefCell::new(Arc::new(default_fetch_jwks));
    }

    pub(super) fn get_jwk_injector() -> Arc<JwkInjector> {
        JWK_INJECTOR.with(|injector| injector.borrow().clone())
    }

    pub fn set_jwk_injector(injector: Arc<JwkInjector>) {
        JWK_INJECTOR.with(|cell| *cell.borrow_mut() = injector);
    }
}

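/// A running IOTA node. The same type serves as fullnode and validator;
/// `validator_components` is `Some` only while the node is in the committee.
/// Constructed via `start`/`start_async`; epoch transitions are handled by
/// `monitor_reconfiguration`.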
pub struct IotaNode {
    config: NodeConfig,
    validator_components: Mutex<Option<ValidatorComponents>>,
    _http_server: Option<iota_http::ServerHandle>,
    state: Arc<AuthorityState>,
    transaction_orchestrator: Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    registry_service: RegistryService,
    metrics: Arc<IotaNodeMetrics>,

    _discovery: discovery::Handle,
    state_sync_handle: state_sync::Handle,
    randomness_handle: randomness::Handle,
    checkpoint_store: Arc<CheckpointStore>,
    accumulator: Mutex<Option<Arc<StateAccumulator>>>,
    connection_monitor_status: Arc<ConnectionMonitorStatus>,

    end_of_epoch_channel: broadcast::Sender<IotaSystemState>,

    trusted_peer_change_tx: watch::Sender<TrustedPeerChangeEvent>,

    backpressure_manager: Arc<BackpressureManager>,

    _db_checkpoint_handle: Option<tokio::sync::broadcast::Sender<()>>,

    #[cfg(msim)]
    sim_state: SimState,

    _state_archive_handle: Option<broadcast::Sender<()>>,

    _state_snapshot_uploader_handle: Option<broadcast::Sender<()>>,
    shutdown_channel_tx: broadcast::Sender<Option<RunWithRange>>,

    grpc_server_handle: Mutex<Option<GrpcServerHandle>>,

    auth_agg: Arc<ArcSwap<AuthorityAggregator<NetworkAuthorityClient>>>,
}

impl fmt::Debug for IotaNode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("IotaNode")
            .field("name", &self.state.name.concise())
            .finish()
    }
}

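/// Upper bound on JWKs accepted from a single provider fetch; anything beyond
/// this is dropped with a warning (see `start_jwk_updater`).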
static MAX_JWK_KEYS_PER_FETCH: usize = 100;

impl IotaNode {
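    /// Convenience wrapper around [`Self::start_async`] that reports the
    /// software version as "unknown".
    ///
    /// Minimal launch sketch (assumes a `NodeConfig` obtained elsewhere;
    /// illustrative only, hence `ignore`):
    ///
    /// ```ignore
    /// let registry_service = iota_metrics::RegistryService::new(prometheus::Registry::new());
    /// let node = IotaNode::start(config, registry_service, None).await?;
    /// // Wait until a RunWithRange stop condition triggers shutdown.
    /// node.subscribe_to_shutdown_channel().recv().await?;
    /// ```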
    pub async fn start(
        config: NodeConfig,
        registry_service: RegistryService,
        custom_rpc_runtime: Option<Handle>,
    ) -> Result<Arc<IotaNode>> {
        Self::start_async(config, registry_service, custom_rpc_runtime, "unknown").await
    }

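    /// Spawns one background task per configured zklogin OIDC provider; each
    /// task periodically fetches the provider's JWKs, validates them, and
    /// submits previously unseen keys to consensus. The tasks end with the
    /// current epoch (`within_alive_epoch`).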
    fn start_jwk_updater(
        config: &NodeConfig,
        metrics: Arc<IotaNodeMetrics>,
        authority: AuthorityName,
        epoch_store: Arc<AuthorityPerEpochStore>,
        consensus_adapter: Arc<ConsensusAdapter>,
    ) {
        let epoch = epoch_store.epoch();

        let supported_providers = config
            .zklogin_oauth_providers
            .get(&epoch_store.get_chain_identifier().chain())
            .unwrap_or(&BTreeSet::new())
            .iter()
            .map(|s| OIDCProvider::from_str(s).expect("Invalid provider string"))
            .collect::<Vec<_>>();

        let fetch_interval = Duration::from_secs(config.jwk_fetch_interval_seconds);

        info!(
            ?fetch_interval,
            "Starting JWK updater tasks with supported providers: {:?}", supported_providers
        );

        fn validate_jwk(
            metrics: &Arc<IotaNodeMetrics>,
            provider: &OIDCProvider,
            id: &JwkId,
            jwk: &JWK,
        ) -> bool {
            let Ok(iss_provider) = OIDCProvider::from_iss(&id.iss) else {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) is not a valid provider",
                    id.iss, provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            };

            if iss_provider != *provider {
                warn!(
                    "JWK iss {:?} (retrieved from {:?}) does not match provider {:?}",
                    id.iss, provider, iss_provider
                );
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            if !check_total_jwk_size(id, jwk) {
                warn!("JWK {:?} (retrieved from {:?}) is too large", id, provider);
                metrics
                    .invalid_jwks
                    .with_label_values(&[&provider.to_string()])
                    .inc();
                return false;
            }

            true
        }

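        // One fetch/submit loop per provider. `seen` de-duplicates keys
        // across fetches so each (JwkId, JWK) pair is submitted to consensus
        // at most once per task lifetime.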
        for p in supported_providers.into_iter() {
            let provider_str = p.to_string();
            let epoch_store = epoch_store.clone();
            let consensus_adapter = consensus_adapter.clone();
            let metrics = metrics.clone();
            spawn_monitored_task!(epoch_store.clone().within_alive_epoch(
                async move {
                    let mut seen = HashSet::new();
                    loop {
                        info!("fetching JWK for provider {:?}", p);
                        metrics.jwk_requests.with_label_values(&[&provider_str]).inc();
                        match Self::fetch_jwks(authority, &p).await {
                            Err(e) => {
                                metrics.jwk_request_errors.with_label_values(&[&provider_str]).inc();
                                warn!("Error when fetching JWK for provider {:?} {:?}", p, e);
                                tokio::time::sleep(Duration::from_secs(30)).await;
                                continue;
                            }
                            Ok(mut keys) => {
                                metrics.total_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                keys.retain(|(id, jwk)| {
                                    validate_jwk(&metrics, &p, id, jwk) &&
                                    !epoch_store.jwk_active_in_current_epoch(id, jwk) &&
                                    seen.insert((id.clone(), jwk.clone()))
                                });

                                metrics.unique_jwks
                                    .with_label_values(&[&provider_str])
                                    .inc_by(keys.len() as u64);

                                if keys.len() > MAX_JWK_KEYS_PER_FETCH {
                                    warn!("Provider {:?} sent too many JWKs, only the first {} will be used", p, MAX_JWK_KEYS_PER_FETCH);
                                    keys.truncate(MAX_JWK_KEYS_PER_FETCH);
                                }

                                for (id, jwk) in keys.into_iter() {
                                    info!("Submitting JWK to consensus: {:?}", id);

                                    let txn = ConsensusTransaction::new_jwk_fetched(authority, id, jwk);
                                    consensus_adapter.submit(txn, None, &epoch_store)
                                        .tap_err(|e| warn!("Error when submitting JWKs to consensus {:?}", e))
                                        .ok();
                                }
                            }
                        }
                        tokio::time::sleep(fetch_interval).await;
                    }
                }
                .instrument(error_span!("jwk_updater_task", epoch)),
            ));
        }
    }

    pub async fn start_async(
        config: NodeConfig,
        registry_service: RegistryService,
        custom_rpc_runtime: Option<Handle>,
        software_version: &'static str,
    ) -> Result<Arc<IotaNode>> {
        NodeConfigMetrics::new(&registry_service.default_registry()).record_metrics(&config);
        let mut config = config.clone();
        if config.supported_protocol_versions.is_none() {
            info!(
                "populating config.supported_protocol_versions with default {:?}",
                SupportedProtocolVersions::SYSTEM_DEFAULT
            );
            config.supported_protocol_versions = Some(SupportedProtocolVersions::SYSTEM_DEFAULT);
        }

        let run_with_range = config.run_with_range;
        let is_validator = config.consensus_config().is_some();
        let is_full_node = !is_validator;
        let prometheus_registry = registry_service.default_registry();

        info!(node = ?config.authority_public_key(),
            "Initializing iota-node listening on {}", config.network_address
        );

        let genesis = config.genesis()?.clone();

        let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
        info!("IOTA chain identifier: {chain_identifier}");

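        // Check and set the on-disk corruption marker under db_path/status;
        // it is cleared again via `unmark_db_corruption` below, once the
        // stores have been opened and the genesis checkpoint inserted.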
        let db_corrupted_path = &config.db_path().join("status");
        if let Err(err) = check_and_mark_db_corruption(db_corrupted_path) {
            panic!("Failed to check database corruption: {err}");
        }

        DBMetrics::init(&prometheus_registry);

        iota_metrics::init_metrics(&prometheus_registry);
        #[cfg(not(msim))]
        iota_metrics::thread_stall_monitor::start_thread_stall_monitor();

        register_hardware_metrics(&registry_service, &config.db_path)
            .expect("Failed registering hardware metrics");
        prometheus_registry
            .register(iota_metrics::uptime_metric(
                if is_validator {
                    "validator"
                } else {
                    "fullnode"
                },
                software_version,
                &chain_identifier.to_string(),
            ))
            .expect("Failed registering uptime metric");

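        // Genesis may include migrated state from a previous network; load
        // the accompanying transaction data so its transactions can be
        // executed at epoch 0 and its objects added to the genesis set.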
        let migration_tx_data = if genesis.contains_migrations() {
            Some(config.load_migration_tx_data()?)
        } else {
            None
        };

        let secret = Arc::pin(config.authority_key_pair().copy());
        let genesis_committee = genesis.committee()?;
        let committee_store = Arc::new(CommitteeStore::new(
            config.db_path().join("epochs"),
            &genesis_committee,
            None,
        ));

        let mut pruner_db = None;
        if config
            .authority_store_pruning_config
            .enable_compaction_filter
        {
            pruner_db = Some(Arc::new(AuthorityPrunerTables::open(
                &config.db_path().join("store"),
            )));
        }
        let compaction_filter = pruner_db
            .clone()
            .map(|db| ObjectsCompactionFilter::new(db, &prometheus_registry));

        let enable_write_stall = config.enable_db_write_stall.unwrap_or(is_validator);
        let perpetual_tables_options = AuthorityPerpetualTablesOptions {
            enable_write_stall,
            compaction_filter,
        };
        let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(
            &config.db_path().join("store"),
            Some(perpetual_tables_options),
        ));
        let is_genesis = perpetual_tables
            .database_is_empty()
            .expect("Database read should not fail at init.");
        let checkpoint_store = CheckpointStore::new(&config.db_path().join("checkpoints"));
        let backpressure_manager =
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store);

        let store = AuthorityStore::open(
            perpetual_tables,
            &genesis,
            &config,
            &prometheus_registry,
            migration_tx_data.as_ref(),
        )
        .await?;

        let cur_epoch = store.get_recovery_epoch_at_restart()?;
        let committee = committee_store
            .get_committee(&cur_epoch)?
            .expect("Committee of the current epoch must exist");
        let epoch_start_configuration = store
            .get_epoch_start_configuration()?
            .expect("EpochStartConfiguration of the current epoch must exist");
        let cache_metrics = Arc::new(ResolverMetrics::new(&prometheus_registry));
        let signature_verifier_metrics = SignatureVerifierMetrics::new(&prometheus_registry);

        let cache_traits = build_execution_cache(
            &config.execution_cache_config,
            &epoch_start_configuration,
            &prometheus_registry,
            &store,
            backpressure_manager.clone(),
        );

        let auth_agg = {
            let safe_client_metrics_base = SafeClientMetricsBase::new(&prometheus_registry);
            let auth_agg_metrics = Arc::new(AuthAggMetrics::new(&prometheus_registry));
            Arc::new(ArcSwap::new(Arc::new(
                AuthorityAggregator::new_from_epoch_start_state(
                    epoch_start_configuration.epoch_start_state(),
                    &committee_store,
                    safe_client_metrics_base,
                    auth_agg_metrics,
                ),
            )))
        };

        let chain_id = ChainIdentifier::from(*genesis.checkpoint().digest());
        let chain = match config.chain_override_for_testing {
            Some(chain) => chain,
            None => ChainIdentifier::from(*genesis.checkpoint().digest()).chain(),
        };

        let epoch_options = default_db_options().optimize_db_for_write_throughput(4);
        let epoch_store = AuthorityPerEpochStore::new(
            config.authority_public_key(),
            committee.clone(),
            &config.db_path().join("store"),
            Some(epoch_options.options),
            EpochMetrics::new(&registry_service.default_registry()),
            epoch_start_configuration,
            cache_traits.backing_package_store.clone(),
            cache_traits.object_store.clone(),
            cache_metrics,
            signature_verifier_metrics,
            &config.expensive_safety_check_config,
            (chain_id, chain),
            checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("checkpoint store read cannot fail")
                .unwrap_or(0),
        );

        info!("created epoch store");

        replay_log!(
            "Beginning replay run. Epoch: {:?}, Protocol config: {:?}",
            epoch_store.epoch(),
            epoch_store.protocol_config()
        );

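        // On an empty database, verify IOTA conservation once before any user
        // transactions run; a failure here points at a malformed genesis.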
        if is_genesis {
            info!("checking IOTA conservation at genesis");
            cache_traits
                .reconfig_api
                .try_expensive_check_iota_conservation(&epoch_store, None)
                .expect("IOTA conservation check cannot fail at genesis");
        }

        let effective_buffer_stake = epoch_store.get_effective_buffer_stake_bps();
        let default_buffer_stake = epoch_store
            .protocol_config()
            .buffer_stake_for_protocol_upgrade_bps();
        if effective_buffer_stake != default_buffer_stake {
            warn!(
                ?effective_buffer_stake,
                ?default_buffer_stake,
                "buffer_stake_for_protocol_upgrade_bps is currently overridden"
            );
        }

        checkpoint_store.insert_genesis_checkpoint(
            genesis.checkpoint(),
            genesis.checkpoint_contents().clone(),
            &epoch_store,
        );

        unmark_db_corruption(db_corrupted_path)?;

        info!("creating state sync store");
        let state_sync_store = RocksDbStore::new(
            cache_traits.clone(),
            committee_store.clone(),
            checkpoint_store.clone(),
        );

        let index_store = if is_full_node && config.enable_index_processing {
            info!("creating index store");
            Some(Arc::new(IndexStore::new(
                config.db_path().join("indexes"),
                &prometheus_registry,
                epoch_store
                    .protocol_config()
                    .max_move_identifier_len_as_option(),
                config.remove_deprecated_tables,
            )))
        } else {
            None
        };

        let rest_index = if is_full_node && config.enable_rest_api && config.enable_index_processing
        {
            Some(Arc::new(RestIndexStore::new(
                config.db_path().join("rest_index"),
                &store,
                &checkpoint_store,
                &epoch_store,
                &cache_traits.backing_package_store,
            )))
        } else {
            None
        };

        info!("creating archive reader");
        let archive_readers =
            ArchiveReaderBalancer::new(config.archive_reader_config(), &prometheus_registry)?;
        let (trusted_peer_change_tx, trusted_peer_change_rx) = watch::channel(Default::default());
        let (randomness_tx, randomness_rx) = mpsc::channel(
            config
                .p2p_config
                .randomness
                .clone()
                .unwrap_or_default()
                .mailbox_capacity(),
        );
        let (p2p_network, discovery_handle, state_sync_handle, randomness_handle) =
            Self::create_p2p_network(
                &config,
                state_sync_store.clone(),
                chain_identifier,
                trusted_peer_change_rx,
                archive_readers.clone(),
                randomness_tx,
                &prometheus_registry,
            )?;

        send_trusted_peer_change(
            &config,
            &trusted_peer_change_tx,
            epoch_store.epoch_start_state(),
        );

        info!("start state archival");
        let state_archive_handle =
            Self::start_state_archival(&config, &prometheus_registry, state_sync_store.clone())
                .await?;

        info!("start snapshot upload");
        let state_snapshot_handle =
            Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;

        info!("start db checkpoint");
        let (db_checkpoint_config, db_checkpoint_handle) = Self::start_db_checkpoint(
            &config,
            &prometheus_registry,
            state_snapshot_handle.is_some(),
        )?;

        let mut genesis_objects = genesis.objects().to_vec();
        if let Some(migration_tx_data) = migration_tx_data.as_ref() {
            genesis_objects.extend(migration_tx_data.get_objects());
        }

        let authority_name = config.authority_public_key();
        let validator_tx_finalizer =
            config
                .enable_validator_tx_finalizer
                .then_some(Arc::new(ValidatorTxFinalizer::new(
                    auth_agg.clone(),
                    authority_name,
                    &prometheus_registry,
                )));

        info!("create authority state");
        let state = AuthorityState::new(
            authority_name,
            secret,
            config.supported_protocol_versions.unwrap(),
            store.clone(),
            cache_traits.clone(),
            epoch_store.clone(),
            committee_store.clone(),
            index_store.clone(),
            rest_index,
            checkpoint_store.clone(),
            &prometheus_registry,
            &genesis_objects,
            &db_checkpoint_config,
            config.clone(),
            archive_readers,
            validator_tx_finalizer,
            chain_identifier,
            pruner_db,
        )
        .await;

        if epoch_store.epoch() == 0 {
            let genesis_tx = &genesis.transaction();
            let span = error_span!("genesis_txn", tx_digest = ?genesis_tx.digest());
            Self::execute_transaction_immediately_at_zero_epoch(
                &state,
                &epoch_store,
                genesis_tx,
                span,
            )
            .await;

            if let Some(migration_tx_data) = migration_tx_data {
                for (tx_digest, (tx, _, _)) in migration_tx_data.txs_data() {
                    let span = error_span!("migration_txn", tx_digest = ?tx_digest);
                    Self::execute_transaction_immediately_at_zero_epoch(
                        &state,
                        &epoch_store,
                        tx,
                        span,
                    )
                    .await;
                }
            }
        }

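        // Forward randomness rounds produced by the randomness protocol into
        // the authority state for execution.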
        RandomnessRoundReceiver::spawn(state.clone(), randomness_rx);

        if config
            .expensive_safety_check_config
            .enable_secondary_index_checks()
        {
            if let Some(indexes) = state.indexes.clone() {
                iota_core::verify_indexes::verify_indexes(
                    state.get_accumulator_store().as_ref(),
                    indexes,
                )
                .expect("secondary indexes are inconsistent");
            }
        }

        let (end_of_epoch_channel, end_of_epoch_receiver) =
            broadcast::channel(config.end_of_epoch_broadcast_channel_capacity);

        let transaction_orchestrator = if is_full_node && run_with_range.is_none() {
            Some(Arc::new(TransactionOrchestrator::new_with_auth_aggregator(
                auth_agg.load_full(),
                state.clone(),
                end_of_epoch_receiver,
                &config.db_path(),
                &prometheus_registry,
            )))
        } else {
            None
        };

        let http_server = build_http_server(
            state.clone(),
            state_sync_store.clone(),
            &transaction_orchestrator.clone(),
            &config,
            &prometheus_registry,
            custom_rpc_runtime,
            software_version,
        )
        .await?;

        let accumulator = Arc::new(StateAccumulator::new(
            cache_traits.accumulator_store.clone(),
            StateAccumulatorMetrics::new(&prometheus_registry),
        ));

        let authority_names_to_peer_ids = epoch_store
            .epoch_start_state()
            .get_authority_names_to_peer_ids();

        let network_connection_metrics =
            NetworkConnectionMetrics::new("iota", &registry_service.default_registry());

        let authority_names_to_peer_ids = ArcSwap::from_pointee(authority_names_to_peer_ids);

        let (_connection_monitor_handle, connection_statuses) = ConnectionMonitor::spawn(
            p2p_network.downgrade(),
            network_connection_metrics,
            HashMap::new(),
            None,
        );

        let connection_monitor_status = ConnectionMonitorStatus {
            connection_statuses,
            authority_names_to_peer_ids,
        };

        let connection_monitor_status = Arc::new(connection_monitor_status);
        let iota_node_metrics =
            Arc::new(IotaNodeMetrics::new(&registry_service.default_registry()));

        let grpc_server_handle =
            build_grpc_server(&config, state.clone(), state_sync_store.clone()).await?;

        let validator_components = if state.is_committee_validator(&epoch_store) {
            let (components, _) = futures::join!(
                Self::construct_validator_components(
                    config.clone(),
                    state.clone(),
                    committee,
                    epoch_store.clone(),
                    checkpoint_store.clone(),
                    state_sync_handle.clone(),
                    randomness_handle.clone(),
                    Arc::downgrade(&accumulator),
                    backpressure_manager.clone(),
                    connection_monitor_status.clone(),
                    &registry_service,
                    iota_node_metrics.clone(),
                ),
                Self::reexecute_pending_consensus_certs(&epoch_store, &state)
            );
            let mut components = components?;

            components.consensus_adapter.submit_recovered(&epoch_store);

            components.validator_server_spawn_handle =
                components.validator_server_spawn_handle.start();

            Some(components)
        } else {
            None
        };

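        // Broadcast channel used to announce shutdown when a RunWithRange
        // stop condition is met; `subscribe_to_shutdown_channel` hands out
        // receivers.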
        let (shutdown_channel, _) = broadcast::channel::<Option<RunWithRange>>(1);

        let node = Self {
            config,
            validator_components: Mutex::new(validator_components),
            _http_server: http_server,
            state,
            transaction_orchestrator,
            registry_service,
            metrics: iota_node_metrics,

            _discovery: discovery_handle,
            state_sync_handle,
            randomness_handle,
            checkpoint_store,
            accumulator: Mutex::new(Some(accumulator)),
            end_of_epoch_channel,
            connection_monitor_status,
            trusted_peer_change_tx,
            backpressure_manager,

            _db_checkpoint_handle: db_checkpoint_handle,

            #[cfg(msim)]
            sim_state: Default::default(),

            _state_archive_handle: state_archive_handle,
            _state_snapshot_uploader_handle: state_snapshot_handle,
            shutdown_channel_tx: shutdown_channel,

            grpc_server_handle: Mutex::new(grpc_server_handle),

            auth_agg,
        };

        info!("IotaNode started!");
        let node = Arc::new(node);
        let node_copy = node.clone();
        spawn_monitored_task!(async move {
            let result = Self::monitor_reconfiguration(node_copy, epoch_store).await;
            if let Err(error) = result {
                warn!("Reconfiguration finished with error {:?}", error);
            }
        });

        Ok(node)
    }

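    /// Returns a receiver that yields the new `IotaSystemState` at each epoch
    /// boundary. As with any tokio broadcast channel, a lagging subscriber
    /// may miss intermediate values.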
    pub fn subscribe_to_epoch_change(&self) -> broadcast::Receiver<IotaSystemState> {
        self.end_of_epoch_channel.subscribe()
    }

    pub fn subscribe_to_shutdown_channel(&self) -> broadcast::Receiver<Option<RunWithRange>> {
        self.shutdown_channel_tx.subscribe()
    }

    pub fn current_epoch_for_testing(&self) -> EpochId {
        self.state.current_epoch_for_testing()
    }

    pub fn db_checkpoint_path(&self) -> PathBuf {
        self.config.db_checkpoint_path()
    }

    pub async fn close_epoch(&self, epoch_store: &Arc<AuthorityPerEpochStore>) -> IotaResult {
        info!("close_epoch (current epoch = {})", epoch_store.epoch());
        self.validator_components
            .lock()
            .await
            .as_ref()
            .ok_or_else(|| IotaError::from("Node is not a validator"))?
            .consensus_adapter
            .close_epoch(epoch_store);
        Ok(())
    }

    pub fn clear_override_protocol_upgrade_buffer_stake(&self, epoch: EpochId) -> IotaResult {
        self.state
            .clear_override_protocol_upgrade_buffer_stake(epoch)
    }

    pub fn set_override_protocol_upgrade_buffer_stake(
        &self,
        epoch: EpochId,
        buffer_stake_bps: u64,
    ) -> IotaResult {
        self.state
            .set_override_protocol_upgrade_buffer_stake(epoch, buffer_stake_bps)
    }

    pub async fn close_epoch_for_testing(&self) -> IotaResult {
        let epoch_store = self.state.epoch_store_for_testing();
        self.close_epoch(&epoch_store).await
    }

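    /// Starts the checkpoint archive writer when a remote object store is
    /// configured; the returned sender is held in `_state_archive_handle` and
    /// controls the background task's lifetime.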
    async fn start_state_archival(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_sync_store: RocksDbStore,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_archive_write_config.object_store_config {
            let local_store_config = ObjectStoreConfig {
                object_store: Some(ObjectStoreType::File),
                directory: Some(config.archive_path()),
                ..Default::default()
            };
            let archive_writer = ArchiveWriter::new(
                local_store_config,
                remote_store_config.clone(),
                FileCompression::Zstd,
                StorageFormat::Blob,
                Duration::from_secs(600),
                256 * 1024 * 1024,
                prometheus_registry,
            )
            .await?;
            Ok(Some(archive_writer.start(state_sync_store).await?))
        } else {
            Ok(None)
        }
    }

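    /// Starts the state snapshot uploader when a snapshot object store is
    /// configured; it consumes the end-of-epoch db checkpoints produced via
    /// `start_db_checkpoint`.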
    fn start_state_snapshot(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
        if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
            let snapshot_uploader = StateSnapshotUploader::new(
                &config.db_checkpoint_path(),
                &config.snapshot_path(),
                remote_store_config.clone(),
                60,
                prometheus_registry,
                checkpoint_store,
            )?;
            Ok(Some(snapshot_uploader.start()))
        } else {
            Ok(None)
        }
    }

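    /// Derives the effective `DBCheckpointConfig`. When state snapshots are
    /// enabled, db checkpoints at epoch end are forced on, since the snapshot
    /// uploader consumes them.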
    fn start_db_checkpoint(
        config: &NodeConfig,
        prometheus_registry: &Registry,
        state_snapshot_enabled: bool,
    ) -> Result<(
        DBCheckpointConfig,
        Option<tokio::sync::broadcast::Sender<()>>,
    )> {
        let checkpoint_path = Some(
            config
                .db_checkpoint_config
                .checkpoint_path
                .clone()
                .unwrap_or_else(|| config.db_checkpoint_path()),
        );
        let db_checkpoint_config = if config.db_checkpoint_config.checkpoint_path.is_none() {
            DBCheckpointConfig {
                checkpoint_path,
                perform_db_checkpoints_at_epoch_end: if state_snapshot_enabled {
                    true
                } else {
                    config
                        .db_checkpoint_config
                        .perform_db_checkpoints_at_epoch_end
                },
                ..config.db_checkpoint_config.clone()
            }
        } else {
            config.db_checkpoint_config.clone()
        };

        match (
            db_checkpoint_config.object_store_config.as_ref(),
            state_snapshot_enabled,
        ) {
            (None, false) => Ok((db_checkpoint_config, None)),
            (_, _) => {
                let handler = DBCheckpointHandler::new(
                    &db_checkpoint_config.checkpoint_path.clone().unwrap(),
                    db_checkpoint_config.object_store_config.as_ref(),
                    60,
                    db_checkpoint_config
                        .prune_and_compact_before_upload
                        .unwrap_or(true),
                    config.authority_store_pruning_config.clone(),
                    prometheus_registry,
                    state_snapshot_enabled,
                )?;
                Ok((
                    db_checkpoint_config,
                    Some(DBCheckpointHandler::start(handler)),
                ))
            }
        }
    }

    fn create_p2p_network(
        config: &NodeConfig,
        state_sync_store: RocksDbStore,
        chain_identifier: ChainIdentifier,
        trusted_peer_change_rx: watch::Receiver<TrustedPeerChangeEvent>,
        archive_readers: ArchiveReaderBalancer,
        randomness_tx: mpsc::Sender<(EpochId, RandomnessRound, Vec<u8>)>,
        prometheus_registry: &Registry,
    ) -> Result<(
        Network,
        discovery::Handle,
        state_sync::Handle,
        randomness::Handle,
    )> {
        let (state_sync, state_sync_server) = state_sync::Builder::new()
            .config(config.p2p_config.state_sync.clone().unwrap_or_default())
            .store(state_sync_store)
            .archive_readers(archive_readers)
            .with_metrics(prometheus_registry)
            .build();

        let (discovery, discovery_server) = discovery::Builder::new(trusted_peer_change_rx)
            .config(config.p2p_config.clone())
            .build();

        let (randomness, randomness_router) =
            randomness::Builder::new(config.authority_public_key(), randomness_tx)
                .config(config.p2p_config.randomness.clone().unwrap_or_default())
                .with_metrics(prometheus_registry)
                .build();

        let p2p_network = {
            let routes = anemo::Router::new()
                .add_rpc_service(discovery_server)
                .add_rpc_service(state_sync_server);
            let routes = routes.merge(randomness_router);

            let inbound_network_metrics =
                NetworkMetrics::new("iota", "inbound", prometheus_registry);
            let outbound_network_metrics =
                NetworkMetrics::new("iota", "outbound", prometheus_registry);

            let service = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(inbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .service(routes);

            let outbound_layer = ServiceBuilder::new()
                .layer(
                    TraceLayer::new_for_client_and_server_errors()
                        .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO))
                        .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)),
                )
                .layer(CallbackLayer::new(MetricsMakeCallbackHandler::new(
                    Arc::new(outbound_network_metrics),
                    config.p2p_config.excessive_message_size(),
                )))
                .into_inner();

            let mut anemo_config = config.p2p_config.anemo_config.clone().unwrap_or_default();
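            // Allow oversized frames (1 GiB): end-of-epoch transactions and
            // bulk state sync payloads can exceed anemo's default frame size.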
            anemo_config.max_frame_size = Some(1 << 30);

            let mut quic_config = anemo_config.quic.unwrap_or_default();
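            // Raise socket buffers and QUIC stream/connection windows above
            // the defaults (unless explicitly configured); state sync moves
            // checkpoint data in bulk and benefits from the extra in-flight
            // capacity. Failures to apply the buffer sizes are tolerated.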
            if quic_config.socket_send_buffer_size.is_none() {
                quic_config.socket_send_buffer_size = Some(20 << 20);
            }
            if quic_config.socket_receive_buffer_size.is_none() {
                quic_config.socket_receive_buffer_size = Some(20 << 20);
            }
            quic_config.allow_failed_socket_buffer_size_setting = true;

            if quic_config.max_concurrent_bidi_streams.is_none() {
                quic_config.max_concurrent_bidi_streams = Some(500);
            }
            if quic_config.max_concurrent_uni_streams.is_none() {
                quic_config.max_concurrent_uni_streams = Some(500);
            }
            if quic_config.stream_receive_window.is_none() {
                quic_config.stream_receive_window = Some(100 << 20);
            }
            if quic_config.receive_window.is_none() {
                quic_config.receive_window = Some(200 << 20);
            }
            if quic_config.send_window.is_none() {
                quic_config.send_window = Some(200 << 20);
            }
            if quic_config.crypto_buffer_size.is_none() {
                quic_config.crypto_buffer_size = Some(1 << 20);
            }
            if quic_config.max_idle_timeout_ms.is_none() {
                quic_config.max_idle_timeout_ms = Some(30_000);
            }
            if quic_config.keep_alive_interval_ms.is_none() {
                quic_config.keep_alive_interval_ms = Some(5_000);
            }
            anemo_config.quic = Some(quic_config);

            let server_name = format!("iota-{chain_identifier}");
            let network = Network::bind(config.p2p_config.listen_address)
                .server_name(&server_name)
                .private_key(config.network_key_pair().copy().private().0.to_bytes())
                .config(anemo_config)
                .outbound_request_layer(outbound_layer)
                .start(service)?;
            info!(
                server_name = server_name,
                "P2p network started on {}",
                network.local_addr()
            );

            network
        };

        let discovery_handle =
            discovery.start(p2p_network.clone(), config.network_key_pair().copy());
        let state_sync_handle = state_sync.start(p2p_network.clone());
        let randomness_handle = randomness.start(p2p_network.clone());

        Ok((
            p2p_network,
            discovery_handle,
            state_sync_handle,
            randomness_handle,
        ))
    }

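    /// Builds the validator components that outlive a single epoch: consensus
    /// adapter and manager, consensus store pruner, the validator gRPC
    /// service, and an optional overload monitor. Their metrics live in a
    /// dedicated registry (`validator_registry_id`) that is removed if the
    /// node drops out of the committee.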
    async fn construct_validator_components(
        config: NodeConfig,
        state: Arc<AuthorityState>,
        committee: Arc<Committee>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        registry_service: &RegistryService,
        iota_node_metrics: Arc<IotaNodeMetrics>,
    ) -> Result<ValidatorComponents> {
        let mut config_clone = config.clone();
        let consensus_config = config_clone
            .consensus_config
            .as_mut()
            .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
        let validator_registry = Registry::new();
        let validator_registry_id = registry_service.add(validator_registry.clone());

        let client = Arc::new(UpdatableConsensusClient::new());
        let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
            &committee,
            consensus_config,
            state.name,
            connection_monitor_status.clone(),
            &validator_registry,
            client.clone(),
            checkpoint_store.clone(),
        ));
        let consensus_manager = ConsensusManager::new(
            &config,
            consensus_config,
            registry_service,
            &validator_registry,
            client,
        );

        let consensus_store_pruner = ConsensusStorePruner::new(
            consensus_manager.get_storage_base_path(),
            consensus_config.db_retention_epochs(),
            consensus_config.db_pruner_period(),
            &validator_registry,
        );

        let checkpoint_metrics = CheckpointMetrics::new(&validator_registry);
        let iota_tx_validator_metrics = IotaTxValidatorMetrics::new(&validator_registry);

        let (validator_server_spawn_handle, validator_server_handle) =
            Self::start_grpc_validator_service(
                &config,
                state.clone(),
                consensus_adapter.clone(),
                &validator_registry,
            )
            .await?;

        let validator_overload_monitor_handle = if config
            .authority_overload_config
            .max_load_shedding_percentage
            > 0
        {
            let authority_state = Arc::downgrade(&state);
            let overload_config = config.authority_overload_config.clone();
            fail_point!("starting_overload_monitor");
            Some(spawn_monitored_task!(overload_monitor(
                authority_state,
                overload_config,
            )))
        } else {
            None
        };

        Self::start_epoch_specific_validator_components(
            &config,
            state.clone(),
            consensus_adapter,
            checkpoint_store,
            epoch_store,
            state_sync_handle,
            randomness_handle,
            consensus_manager,
            consensus_store_pruner,
            accumulator,
            backpressure_manager,
            validator_server_spawn_handle,
            validator_server_handle,
            validator_overload_monitor_handle,
            checkpoint_metrics,
            iota_node_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        )
        .await
    }

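    /// Starts the components scoped to a single epoch: checkpoint service,
    /// randomness manager, consensus handler, and the JWK updater. Called at
    /// startup and again after every reconfiguration.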
    async fn start_epoch_specific_validator_components(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state_sync_handle: state_sync::Handle,
        randomness_handle: randomness::Handle,
        consensus_manager: ConsensusManager,
        consensus_store_pruner: ConsensusStorePruner,
        accumulator: Weak<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        validator_server_spawn_handle: SpawnOnce,
        validator_server_handle: iota_http::ServerHandle,
        validator_overload_monitor_handle: Option<JoinHandle<()>>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
        iota_node_metrics: Arc<IotaNodeMetrics>,
        iota_tx_validator_metrics: Arc<IotaTxValidatorMetrics>,
        validator_registry_id: RegistryID,
    ) -> Result<ValidatorComponents> {
        let checkpoint_service = Self::build_checkpoint_service(
            config,
            consensus_adapter.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            state.clone(),
            state_sync_handle,
            accumulator,
            checkpoint_metrics.clone(),
        );

        let low_scoring_authorities = Arc::new(ArcSwap::new(Arc::new(HashMap::new())));

        consensus_adapter.swap_low_scoring_authorities(low_scoring_authorities.clone());

        let randomness_manager = RandomnessManager::try_new(
            Arc::downgrade(&epoch_store),
            Box::new(consensus_adapter.clone()),
            randomness_handle,
            config.authority_key_pair(),
        )
        .await;
        if let Some(randomness_manager) = randomness_manager {
            epoch_store
                .set_randomness_manager(randomness_manager)
                .await?;
        }

        let consensus_handler_initializer = ConsensusHandlerInitializer::new(
            state.clone(),
            checkpoint_service.clone(),
            epoch_store.clone(),
            low_scoring_authorities,
            backpressure_manager,
        );

        info!("Starting consensus manager");

        consensus_manager
            .start(
                config,
                epoch_store.clone(),
                consensus_handler_initializer,
                IotaTxValidator::new(
                    epoch_store.clone(),
                    checkpoint_service.clone(),
                    state.transaction_manager().clone(),
                    iota_tx_validator_metrics.clone(),
                ),
            )
            .await;

        if !epoch_store
            .epoch_start_config()
            .is_data_quarantine_active_from_beginning_of_epoch()
        {
            checkpoint_store
                .reexecute_local_checkpoints(&state, &epoch_store)
                .await;
        }

        info!("Spawning checkpoint service");
        let checkpoint_service_tasks = checkpoint_service.spawn().await;

        if epoch_store.authenticator_state_enabled() {
            Self::start_jwk_updater(
                config,
                iota_node_metrics,
                state.name,
                epoch_store.clone(),
                consensus_adapter.clone(),
            );
        }

        Ok(ValidatorComponents {
            validator_server_spawn_handle,
            validator_server_handle,
            validator_overload_monitor_handle,
            consensus_manager,
            consensus_store_pruner,
            consensus_adapter,
            checkpoint_service_tasks,
            checkpoint_metrics,
            iota_tx_validator_metrics,
            validator_registry_id,
        })
    }

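    /// Wires up the checkpoint service: locally built checkpoints are
    /// submitted to consensus, certified ones are forwarded to state sync.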
    fn build_checkpoint_service(
        config: &NodeConfig,
        consensus_adapter: Arc<ConsensusAdapter>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        state: Arc<AuthorityState>,
        state_sync_handle: state_sync::Handle,
        accumulator: Weak<StateAccumulator>,
        checkpoint_metrics: Arc<CheckpointMetrics>,
    ) -> Arc<CheckpointService> {
        let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
        let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

        debug!(
            "Starting checkpoint service with epoch start timestamp {} and epoch duration {}",
            epoch_start_timestamp_ms, epoch_duration_ms
        );

        let checkpoint_output = Box::new(SubmitCheckpointToConsensus {
            sender: consensus_adapter,
            signer: state.secret.clone(),
            authority: config.authority_public_key(),
            next_reconfiguration_timestamp_ms: epoch_start_timestamp_ms
                .checked_add(epoch_duration_ms)
                .expect("Overflow calculating next_reconfiguration_timestamp_ms"),
            metrics: checkpoint_metrics.clone(),
        });

        let certified_checkpoint_output = SendCheckpointToStateSync::new(state_sync_handle);
        let max_tx_per_checkpoint = max_tx_per_checkpoint(epoch_store.protocol_config());
        let max_checkpoint_size_bytes =
            epoch_store.protocol_config().max_checkpoint_size_bytes() as usize;

        CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store,
            state.get_transaction_cache_reader().clone(),
            accumulator,
            checkpoint_output,
            Box::new(certified_checkpoint_output),
            checkpoint_metrics,
            max_tx_per_checkpoint,
            max_checkpoint_size_bytes,
        )
    }

    fn construct_consensus_adapter(
        committee: &Committee,
        consensus_config: &ConsensusConfig,
        authority: AuthorityName,
        connection_monitor_status: Arc<ConnectionMonitorStatus>,
        prometheus_registry: &Registry,
        consensus_client: Arc<dyn ConsensusClient>,
        checkpoint_store: Arc<CheckpointStore>,
    ) -> ConsensusAdapter {
        let ca_metrics = ConsensusAdapterMetrics::new(prometheus_registry);
        ConsensusAdapter::new(
            consensus_client,
            checkpoint_store,
            authority,
            connection_monitor_status,
            consensus_config.max_pending_transactions(),
            consensus_config.max_pending_transactions() * 2 / committee.num_members(),
            consensus_config.max_submit_position,
            consensus_config.submit_delay_step_override(),
            ca_metrics,
        )
    }

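    /// Binds the validator gRPC endpoint (with TLS) immediately, but returns
    /// a `SpawnOnce` so serving only begins once the remaining validator
    /// components are in place.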
    async fn start_grpc_validator_service(
        config: &NodeConfig,
        state: Arc<AuthorityState>,
        consensus_adapter: Arc<ConsensusAdapter>,
        prometheus_registry: &Registry,
    ) -> Result<(SpawnOnce, iota_http::ServerHandle)> {
        let validator_service = ValidatorService::new(
            state.clone(),
            consensus_adapter,
            Arc::new(ValidatorServiceMetrics::new(prometheus_registry)),
            TrafficControllerMetrics::new(prometheus_registry),
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let mut server_conf = iota_network_stack::config::Config::new();
        server_conf.global_concurrency_limit = config.grpc_concurrency_limit;
        server_conf.load_shed = config.grpc_load_shed;
        let mut server_builder =
            ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry));

        server_builder = server_builder.add_service(ValidatorServer::new(validator_service));

        let tls_config = iota_tls::create_rustls_server_config(
            config.network_key_pair().copy().private(),
            IOTA_TLS_SERVER_NAME.to_string(),
        );

        let server = server_builder
            .bind(config.network_address(), Some(tls_config))
            .await
            .map_err(|err| anyhow!(err.to_string()))?;

        let local_addr = server.local_addr();
        info!("Listening to traffic on {local_addr}");

        let server_handle = server.handle().clone();

        Ok((
            SpawnOnce::new(server.serve().map_err(Into::into)),
            server_handle,
        ))
    }

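    /// Re-enqueues certificates that consensus accepted but that may not have
    /// been executed before a restart. Owned-object certificates with a known
    /// signed-effects digest are executed against that expected digest; the
    /// rest are executed normally. Shared-object certificates are not handled
    /// here.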
    async fn reexecute_pending_consensus_certs(
        epoch_store: &Arc<AuthorityPerEpochStore>,
        state: &Arc<AuthorityState>,
    ) {
        let mut pending_consensus_certificates = Vec::new();
        let mut additional_certs = Vec::new();

        for tx in epoch_store.get_all_pending_consensus_transactions() {
            match tx.kind {
                ConsensusTransactionKind::CertifiedTransaction(tx)
                    if !tx.contains_shared_object() =>
                {
                    let tx = *tx;
                    let tx = VerifiedExecutableTransaction::new_from_certificate(
                        VerifiedCertificate::new_unchecked(tx),
                    );
                    if let Some(fx_digest) = epoch_store
                        .get_signed_effects_digest(tx.digest())
                        .expect("db error")
                    {
                        pending_consensus_certificates.push((tx, fx_digest));
                    } else {
                        additional_certs.push(tx);
                    }
                }
                _ => (),
            }
        }

        let digests = pending_consensus_certificates
            .iter()
            .map(|(tx, _)| *tx.digest())
            .collect::<Vec<_>>();

        info!(
            "reexecuting {} pending consensus certificates: {:?}",
            digests.len(),
            digests
        );

        state.enqueue_with_expected_effects_digest(pending_consensus_certificates, epoch_store);
        state.enqueue_transactions_for_execution(additional_certs, epoch_store);

        let timeout = if cfg!(msim) { 120 } else { 60 };
        if tokio::time::timeout(
            std::time::Duration::from_secs(timeout),
            state
                .get_transaction_cache_reader()
                .try_notify_read_executed_effects_digests(&digests),
        )
        .await
        .is_err()
        {
            if let Ok(executed_effects_digests) = state
                .get_transaction_cache_reader()
                .try_multi_get_executed_effects_digests(&digests)
            {
                let pending_digests = digests
                    .iter()
                    .zip(executed_effects_digests.iter())
                    .filter_map(|(digest, executed_effects_digest)| {
                        if executed_effects_digest.is_none() {
                            Some(digest)
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>();
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed: {:?}",
                    pending_digests
                );
            } else {
                debug_fatal!(
                    "Timed out waiting for effects digests to be executed, digests not found"
                );
            }
        }
    }

    pub fn state(&self) -> Arc<AuthorityState> {
        self.state.clone()
    }

    pub fn reference_gas_price_for_testing(&self) -> Result<u64, anyhow::Error> {
        self.state.reference_gas_price_for_testing()
    }

    pub fn clone_committee_store(&self) -> Arc<CommitteeStore> {
        self.state.committee_store().clone()
    }

    pub fn clone_authority_aggregator(
        &self,
    ) -> Option<Arc<AuthorityAggregator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.clone_authority_aggregator())
    }

    pub fn transaction_orchestrator(
        &self,
    ) -> Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>> {
        self.transaction_orchestrator.clone()
    }

    pub fn subscribe_to_transaction_orchestrator_effects(
        &self,
    ) -> Result<tokio::sync::broadcast::Receiver<QuorumDriverEffectsQueueResult>> {
        self.transaction_orchestrator
            .as_ref()
            .map(|to| to.subscribe_to_effects_queue())
            .ok_or_else(|| anyhow::anyhow!("Transaction Orchestrator is not enabled in this node."))
    }

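    /// Drives the node across epoch boundaries: runs the checkpoint executor
    /// for the current epoch, submits this validator's capabilities to
    /// consensus, and at epoch end tears down and rebuilds the validator
    /// components according to the new committee. Returns only on a
    /// RunWithRange stop condition or on error.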
    pub async fn monitor_reconfiguration(
        self: Arc<Self>,
        mut epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Result<()> {
        let checkpoint_executor_metrics =
            CheckpointExecutorMetrics::new(&self.registry_service.default_registry());

        loop {
            let mut accumulator_guard = self.accumulator.lock().await;
            let accumulator = accumulator_guard.take().unwrap();
            info!(
                "Creating checkpoint executor for epoch {}",
                epoch_store.epoch()
            );

            let summary_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
                guard.as_ref().map(|handle| {
                    let tx = handle.checkpoint_summary_broadcaster().clone();
                    Box::new(move |summary: &CertifiedCheckpointSummary| {
                        tx.send_traced(summary);
                    }) as Box<dyn Fn(&CertifiedCheckpointSummary) + Send + Sync>
                })
            } else {
                None
            };
            let data_sender = if let Ok(guard) = self.grpc_server_handle.try_lock() {
                guard.as_ref().map(|handle| {
                    let tx = handle.checkpoint_data_broadcaster().clone();
                    Box::new(move |data: &CheckpointData| {
                        tx.send_traced(data);
                    }) as Box<dyn Fn(&CheckpointData) + Send + Sync>
                })
            } else {
                None
            };

            let checkpoint_executor = CheckpointExecutor::new(
                epoch_store.clone(),
                self.checkpoint_store.clone(),
                self.state.clone(),
                accumulator.clone(),
                self.backpressure_manager.clone(),
                self.config.checkpoint_executor_config.clone(),
                checkpoint_executor_metrics.clone(),
                summary_sender,
                data_sender,
            );

            let run_with_range = self.config.run_with_range;

            let cur_epoch_store = self.state.load_epoch_store_one_call_per_task();

            if let Some(components) = &*self.validator_components.lock().await {
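                // Brief pause before submitting capabilities, giving the
                // freshly (re)started consensus layer a moment to come up so
                // the notification is not dropped.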
1818 tokio::time::sleep(Duration::from_millis(1)).await;
1820
1821 let config = cur_epoch_store.protocol_config();
1822 let binary_config = to_binary_config(config);
1823 let transaction = ConsensusTransaction::new_capability_notification_v1(
1824 AuthorityCapabilitiesV1::new(
1825 self.state.name,
1826 cur_epoch_store.get_chain_identifier().chain(),
1827 self.config
1828 .supported_protocol_versions
1829 .expect("Supported versions should be populated")
1830 .truncate_below(config.version),
1832 self.state
1833 .get_available_system_packages(&binary_config)
1834 .await,
1835 ),
1836 );
1837 info!(?transaction, "submitting capabilities to consensus");
1838 components
1839 .consensus_adapter
1840 .submit(transaction, None, &cur_epoch_store)?;
1841 } else if self.state.is_active_validator(&cur_epoch_store)
1842 && cur_epoch_store
1843 .protocol_config()
1844 .track_non_committee_eligible_validators()
1845 {
1846 let epoch_store = cur_epoch_store.clone();
1850 let node_clone = self.clone();
1851 spawn_monitored_task!(epoch_store.clone().within_alive_epoch(async move {
1852 node_clone
1853 .send_signed_capability_notification_to_committee_with_retry(&epoch_store)
1854 .await;
1855 }));
1856 }
1857
1858 let stop_condition = checkpoint_executor.run_epoch(run_with_range).await;
1859
1860 if stop_condition == StopReason::RunWithRangeCondition {
1861 IotaNode::shutdown(&self).await;
1862 self.shutdown_channel_tx
1863 .send(run_with_range)
1864 .expect("RunWithRangeCondition met but failed to send shutdown message");
1865 return Ok(());
1866 }
1867
1868 let latest_system_state = self
1870 .state
1871 .get_object_cache_reader()
1872 .try_get_iota_system_state_object_unsafe()
1873 .expect("Read IOTA System State object cannot fail");
1874
1875 #[cfg(msim)]
1876 if !self
1877 .sim_state
1878 .sim_safe_mode_expected
1879 .load(Ordering::Relaxed)
1880 {
1881 debug_assert!(!latest_system_state.safe_mode());
1882 }
1883
1884 #[cfg(not(msim))]
1885 debug_assert!(!latest_system_state.safe_mode());
1886
1887 if let Err(err) = self.end_of_epoch_channel.send(latest_system_state.clone()) {
1888 if self.state.is_fullnode(&cur_epoch_store) {
1889 warn!(
1890 "Failed to send end of epoch notification to subscriber: {:?}",
1891 err
1892 );
1893 }
1894 }
1895
1896 cur_epoch_store.record_is_safe_mode_metric(latest_system_state.safe_mode());
1897 let new_epoch_start_state = latest_system_state.into_epoch_start_state();
1898
1899 self.auth_agg.store(Arc::new(
1900 self.auth_agg
1901 .load()
1902 .recreate_with_new_epoch_start_state(&new_epoch_start_state),
1903 ));
1904
1905 let next_epoch_committee = new_epoch_start_state.get_iota_committee();
1906 let next_epoch = next_epoch_committee.epoch();
1907 assert_eq!(cur_epoch_store.epoch() + 1, next_epoch);
1908
1909 info!(
1910 next_epoch,
1911 "Finished executing all checkpoints in epoch. About to reconfigure the system."
1912 );
1913
1914 fail_point_async!("reconfig_delay");
1915
1916 let authority_names_to_peer_ids =
1921 new_epoch_start_state.get_authority_names_to_peer_ids();
1922 self.connection_monitor_status
1923 .update_mapping_for_epoch(authority_names_to_peer_ids);
1924
1925 cur_epoch_store.record_epoch_reconfig_start_time_metric();
1926
1927 send_trusted_peer_change(
1928 &self.config,
1929 &self.trusted_peer_change_tx,
1930 &new_epoch_start_state,
1931 );
1932
1933 let mut validator_components_lock_guard = self.validator_components.lock().await;
1934
1935 let new_epoch_store = self
1939 .reconfigure_state(
1940 &self.state,
1941 &cur_epoch_store,
1942 next_epoch_committee.clone(),
1943 new_epoch_start_state,
1944 accumulator.clone(),
1945 )
1946 .await?;
1947
            let new_validator_components = if let Some(ValidatorComponents {
                validator_server_spawn_handle,
                validator_server_handle,
                validator_overload_monitor_handle,
                consensus_manager,
                consensus_store_pruner,
                consensus_adapter,
                mut checkpoint_service_tasks,
                checkpoint_metrics,
                iota_tx_validator_metrics,
                validator_registry_id,
            }) = validator_components_lock_guard.take()
            {
                info!("Reconfiguring the validator.");
                // Stop the checkpoint service and drain its tasks, propagating
                // any panic raised inside a task.
                checkpoint_service_tasks.abort_all();
                while let Some(result) = checkpoint_service_tasks.join_next().await {
                    if let Err(err) = result {
                        if err.is_panic() {
                            std::panic::resume_unwind(err.into_panic());
                        }
                        warn!("Error in checkpoint service task: {:?}", err);
                    }
                }
                info!("Checkpoint service has shut down.");

                consensus_manager.shutdown().await;
                info!("Consensus has shut down.");

                info!("Epoch store finished reconfiguration.");

                // By now no other task should hold a strong reference to the
                // accumulator, so reclaim its metrics and build a fresh one for
                // the new epoch.
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                consensus_store_pruner.prune(next_epoch).await;

                if self.state.is_committee_validator(&new_epoch_store) {
                    Some(
                        Self::start_epoch_specific_validator_components(
                            &self.config,
                            self.state.clone(),
                            consensus_adapter,
                            self.checkpoint_store.clone(),
                            new_epoch_store.clone(),
                            self.state_sync_handle.clone(),
                            self.randomness_handle.clone(),
                            consensus_manager,
                            consensus_store_pruner,
                            weak_accumulator,
                            self.backpressure_manager.clone(),
                            validator_server_spawn_handle,
                            validator_server_handle,
                            validator_overload_monitor_handle,
                            checkpoint_metrics,
                            self.metrics.clone(),
                            iota_tx_validator_metrics,
                            validator_registry_id,
                        )
                        .await?,
                    )
                } else {
                    info!("This node is no longer a validator after reconfiguration");
                    if self.registry_service.remove(validator_registry_id) {
                        debug!("Removed validator metrics registry");
                    } else {
                        warn!("Failed to remove validator metrics registry");
                    }
                    validator_server_handle.trigger_shutdown();
                    debug!("Validator grpc server shutdown triggered");

                    None
                }
            } else {
                let accumulator_metrics = Arc::into_inner(accumulator)
                    .expect("Accumulator should have no other references at this point")
                    .metrics();
                let new_accumulator = Arc::new(StateAccumulator::new(
                    self.state.get_accumulator_store().clone(),
                    accumulator_metrics,
                ));
                let weak_accumulator = Arc::downgrade(&new_accumulator);
                *accumulator_guard = Some(new_accumulator);

                if self.state.is_committee_validator(&new_epoch_store) {
                    info!("Promoting the node from fullnode to validator, starting grpc server");

                    let mut components = Self::construct_validator_components(
                        self.config.clone(),
                        self.state.clone(),
                        Arc::new(next_epoch_committee.clone()),
                        new_epoch_store.clone(),
                        self.checkpoint_store.clone(),
                        self.state_sync_handle.clone(),
                        self.randomness_handle.clone(),
                        weak_accumulator,
                        self.backpressure_manager.clone(),
                        self.connection_monitor_status.clone(),
                        &self.registry_service,
                        self.metrics.clone(),
                    )
                    .await?;

                    components.validator_server_spawn_handle =
                        components.validator_server_spawn_handle.start();

                    Some(components)
                } else {
                    None
                }
            };
            *validator_components_lock_guard = new_validator_components;

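            // The epoch store that was just replaced is no longer needed;
            // release its database handles.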
            cur_epoch_store.release_db_handles();

            // In simulator builds, exercise checkpoint pruning whenever a
            // finite, non-zero retention period is configured.
            if cfg!(msim)
                && !matches!(
                    self.config
                        .authority_store_pruning_config
                        .num_epochs_to_retain_for_checkpoints(),
                    None | Some(u64::MAX) | Some(0)
                )
            {
                self.state
                    .prune_checkpoints_for_eligible_epochs_for_testing(
                        self.config.clone(),
                        iota_core::authority::authority_store_pruner::AuthorityStorePruningMetrics::new_for_test(),
                    )
                    .await?;
            }

            epoch_store = new_epoch_store;
            info!("Reconfiguration finished");
        }
    }

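    /// Shuts down consensus (when validator components are present) and the
    /// optional gRPC server; an ungraceful gRPC shutdown is logged rather than
    /// treated as fatal.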
    async fn shutdown(&self) {
        if let Some(validator_components) = &*self.validator_components.lock().await {
            validator_components.consensus_manager.shutdown().await;
        }

        if let Some(grpc_handle) = self.grpc_server_handle.lock().await.take() {
            info!("Shutting down gRPC server");
            if let Err(e) = grpc_handle.shutdown().await {
                warn!("Failed to gracefully shutdown gRPC server: {e}");
            }
        }
    }

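    /// Transitions the authority state to the next epoch: derives the epoch
    /// supply change from the last checkpoint of the closing epoch, asserts
    /// that this checkpoint has been fully executed, and reconfigures the
    /// underlying state with the next committee and epoch start configuration.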
    async fn reconfigure_state(
        &self,
        state: &Arc<AuthorityState>,
        cur_epoch_store: &AuthorityPerEpochStore,
        next_epoch_committee: Committee,
        next_epoch_start_system_state: EpochStartSystemState,
        accumulator: Arc<StateAccumulator>,
    ) -> IotaResult<Arc<AuthorityPerEpochStore>> {
        let next_epoch = next_epoch_committee.epoch();

        let last_checkpoint = self
            .checkpoint_store
            .get_epoch_last_checkpoint(cur_epoch_store.epoch())
            .expect("Error loading last checkpoint for current epoch")
            .expect("Could not load last checkpoint for current epoch");
        let epoch_supply_change = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .ok_or_else(|| {
                IotaError::from("last checkpoint in epoch should contain end of epoch data")
            })?
            .epoch_supply_change;

        let last_checkpoint_seq = *last_checkpoint.sequence_number();

        assert_eq!(
            Some(last_checkpoint_seq),
            self.checkpoint_store
                .get_highest_executed_checkpoint_seq_number()
                .expect("Error loading highest executed checkpoint sequence number")
        );

        let epoch_start_configuration = EpochStartConfiguration::new(
            next_epoch_start_system_state,
            *last_checkpoint.digest(),
            state.get_object_store().as_ref(),
            EpochFlag::default_flags_for_new_epoch(&state.config),
        )
        .expect("EpochStartConfiguration construction cannot fail");

        let new_epoch_store = self
            .state
            .reconfigure(
                cur_epoch_store,
                self.config.supported_protocol_versions.unwrap(),
                next_epoch_committee,
                epoch_start_configuration,
                accumulator,
                &self.config.expensive_safety_check_config,
                epoch_supply_change,
                last_checkpoint_seq,
            )
            .await
            .expect("Reconfigure authority state cannot fail");
        info!(next_epoch, "Node State has been reconfigured");
        assert_eq!(next_epoch, new_epoch_store.epoch());
        self.state.get_reconfig_api().update_epoch_flags_metrics(
            cur_epoch_store.epoch_start_config().flags(),
            new_epoch_store.epoch_start_config().flags(),
        );

        Ok(new_epoch_store)
    }

    pub fn get_config(&self) -> &NodeConfig {
        &self.config
    }

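    /// Executes a transaction directly against the authority state during
    /// genesis (epoch 0), wrapping it in an unchecked executable transaction
    /// with a checkpoint proof of (0, 0) so that it bypasses consensus.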
    async fn execute_transaction_immediately_at_zero_epoch(
        state: &Arc<AuthorityState>,
        epoch_store: &Arc<AuthorityPerEpochStore>,
        tx: &Transaction,
        span: tracing::Span,
    ) {
        let _guard = span.enter();
        let transaction =
            iota_types::executable_transaction::VerifiedExecutableTransaction::new_unchecked(
                iota_types::executable_transaction::ExecutableTransaction::new_from_data_and_sig(
                    tx.data().clone(),
                    iota_types::executable_transaction::CertificateProof::Checkpoint(0, 0),
                ),
            );
        state
            .try_execute_immediately(&transaction, None, epoch_store)
            .unwrap();
    }

    pub fn randomness_handle(&self) -> randomness::Handle {
        self.randomness_handle.clone()
    }

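    /// Signs this validator's capabilities and pushes them to the current
    /// committee, retrying with a linearly growing backoff (5s, 10s, 15s, ...,
    /// capped at 300s) until a quorum acknowledges the notification or a
    /// non-retryable error occurs.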
    async fn send_signed_capability_notification_to_committee_with_retry(
        &self,
        epoch_store: &Arc<AuthorityPerEpochStore>,
    ) {
        const INITIAL_RETRY_INTERVAL_SECS: u64 = 5;
        const RETRY_INTERVAL_INCREMENT_SECS: u64 = 5;
        const MAX_RETRY_INTERVAL_SECS: u64 = 300;

        let config = epoch_store.protocol_config();
        let binary_config = to_binary_config(config);

        let capabilities = AuthorityCapabilitiesV1::new(
            self.state.name,
            epoch_store.get_chain_identifier().chain(),
            self.config
                .supported_protocol_versions
                .expect("Supported versions should be populated")
                .truncate_below(config.version),
            self.state
                .get_available_system_packages(&binary_config)
                .await,
        );

        let signature = AuthoritySignature::new_secure(
            &IntentMessage::new(
                Intent::iota_app(IntentScope::AuthorityCapabilities),
                &capabilities,
            ),
            &epoch_store.epoch(),
            self.config.authority_key_pair(),
        );

        let request = HandleCapabilityNotificationRequestV1 {
            message: SignedAuthorityCapabilitiesV1::new_from_data_and_sig(capabilities, signature),
        };

        let mut retry_interval = Duration::from_secs(INITIAL_RETRY_INTERVAL_SECS);

        loop {
            let auth_agg = self.auth_agg.load();
            match auth_agg
                .send_capability_notification_to_quorum(request.clone())
                .await
            {
                Ok(_) => {
                    info!("Successfully sent capability notification to committee");
                    break;
                }
                Err(err) => {
                    match &err {
                        AggregatorSendCapabilityNotificationError::RetryableNotification {
                            errors,
                        } => {
                            warn!(
                                "Failed to send capability notification to committee (retryable error), will retry in {:?}: {:?}",
                                retry_interval, errors
                            );
                        }
                        AggregatorSendCapabilityNotificationError::NonRetryableNotification {
                            errors,
                        } => {
                            error!(
                                "Failed to send capability notification to committee (non-retryable error): {:?}",
                                errors
                            );
                            break;
                        }
                    };

                    tokio::time::sleep(retry_interval).await;

                    // Linear backoff: grow the wait by a fixed increment up to the cap.
                    retry_interval = std::cmp::min(
                        retry_interval + Duration::from_secs(RETRY_INTERVAL_INCREMENT_SECS),
                        Duration::from_secs(MAX_RETRY_INTERVAL_SECS),
                    );
                }
            }
        }
    }
}

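// JWK fetching for zkLogin: outside the simulator, keys are fetched from the
// OIDC provider over HTTP; under msim, a test-supplied injector provides them.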
#[cfg(not(msim))]
impl IotaNode {
    async fn fetch_jwks(
        _authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        use fastcrypto_zkp::bn254::zk_login::fetch_jwks;
        let client = reqwest::Client::new();
        fetch_jwks(provider, &client)
            .await
            .map_err(|_| IotaError::JWKRetrieval)
    }
}

#[cfg(msim)]
impl IotaNode {
    pub fn get_sim_node_id(&self) -> iota_simulator::task::NodeId {
        self.sim_state.sim_node.id()
    }

    pub fn set_safe_mode_expected(&self, new_value: bool) {
        info!("Setting safe mode expected to {}", new_value);
        self.sim_state
            .sim_safe_mode_expected
            .store(new_value, Ordering::Relaxed);
    }

    async fn fetch_jwks(
        authority: AuthorityName,
        provider: &OIDCProvider,
    ) -> IotaResult<Vec<(JwkId, JWK)>> {
        get_jwk_injector()(authority, provider)
    }
}

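/// A future that is stored unstarted and spawned at most once.
///
/// A minimal usage sketch (illustrative only):
///
/// ```ignore
/// let task = SpawnOnce::new(async { Ok(()) });
/// // Later, move the future onto the runtime; starting twice is a no-op.
/// let task = task.start();
/// ```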
enum SpawnOnce {
    // The Mutex wrapper makes the boxed future Sync so SpawnOnce can be held
    // in shared state before it is started.
    Unstarted(Mutex<BoxFuture<'static, Result<()>>>),
    #[allow(unused)]
    Started(JoinHandle<Result<()>>),
}

impl SpawnOnce {
    pub fn new(future: impl Future<Output = Result<()>> + Send + 'static) -> Self {
        Self::Unstarted(Mutex::new(Box::pin(future)))
    }

    pub fn start(self) -> Self {
        match self {
            Self::Unstarted(future) => {
                let future = future.into_inner();
                let handle = tokio::spawn(future);
                Self::Started(handle)
            }
            Self::Started(_) => self,
        }
    }
}

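/// Publishes the validator set of the upcoming epoch to the p2p layer as the
/// new trusted peer set, keeping the previous committee in the event as
/// `old_committee` so subscribers can see both sides of the transition.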
fn send_trusted_peer_change(
    config: &NodeConfig,
    sender: &watch::Sender<TrustedPeerChangeEvent>,
    new_epoch_start_state: &EpochStartSystemState,
) {
    let new_committee =
        new_epoch_start_state.get_validator_as_p2p_peers(config.authority_public_key());

    sender.send_modify(|event| {
        core::mem::swap(&mut event.new_committee, &mut event.old_committee);
        event.new_committee = new_committee;
    })
}

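/// Builds the transaction key-value store backing the JSON-RPC API: a local
/// RocksDB-backed store, with an HTTP key-value store as fallback when
/// `transaction_kv_store_read_config.base_url` is configured.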
fn build_kv_store(
    state: &Arc<AuthorityState>,
    config: &NodeConfig,
    registry: &Registry,
) -> Result<Arc<TransactionKeyValueStore>> {
    let metrics = KeyValueStoreMetrics::new(registry);
    let db_store = TransactionKeyValueStore::new("rocksdb", metrics.clone(), state.clone());

    let base_url = &config.transaction_kv_store_read_config.base_url;

    if base_url.is_empty() {
        info!("no http kv store url provided, using local db only");
        return Ok(Arc::new(db_store));
    }

    base_url.parse::<url::Url>().tap_err(|e| {
        error!(
            "failed to parse config.transaction_kv_store_read_config.base_url ({:?}) as url: {}",
            base_url, e
        )
    })?;

    let http_store = HttpKVStore::new_kv(
        base_url,
        config.transaction_kv_store_read_config.cache_size,
        metrics.clone(),
    )?;
    info!("using local key-value store with fallback to http key-value store");
    Ok(Arc::new(FallbackTransactionKVStore::new_kv(
        db_store,
        http_store,
        metrics,
        "json_rpc_fallback",
    )))
}

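/// Starts the gRPC read API. Returns `Ok(None)` on validators (nodes with a
/// consensus config) and when `enable_grpc_api` is disabled; it is an error
/// to enable the API without providing `grpc_api_config`.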
async fn build_grpc_server(
    config: &NodeConfig,
    state: Arc<AuthorityState>,
    state_sync_store: RocksDbStore,
) -> Result<Option<GrpcServerHandle>> {
    // The gRPC API is only served on fullnodes that enable it.
    if config.consensus_config().is_some() || !config.enable_grpc_api {
        return Ok(None);
    }

    let Some(grpc_config) = &config.grpc_api_config else {
        return Err(anyhow!("gRPC API is enabled but no configuration provided"));
    };

    let rest_read_store = Arc::new(RestReadStore::new(state.clone(), state_sync_store));

    let shutdown_token = CancellationToken::new();

    let grpc_reader = Arc::new(GrpcReader::from_rest_state_reader(rest_read_store));

    let event_subscriber =
        state.subscription_handler.clone() as Arc<dyn iota_grpc_server::EventSubscriber>;

    let handle = start_grpc_server(
        grpc_reader,
        event_subscriber,
        grpc_config.clone(),
        shutdown_token,
    )
    .await?;

    Ok(Some(handle))
}

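/// Builds the HTTP server hosting the JSON-RPC API and, when
/// `enable_rest_api` is set, the REST API. Returns `Ok(None)` on validators,
/// which serve neither API.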
pub async fn build_http_server(
    state: Arc<AuthorityState>,
    store: RocksDbStore,
    transaction_orchestrator: &Option<Arc<TransactionOrchestrator<NetworkAuthorityClient>>>,
    config: &NodeConfig,
    prometheus_registry: &Registry,
    _custom_runtime: Option<Handle>,
    software_version: &'static str,
) -> Result<Option<iota_http::ServerHandle>> {
    // Validators do not expose the JSON-RPC or REST APIs.
    if config.consensus_config().is_some() {
        return Ok(None);
    }

    let mut router = axum::Router::new();

    let json_rpc_router = {
        let mut server = JsonRpcServerBuilder::new(
            env!("CARGO_PKG_VERSION"),
            prometheus_registry,
            config.policy_config.clone(),
            config.firewall_config.clone(),
        );

        let kv_store = build_kv_store(&state, config, prometheus_registry)?;

        let metrics = Arc::new(JsonRpcMetrics::new(prometheus_registry));
        server.register_module(ReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        ))?;
        server.register_module(CoinReadApi::new(
            state.clone(),
            kv_store.clone(),
            metrics.clone(),
        )?)?;

        // The transaction builder is not registered when running with a fixed
        // range (run_with_range).
        if config.run_with_range.is_none() {
            server.register_module(TransactionBuilderApi::new(state.clone()))?;
        }
        server.register_module(GovernanceReadApi::new(state.clone(), metrics.clone()))?;

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            server.register_module(TransactionExecutionApi::new(
                state.clone(),
                transaction_orchestrator.clone(),
                metrics.clone(),
            ))?;
        }

        let iota_names_config = config
            .iota_names_config
            .clone()
            .unwrap_or_else(|| IotaNamesConfig::from_chain(&state.get_chain_identifier().chain()));

        server.register_module(IndexerApi::new(
            state.clone(),
            ReadApi::new(state.clone(), kv_store.clone(), metrics.clone()),
            kv_store,
            metrics,
            iota_names_config,
            config.indexer_max_subscriptions,
        ))?;
        server.register_module(MoveUtils::new(state.clone()))?;

        let server_type = config.jsonrpc_server_type();

        server.to_router(server_type).await?
    };

    router = router.merge(json_rpc_router);

    if config.enable_rest_api {
        let mut rest_service = iota_rest_api::RestService::new(
            Arc::new(RestReadStore::new(state.clone(), store)),
            software_version,
        );

        if let Some(config) = config.rest.clone() {
            rest_service.with_config(config);
        }

        rest_service.with_metrics(RestMetrics::new(prometheus_registry));

        if let Some(transaction_orchestrator) = transaction_orchestrator {
            rest_service.with_executor(transaction_orchestrator.clone())
        }

        router = router.merge(rest_service.into_router());
    }

    router = router
        .route("/health", axum::routing::get(health_check_handler))
        .route_layer(axum::Extension(state));

    let layers = ServiceBuilder::new()
        .map_request(|mut request: axum::http::Request<_>| {
            if let Some(connect_info) = request.extensions().get::<iota_http::ConnectInfo>() {
                let axum_connect_info = axum::extract::ConnectInfo(connect_info.remote_addr);
                request.extensions_mut().insert(axum_connect_info);
            }
            request
        })
        .layer(axum::middleware::from_fn(server_timing_middleware));

    router = router.layer(layers);

    let handle = iota_http::Builder::new()
        .serve(&config.json_rpc_address, router)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    info!(local_addr = ?handle.local_addr(), "IOTA JSON-RPC server listening on {}", handle.local_addr());

    Ok(Some(handle))
}

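/// Query parameters accepted by the `/health` endpoint.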
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Threshold {
    pub threshold_seconds: Option<u32>,
}

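/// Reports node health. Without a query parameter the endpoint always returns
/// `200 up`; with `?threshold_seconds=N` (e.g. `GET /health?threshold_seconds=30`)
/// it returns `503 down` when the highest executed checkpoint is missing,
/// unreadable, or older than `N` seconds.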
async fn health_check_handler(
    axum::extract::Query(Threshold { threshold_seconds }): axum::extract::Query<Threshold>,
    axum::Extension(state): axum::Extension<Arc<AuthorityState>>,
) -> impl axum::response::IntoResponse {
    if let Some(threshold_seconds) = threshold_seconds {
        // Check that the latest executed checkpoint is within the threshold.
        let summary = match state
            .get_checkpoint_store()
            .get_highest_executed_checkpoint()
        {
            Ok(Some(summary)) => summary,
            Ok(None) => {
                warn!("Highest executed checkpoint not found");
                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
            }
            Err(err) => {
                warn!("Failed to retrieve highest executed checkpoint: {:?}", err);
                return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
            }
        };

        let latest_chain_time = summary.timestamp();
        let threshold =
            std::time::SystemTime::now() - Duration::from_secs(threshold_seconds as u64);

        if latest_chain_time < threshold {
            warn!(
                ?latest_chain_time,
                ?threshold,
                "failing health check due to checkpoint lag"
            );
            return (axum::http::StatusCode::SERVICE_UNAVAILABLE, "down");
        }
    }
    (axum::http::StatusCode::OK, "up")
}

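// Under test, checkpoints are capped at two transactions so that tests can
// easily produce multiple checkpoints; otherwise the protocol config applies.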
#[cfg(not(test))]
fn max_tx_per_checkpoint(protocol_config: &ProtocolConfig) -> usize {
    protocol_config.max_transactions_per_checkpoint() as usize
}

#[cfg(test)]
fn max_tx_per_checkpoint(_: &ProtocolConfig) -> usize {
    2
}